1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/mdd/mdd_object.c
41 * Lustre Metadata Server (mdd) routines
43 * Author: Wang Di <wangdi@clusterfs.com>
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
64 #include "mdd_internal.h"
66 static const struct lu_object_operations mdd_lu_obj_ops;
68 static int mdd_xattr_get(const struct lu_env *env,
69 struct md_object *obj, struct lu_buf *buf,
/* Read the body data of @obj through the underlying OSD layer.
 * The object must already exist on disk (asserted below).
 * NOTE(review): this listing elides lines (gaps in embedded numbering);
 * braces/RETURN framing are not visible here. */
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
75 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
76 PFID(mdd_object_fid(obj)));
77 mdo_data_get(env, obj, data);
/* Fetch the lu_attr attributes of @obj into @la, under capability @capa.
 * Asserts the object exists; delegates to the mdo_attr_get() wrapper. */
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82 struct lu_attr *la, struct lustre_capa *capa)
84 LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85 PFID(mdd_object_fid(obj)));
86 return mdo_attr_get(env, obj, la, capa);
/* Translate on-disk LUSTRE_*_FL flag bits into the in-memory
 * obj->mod_flags representation (APPEND_OBJ / IMMUTE_OBJ).
 * Both target bits are cleared first so stale state never survives. */
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93 if (flags & LUSTRE_APPEND_FL)
94 obj->mod_flags |= APPEND_OBJ;
96 if (flags & LUSTRE_IMMUTABLE_FL)
97 obj->mod_flags |= IMMUTE_OBJ;
/* Return the per-thread mdd_thread_info stashed in the lu_env context.
 * The key is registered at module init, so lookup must never fail. */
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 struct mdd_thread_info *info;
104 info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105 LASSERT(info != NULL);
/* Wrap caller-owned memory (@area/@len) in the per-thread mti_buf
 * descriptor; no allocation is performed here. Interior lines that
 * assign lb_buf/lb_len are elided from this listing. */
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
113 buf = &mdd_env_info(env)->mti_buf;
/* Release the memory owned by @buf (tolerates NULL buf or NULL lb_buf);
 * the reset of lb_buf/lb_len after the free is in elided lines. */
119 void mdd_buf_put(struct lu_buf *buf)
121 if (buf == NULL || buf->lb_buf == NULL)
123 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
/* Const flavour of mdd_buf_get(): wrap read-only memory in the
 * per-thread mti_buf. The const is cast away only because lu_buf
 * has a non-const lb_buf field; callers get a const lu_buf back. */
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129 const void *area, ssize_t len)
133 buf = &mdd_env_info(env)->mti_buf;
134 buf->lb_buf = (void *)area;
/* Return the per-thread mti_big_buf grown to at least @len bytes.
 * An existing too-small buffer is freed first (contents NOT preserved —
 * use mdd_buf_grow() when old data must survive). Returns the lu_buf
 * with lb_buf == NULL on allocation failure (per the visible NULL check). */
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143 if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
147 if (buf->lb_buf == NULL) {
149 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150 if (buf->lb_buf == NULL)
156 /** Increase the size of the \a mti_big_buf.
157 * preserves old data in buffer
158 * old buffer remains unchanged on error
159 * \retval 0 or -ENOMEM
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
/* LASSERT below: growing is only legal, never shrinking. */
166 LASSERT(len >= oldbuf->lb_len);
167 OBD_ALLOC_LARGE(buf.lb_buf, len);
169 if (buf.lb_buf == NULL)
/* Copy old content into the new buffer, free the old one, then make
 * mti_big_buf describe the new allocation (struct copy). */
173 memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175 OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177 memcpy(oldbuf, &buf, sizeof(buf));
/* Return a zeroed, per-thread llog cookie buffer big enough for the
 * current LOV cookie size. The cached buffer is reallocated lazily
 * when the required size grows. Returns NULL on allocation failure. */
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183 struct mdd_device *mdd)
185 struct mdd_thread_info *mti = mdd_env_info(env);
188 max_cookie_size = mdd_lov_cookiesize(env, mdd);
/* Cached buffer too small: drop it so it gets reallocated below. */
189 if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190 if (mti->mti_max_cookie)
191 OBD_FREE_LARGE(mti->mti_max_cookie,
192 mti->mti_max_cookie_size);
193 mti->mti_max_cookie = NULL;
194 mti->mti_max_cookie_size = 0;
196 if (unlikely(mti->mti_max_cookie == NULL)) {
197 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198 if (likely(mti->mti_max_cookie != NULL))
199 mti->mti_max_cookie_size = max_cookie_size;
/* Hand back a cleared buffer so callers never see stale cookies. */
201 if (likely(mti->mti_max_cookie != NULL))
202 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203 return mti->mti_max_cookie;
/* Return a per-thread lov_mds_md buffer sized for the current maximum
 * MDS LOV EA size; same lazy-grow caching scheme as mdd_max_cookie_get()
 * but, unlike it, the buffer is NOT zeroed before return. */
206 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
207 struct mdd_device *mdd)
209 struct mdd_thread_info *mti = mdd_env_info(env);
212 max_lmm_size = mdd_lov_mdsize(env, mdd);
213 if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
214 if (mti->mti_max_lmm)
215 OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
216 mti->mti_max_lmm = NULL;
217 mti->mti_max_lmm_size = 0;
219 if (unlikely(mti->mti_max_lmm == NULL)) {
220 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
221 if (likely(mti->mti_max_lmm != NULL))
222 mti->mti_max_lmm_size = max_lmm_size;
224 return mti->mti_max_lmm;
/* lu_device::ldo_object_alloc for MDD: allocate an mdd_object, wire up
 * its md/dir operation tables and the mdd lu_object ops, and return the
 * embedded lu_object (NULL-return path on OOM is in elided lines). */
227 struct lu_object *mdd_object_alloc(const struct lu_env *env,
228 const struct lu_object_header *hdr,
231 struct mdd_object *mdd_obj;
233 OBD_ALLOC_PTR(mdd_obj);
234 if (mdd_obj != NULL) {
237 o = mdd2lu_obj(mdd_obj);
238 lu_object_init(o, NULL, d);
239 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
240 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
241 mdd_obj->mod_count = 0;
242 o->lo_ops = &mdd_lu_obj_ops;
/* lu_object_operations::loo_object_init: allocate the next-layer (child
 * OSD) object below this one and stack it via lu_object_add(). Also
 * resets the changelog timestamp and the pdir lock for the object. */
249 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
250 const struct lu_object_conf *unused)
252 struct mdd_device *d = lu2mdd_dev(o->lo_dev);
253 struct mdd_object *mdd_obj = lu2mdd_obj(o);
254 struct lu_object *below;
255 struct lu_device *under;
258 mdd_obj->mod_cltime = 0;
259 under = &d->mdd_child->dd_lu_dev;
260 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
261 mdd_pdlock_init(mdd_obj);
/* NOTE(review): the NULL check for @below appears to be in elided
 * lines between 261 and 265 — confirm against the full file. */
265 lu_object_add(o, below);
/* loo_object_start: once the stack is assembled, load the persistent
 * flag bits (append/immutable/...) for objects that exist on disk. */
270 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
272 if (lu_object_exists(o))
273 return mdd_get_flags(env, lu2mdd_obj(o));
/* loo_object_free: tear down and free the mdd_object
 * (lu_object_fini/OBD_FREE_PTR calls are in elided lines). */
278 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
280 struct mdd_object *mdd = lu2mdd_obj(o);
/* loo_object_print: emit a one-line human-readable dump of the object
 * (open count, valid mask, changelog time, flags) via printer @p. */
286 static int mdd_object_print(const struct lu_env *env, void *cookie,
287 lu_printer_t p, const struct lu_object *o)
289 struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
290 return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
291 "valid=%x, cltime="LPU64", flags=%lx)",
292 mdd, mdd->mod_count, mdd->mod_valid,
293 mdd->mod_cltime, mdd->mod_flags);
/* lu_object operation vector for MDD objects; forward-declared at the
 * top of the file so mdd_object_alloc() can reference it. */
296 static const struct lu_object_operations mdd_lu_obj_ops = {
297 .loo_object_init = mdd_object_init,
298 .loo_object_start = mdd_object_start,
299 .loo_object_free = mdd_object_free,
300 .loo_object_print = mdd_object_print,
/* Look up (or instantiate) the mdd_object for FID @f on device @d.
 * Thin wrapper over md_object_find_slice(); returns ERR_PTR on error. */
303 struct mdd_object *mdd_object_find(const struct lu_env *env,
304 struct mdd_device *d,
305 const struct lu_fid *f)
307 return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
/* Resolve the '/'-separated @path (relative to the filesystem root)
 * to its FID by walking component-by-component with mdd_lookup().
 * Used by mdd_path_current() to verify a reconstructed path still
 * resolves to the original object. -EREMOTE is returned for remote
 * (cross-MDT) objects that cannot be traversed locally. */
310 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
311 const char *path, struct lu_fid *fid)
314 struct lu_fid *f = &mdd_env_info(env)->mti_fid;
315 struct mdd_object *obj;
316 struct lu_name *lname = &mdd_env_info(env)->mti_name;
321 /* temp buffer for path element */
322 buf = mdd_buf_alloc(env, PATH_MAX);
323 if (buf->lb_buf == NULL)
326 lname->ln_name = name = buf->lb_buf;
327 lname->ln_namelen = 0;
/* Walk starts from the filesystem root FID. */
328 *f = mdd->mdd_root_fid;
/* Per-component loop: copy chars up to the next '/' (copy body elided
 * in this listing), then look the component up in its parent. */
335 while (*path != '/' && *path != '\0') {
343 /* find obj corresponding to fid */
344 obj = mdd_object_find(env, mdd, f);
346 GOTO(out, rc = -EREMOTE);
348 GOTO(out, rc = PTR_ERR(obj));
349 /* get child fid from parent and name */
350 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
351 mdd_object_put(env, obj);
356 lname->ln_namelen = 0;
365 /** The maximum depth that fid2path() will search.
366 * This is limited only because we want to store the fids for
367 * historical path lookup purposes.
369 #define MAX_PATH_DEPTH 100
371 /** mdd_path() lookup structure. */
372 struct path_lookup_info {
373 __u64 pli_recno; /**< history point */
374 __u64 pli_currec; /**< current record */
375 struct lu_fid pli_fid;
376 struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377 struct mdd_object *pli_mdd_obj;
378 char *pli_path; /**< full path */
380 int pli_linkno; /**< which hardlink to follow */
381 int pli_fidcount; /**< number of \a pli_fids */
/* Build the current full pathname of pli->pli_mdd_obj by walking UP the
 * namespace: repeatedly read the object's link EA to get (parent FID,
 * name), packing each name backwards from the end of pli_path, until
 * the root FID is reached. The result is then re-resolved forward via
 * mdd_path2fid() to detect a concurrent rename (-EAGAIN => caller
 * retries). Parent FIDs are recorded in pli_fids[] for historical
 * lookups. -EOVERFLOW: path buffer or MAX_PATH_DEPTH exhausted. */
384 static int mdd_path_current(const struct lu_env *env,
385 struct path_lookup_info *pli)
387 struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388 struct mdd_object *mdd_obj;
389 struct lu_buf *buf = NULL;
390 struct link_ea_header *leh;
391 struct link_ea_entry *lee;
392 struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393 struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
/* ptr walks backwards from the end of the path buffer. */
399 ptr = pli->pli_path + pli->pli_pathlen - 1;
402 pli->pli_fidcount = 0;
403 pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
405 while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406 mdd_obj = mdd_object_find(env, mdd,
407 &pli->pli_fids[pli->pli_fidcount]);
409 GOTO(out, rc = -EREMOTE);
411 GOTO(out, rc = PTR_ERR(mdd_obj));
412 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
414 mdd_object_put(env, mdd_obj);
418 /* Do I need to error out here? */
423 /* Get parent fid and object name */
424 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425 buf = mdd_links_get(env, mdd_obj);
426 mdd_read_unlock(env, mdd_obj);
427 mdd_object_put(env, mdd_obj);
429 GOTO(out, rc = PTR_ERR(buf));
432 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
433 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
435 /* If set, use link #linkno for path lookup, otherwise use
436 link #0. Only do this for the final path element. */
437 if ((pli->pli_fidcount == 0) &&
438 (pli->pli_linkno < leh->leh_reccount)) {
440 for (count = 0; count < pli->pli_linkno; count++) {
441 lee = (struct link_ea_entry *)
442 ((char *)lee + reclen);
443 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445 if (pli->pli_linkno < leh->leh_reccount - 1)
446 /* indicate to user there are more links */
450 /* Pack the name in the end of the buffer */
451 ptr -= tmpname->ln_namelen;
452 if (ptr - 1 <= pli->pli_path)
453 GOTO(out, rc = -EOVERFLOW);
/* strncpy copies exactly ln_namelen bytes and does NOT NUL-terminate;
 * termination/'/'-separators must come from the elided lines here. */
454 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
457 /* Store the parent fid for historic lookup */
458 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
459 GOTO(out, rc = -EOVERFLOW);
460 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
463 /* Verify that our path hasn't changed since we started the lookup.
464 Record the current index, and verify the path resolves to the
465 same fid. If it does, then the path is correct as of this index. */
466 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
467 pli->pli_currec = mdd->mdd_cl.mc_index;
468 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
469 rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
471 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
472 GOTO (out, rc = -EAGAIN);
474 if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
475 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
476 " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
477 PFID(&pli->pli_fid));
478 GOTO(out, rc = -EAGAIN);
/* Success: shift the assembled path to the start of the buffer. */
480 ptr++; /* skip leading / */
481 memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
485 if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
486 /* if we vmalloced a large buffer drop it */
/* Historical (changelog-based) path lookup; body is elided in this
 * listing — per the comment in mdd_path() it is not fully implemented. */
492 static int mdd_path_historic(const struct lu_env *env,
493 struct path_lookup_info *pli)
498 /* Returns the full path to this fid, as of changelog record recno. */
499 static int mdd_path(const struct lu_env *env, struct md_object *obj,
500 char *path, int pathlen, __u64 *recno, int *linkno)
502 struct path_lookup_info *pli;
/* Root object: its path is trivially "/" (handled in elided lines). */
510 if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
519 pli->pli_mdd_obj = md2mdd_obj(obj);
520 pli->pli_recno = *recno;
521 pli->pli_path = path;
522 pli->pli_pathlen = pathlen;
523 pli->pli_linkno = *linkno;
525 /* Retry multiple times in case file is being moved */
526 while (tries-- && rc == -EAGAIN)
527 rc = mdd_path_current(env, pli);
529 /* For historical path lookup, the current links may not have existed
530 * at "recno" time. We must switch over to earlier links/parents
531 * by using the changelog records. If the earlier parent doesn't
532 * exist, we must search back through the changelog to reconstruct
533 * its parents, then check if it exists, etc.
534 * We may ignore this problem for the initial implementation and
535 * state that an "original" hardlink must still exist for us to find
536 * historic path name. */
537 if (pli->pli_recno != -1) {
538 rc = mdd_path_historic(env, pli);
/* Report back the changelog index the path was valid at, and the
 * next hardlink index so the caller can iterate links. */
540 *recno = pli->pli_currec;
541 /* Return next link index to caller */
542 *linkno = pli->pli_linkno;
/* Load la_flags from disk and fold them into obj->mod_flags; also tag
 * directories whose nlink dropped to 1 with MNLINK_OBJ. */
550 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
552 struct lu_attr *la = &mdd_env_info(env)->mti_la;
556 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
558 mdd_flags_xlate(obj, la->la_flags);
559 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
560 obj->mod_flags |= MNLINK_OBJ;
565 /* get only inode attributes */
566 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
/* Idempotent: returns early if MA_INODE is already marked valid. */
572 if (ma->ma_valid & MA_INODE)
575 rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
576 mdd_object_capa(env, mdd_obj));
578 ma->ma_valid |= MA_INODE;
/* Fill @lmm (viewed as a lov_user_md) with the filesystem-default
 * striping taken from the MDS LOV descriptor; returns the number of
 * bytes written, i.e. sizeof(struct lov_user_md). */
582 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
584 struct lov_desc *ldesc;
585 struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
586 struct lov_user_md *lum = (struct lov_user_md*)lmm;
592 ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
593 LASSERT(ldesc != NULL);
/* FID_SEQ_LOV_DEFAULT marks this as the default (unset) layout. */
595 lum->lmm_magic = LOV_MAGIC_V1;
596 lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
597 lum->lmm_pattern = ldesc->ld_pattern;
598 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
599 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
600 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
602 RETURN(sizeof(*lum));
/* True iff @mdd_obj is the filesystem root directory (FID compare). */
605 static int is_rootdir(struct mdd_object *mdd_obj)
607 const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
608 const struct lu_fid *fid = mdo2fid(mdd_obj);
610 return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
613 /* get lov EA only */
614 static int __mdd_lmm_get(const struct lu_env *env,
615 struct mdd_object *mdd_obj, struct md_attr *ma)
620 if (ma->ma_valid & MA_LOV)
623 rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
/* Root dir with no explicit striping: substitute the fs default so
 * clients always see a usable layout for "lfs getstripe /". */
625 if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
626 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
/* Positive rc is the EA size; record it and mark MA_LOV valid. */
628 ma->ma_lmm_size = rc;
629 ma->ma_valid |= MA_LOV;
635 /* get the first parent fid from link EA */
636 static int mdd_pfid_get(const struct lu_env *env,
637 struct mdd_object *mdd_obj, struct md_attr *ma)
640 struct link_ea_header *leh;
641 struct link_ea_entry *lee;
642 struct lu_fid *pfid = &ma->ma_pfid;
645 if (ma->ma_valid & MA_PFID)
648 buf = mdd_links_get(env, mdd_obj);
650 RETURN(PTR_ERR(buf));
/* First link entry holds the (big-endian) parent FID; convert it. */
653 lee = (struct link_ea_entry *)(leh + 1);
654 memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
655 fid_be_to_cpu(pfid, pfid);
656 ma->ma_valid |= MA_PFID;
657 if (buf->lb_len > OBD_ALLOC_BIG)
658 /* if we vmalloced a large buffer drop it */
/* Locked wrapper around __mdd_lmm_get(): fetch the LOV EA while
 * holding the object's read lock. */
663 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
669 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
670 rc = __mdd_lmm_get(env, mdd_obj, ma);
671 mdd_read_unlock(env, mdd_obj);
/* Fetch the LMV (striped-directory) EA into ma->ma_lmv; no-op if
 * MA_LMV is already valid. Caller must hold the appropriate lock. */
676 static int __mdd_lmv_get(const struct lu_env *env,
677 struct mdd_object *mdd_obj, struct md_attr *ma)
682 if (ma->ma_valid & MA_LMV)
685 rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
688 ma->ma_valid |= MA_LMV;
/* Read the LMA ("lustre_mdt_attrs") EA and extract the HSM flags and/or
 * SOM (size-on-MDS) attributes requested in ma->ma_need. Uses the
 * per-thread mti_xattr_buf as scratch space for the on-disk record. */
694 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
697 struct mdd_thread_info *info = mdd_env_info(env);
698 struct lustre_mdt_attrs *lma =
699 (struct lustre_mdt_attrs *)info->mti_xattr_buf;
704 /* If all needed data are already valid, nothing to do */
705 if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
706 (ma->ma_need & (MA_HSM | MA_SOM)))
709 /* Read LMA from disk EA */
710 lma_size = sizeof(info->mti_xattr_buf);
711 rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
715 /* Useless to check LMA incompatibility because this is already done in
716 * osd_ea_fid_get(), and this will fail long before this code is
718 * So, if we are here, LMA is compatible.
/* On-disk LMA is little-endian; swab to CPU order before use. */
721 lustre_lma_swab(lma);
723 /* Swab and copy LMA */
724 if (ma->ma_need & MA_HSM) {
725 if (lma->lma_compat & LMAC_HSM)
726 ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
728 ma->ma_hsm.mh_flags = 0;
729 ma->ma_valid |= MA_HSM;
/* SOM data is copied only when the LMAC_SOM compat bit says the
 * record actually carries valid SOM fields. */
733 if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
734 LASSERT(ma->ma_som != NULL);
735 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
736 ma->ma_som->msd_size = lma->lma_som_size;
737 ma->ma_som->msd_blocks = lma->lma_som_blocks;
738 ma->ma_som->msd_mountid = lma->lma_som_mountid;
739 ma->ma_valid |= MA_SOM;
/* Dispatcher: fetch every attribute group requested in ma->ma_need
 * (inode, LOV, parent FID, LMV, HSM/SOM, default ACL), each gated on
 * the object type it applies to. Stops at the first error. */
745 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
751 if (ma->ma_need & MA_INODE)
752 rc = mdd_iattr_get(env, mdd_obj, ma);
754 if (rc == 0 && ma->ma_need & MA_LOV) {
755 if (S_ISREG(mdd_object_type(mdd_obj)) ||
756 S_ISDIR(mdd_object_type(mdd_obj)))
757 rc = __mdd_lmm_get(env, mdd_obj, ma);
/* PFID is only read when no LOV EA was returned above. */
759 if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
760 if (S_ISREG(mdd_object_type(mdd_obj)))
761 rc = mdd_pfid_get(env, mdd_obj, ma);
763 if (rc == 0 && ma->ma_need & MA_LMV) {
764 if (S_ISDIR(mdd_object_type(mdd_obj)))
765 rc = __mdd_lmv_get(env, mdd_obj, ma);
767 if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
768 if (S_ISREG(mdd_object_type(mdd_obj)))
769 rc = __mdd_lma_get(env, mdd_obj, ma);
771 #ifdef CONFIG_FS_POSIX_ACL
772 if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
773 if (S_ISDIR(mdd_object_type(mdd_obj)))
774 rc = mdd_def_acl_get(env, mdd_obj, ma);
777 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
778 rc, ma->ma_valid, ma->ma_lmm);
/* Like mdd_attr_get_internal() but takes the object read lock when any
 * EA-backed group (LOV/LMV/ACL/HSM/SOM/PFID) is requested; plain inode
 * attributes don't need the lock. */
782 int mdd_attr_get_internal_locked(const struct lu_env *env,
783 struct mdd_object *mdd_obj, struct md_attr *ma)
786 int needlock = ma->ma_need &
787 (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
790 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
791 rc = mdd_attr_get_internal(env, mdd_obj, ma);
793 mdd_read_unlock(env, mdd_obj);
798 * No permission check is needed.
/* md_object_operations::moo_attr_get entry point. */
800 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
803 struct mdd_object *mdd_obj = md2mdd_obj(obj);
807 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
812 * No permission check is needed.
/* moo_xattr_get: read extended attribute @name into @buf under the
 * object read lock; object must exist. */
814 static int mdd_xattr_get(const struct lu_env *env,
815 struct md_object *obj, struct lu_buf *buf,
818 struct mdd_object *mdd_obj = md2mdd_obj(obj);
823 LASSERT(mdd_object_exists(mdd_obj));
825 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
826 rc = mdo_xattr_get(env, mdd_obj, buf, name,
827 mdd_object_capa(env, mdd_obj));
828 mdd_read_unlock(env, mdd_obj);
834 * Permission check is done when open,
835 * no need check again.
/* moo_readlink: read the symlink target by doing a body read (offset 0)
 * on the underlying dt_object. */
837 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
840 struct mdd_object *mdd_obj = md2mdd_obj(obj);
841 struct dt_object *next;
846 LASSERT(mdd_object_exists(mdd_obj));
848 next = mdd_object_child(mdd_obj);
849 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
850 rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
851 mdd_object_capa(env, mdd_obj));
852 mdd_read_unlock(env, mdd_obj);
857 * No permission check is needed.
/* moo_xattr_list: enumerate the object's xattr names into @buf. */
859 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
862 struct mdd_object *mdd_obj = md2mdd_obj(obj);
867 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
868 rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
869 mdd_read_unlock(env, mdd_obj);
/* Declaration phase of object creation: compute the dt_object_format
 * (index vs. regular, from spec->sp_feat / la_mode) and declare the
 * create against transaction @handle. Mirrors the format selection in
 * mdd_object_create_internal(); keep the two in sync. */
874 int mdd_declare_object_create_internal(const struct lu_env *env,
875 struct mdd_object *p,
876 struct mdd_object *c,
878 struct thandle *handle,
879 const struct md_op_spec *spec)
881 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
882 const struct dt_index_features *feat = spec->sp_feat;
886 if (feat != &dt_directory_features && feat != NULL)
887 dof->dof_type = DFT_INDEX;
889 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
891 dof->u.dof_idx.di_feat = feat;
893 rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
/* Execution phase of object creation: if child @c does not yet exist,
 * pick the dt_object_format (same logic as the declare step), let the
 * OSD initialize the allocation hint from parent @p, and create the
 * object inside transaction @handle. */
898 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
899 struct mdd_object *c, struct md_attr *ma,
900 struct thandle *handle,
901 const struct md_op_spec *spec)
903 struct lu_attr *attr = &ma->ma_attr;
904 struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
905 struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
906 const struct dt_index_features *feat = spec->sp_feat;
910 if (!mdd_object_exists(c)) {
911 struct dt_object *next = mdd_object_child(c);
914 if (feat != &dt_directory_features && feat != NULL)
915 dof->dof_type = DFT_INDEX;
917 dof->dof_type = dt_mode_to_dft(attr->la_mode);
919 dof->u.dof_idx.di_feat = feat;
921 /* @hint will be initialized by underlying device. */
922 next->do_ops->do_ah_init(env, hint,
923 p ? mdd_object_child(p) : NULL,
924 attr->la_mode & S_IFMT);
926 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
927 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
935 * Make sure the ctime is increased only.
937 static inline int mdd_attr_check(const struct lu_env *env,
938 struct mdd_object *obj,
939 struct lu_attr *attr)
941 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
945 if (attr->la_valid & LA_CTIME) {
946 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Incoming ctime older than on-disk: drop m/ctime updates entirely;
 * a pure-ctime update equal to on-disk is also a no-op. */
950 if (attr->la_ctime < tmp_la->la_ctime)
951 attr->la_valid &= ~(LA_MTIME | LA_CTIME);
952 else if (attr->la_valid == LA_CTIME &&
953 attr->la_ctime == tmp_la->la_ctime)
954 attr->la_valid &= ~LA_CTIME;
/* Apply @attr to @obj inside transaction @handle; when the mode changes
 * and @needacl is set, also rewrite the posix ACL to match. */
959 int mdd_attr_set_internal(const struct lu_env *env,
960 struct mdd_object *obj,
961 struct lu_attr *attr,
962 struct thandle *handle,
968 rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
969 #ifdef CONFIG_FS_POSIX_ACL
970 if (!rc && (attr->la_valid & LA_MODE) && needacl)
971 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
/* mdd_attr_check() (ctime monotonicity) followed by the actual set;
 * skips the set entirely when the check cleared la_valid (elided test). */
976 int mdd_attr_check_set_internal(const struct lu_env *env,
977 struct mdd_object *obj,
978 struct lu_attr *attr,
979 struct thandle *handle,
985 rc = mdd_attr_check(env, obj, attr);
990 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
/* Write-locked wrapper for mdd_attr_set_internal(); ACL rewrite is
 * only relevant when the mode actually changes. */
994 static int mdd_attr_set_internal_locked(const struct lu_env *env,
995 struct mdd_object *obj,
996 struct lu_attr *attr,
997 struct thandle *handle,
1003 needacl = needacl && (attr->la_valid & LA_MODE);
1005 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1006 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1008 mdd_write_unlock(env, obj);
/* Write-locked wrapper for mdd_attr_check_set_internal(); same ACL
 * gating as mdd_attr_set_internal_locked(). */
1012 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1013 struct mdd_object *obj,
1014 struct lu_attr *attr,
1015 struct thandle *handle,
1021 needacl = needacl && (attr->la_valid & LA_MODE);
1023 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1024 rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1026 mdd_write_unlock(env, obj);
/* Low-level xattr update: a non-empty buffer sets the EA, a NULL buffer
 * with zero length deletes it. Other (lb_buf, lb_len) combinations fall
 * through to the elided error path. Note @fl is not forwarded to
 * mdo_xattr_set() here (0 is passed) — behavior as visible in listing. */
1030 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1031 const struct lu_buf *buf, const char *name,
1032 int fl, struct thandle *handle)
1034 struct lustre_capa *capa = mdd_object_capa(env, obj);
1038 if (buf->lb_buf && buf->lb_len > 0)
1039 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1040 else if (buf->lb_buf == NULL && buf->lb_len == 0)
1041 rc = mdo_xattr_del(env, obj, name, handle, capa);
1047 * This gives the same functionality as the code between
1048 * sys_chmod and inode_setattr
1049 * chown_common and inode_setattr
1050 * utimes and inode_setattr
1051 * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
/* Normalize and permission-check an incoming setattr request against
 * the object's current attributes, mirroring the kernel's
 * chmod/chown/utimes + inode_setattr semantics: reject type/link/rdev
 * changes, enforce ownership and capability rules for flag/mode/uid/gid
 * updates, strip setuid/setgid where POSIX requires, and drop time
 * updates that would move ctime/atime backwards. */
1053 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1054 struct lu_attr *la, const struct md_attr *ma)
1056 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1057 struct md_ucred *uc;
1064 /* Do not permit change file type */
1065 if (la->la_valid & LA_TYPE)
1068 /* They should not be processed by setattr */
1069 if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1072 /* export destroy does not have ->le_ses, but we may want
1073 * to drop LUSTRE_SOM_FL. */
1079 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Pure ctime update (rename source): permission-checked via
 * mdd_may_delete() unless the caller bypasses permissions. */
1083 if (la->la_valid == LA_CTIME) {
1084 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1085 /* This is only for set ctime when rename's source is
1087 rc = mdd_may_delete(env, NULL, obj,
1088 (struct md_attr *)ma, 1, 0);
1089 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1090 la->la_valid &= ~LA_CTIME;
1094 if (la->la_valid == LA_ATIME) {
1095 /* This is atime only set for read atime update on close. */
1096 if (la->la_atime >= tmp_la->la_atime &&
1097 la->la_atime < (tmp_la->la_atime +
1098 mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1099 la->la_valid &= ~LA_ATIME;
1103 /* Check if flags change. */
1104 if (la->la_valid & LA_FLAGS) {
1105 unsigned int oldflags = 0;
1106 unsigned int newflags = la->la_flags &
1107 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
/* Flag changes require ownership or CAP_FOWNER ... */
1109 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1110 !mdd_capable(uc, CFS_CAP_FOWNER))
1113 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1114 * only be changed by the relevant capability. */
1115 if (mdd_is_immutable(obj))
1116 oldflags |= LUSTRE_IMMUTABLE_FL;
1117 if (mdd_is_append(obj))
1118 oldflags |= LUSTRE_APPEND_FL;
1119 if ((oldflags ^ newflags) &&
1120 !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
/* DIRSYNC is meaningless on non-directories; silently drop it. */
1123 if (!S_ISDIR(tmp_la->la_mode))
1124 la->la_flags &= ~LUSTRE_DIRSYNC_FL;
/* Immutable/append-only objects accept flag changes only. */
1127 if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1128 (la->la_valid & ~LA_FLAGS) &&
1129 !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1132 /* Check for setting the obj time. */
1133 if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1134 !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1135 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1136 !mdd_capable(uc, CFS_CAP_FOWNER)) {
1137 rc = mdd_permission_internal_locked(env, obj, tmp_la,
/* KILL_SUID/KILL_SGID: translate the kill request into an explicit
 * mode update clearing the respective bit. */
1145 if (la->la_valid & LA_KILL_SUID) {
1146 la->la_valid &= ~LA_KILL_SUID;
1147 if ((tmp_la->la_mode & S_ISUID) &&
1148 !(la->la_valid & LA_MODE)) {
1149 la->la_mode = tmp_la->la_mode;
1150 la->la_valid |= LA_MODE;
1152 la->la_mode &= ~S_ISUID;
1155 if (la->la_valid & LA_KILL_SGID) {
1156 la->la_valid &= ~LA_KILL_SGID;
1157 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1158 (S_ISGID | S_IXGRP)) &&
1159 !(la->la_valid & LA_MODE)) {
1160 la->la_mode = tmp_la->la_mode;
1161 la->la_valid |= LA_MODE;
1163 la->la_mode &= ~S_ISGID;
1166 /* Make sure a caller can chmod. */
1167 if (la->la_valid & LA_MODE) {
1168 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1169 (uc->mu_fsuid != tmp_la->la_uid) &&
1170 !mdd_capable(uc, CFS_CAP_FOWNER))
1173 if (la->la_mode == (cfs_umode_t) -1)
1174 la->la_mode = tmp_la->la_mode;
/* Preserve the file-type bits; only S_IALLUGO may change. */
1176 la->la_mode = (la->la_mode & S_IALLUGO) |
1177 (tmp_la->la_mode & ~S_IALLUGO);
1179 /* Also check the setgid bit! */
1180 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1181 la->la_gid : tmp_la->la_gid) &&
1182 !mdd_capable(uc, CFS_CAP_FSETID))
1183 la->la_mode &= ~S_ISGID;
1185 la->la_mode = tmp_la->la_mode;
1188 /* Make sure a caller can chown. */
1189 if (la->la_valid & LA_UID) {
1190 if (la->la_uid == (uid_t) -1)
1191 la->la_uid = tmp_la->la_uid;
1192 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1193 (la->la_uid != tmp_la->la_uid)) &&
1194 !mdd_capable(uc, CFS_CAP_CHOWN))
1197 /* If the user or group of a non-directory has been
1198 * changed by a non-root user, remove the setuid bit.
1199 * 19981026 David C Niemi <niemi@tux.org>
1201 * Changed this to apply to all users, including root,
1202 * to avoid some races. This is the behavior we had in
1203 * 2.0. The check for non-root was definitely wrong
1204 * for 2.2 anyway, as it should have been using
1205 * CAP_FSETID rather than fsuid -- 19990830 SD. */
1206 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1207 !S_ISDIR(tmp_la->la_mode)) {
1208 la->la_mode &= ~S_ISUID;
1209 la->la_valid |= LA_MODE;
1213 /* Make sure caller can chgrp. */
1214 if (la->la_valid & LA_GID) {
1215 if (la->la_gid == (gid_t) -1)
1216 la->la_gid = tmp_la->la_gid;
1217 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1218 ((la->la_gid != tmp_la->la_gid) &&
1219 !lustre_in_group_p(uc, la->la_gid))) &&
1220 !mdd_capable(uc, CFS_CAP_CHOWN))
1223 /* Likewise, if the user or group of a non-directory
1224 * has been changed by a non-root user, remove the
1225 * setgid bit UNLESS there is no group execute bit
1226 * (this would be a file marked for mandatory
1227 * locking). 19981026 David C Niemi <niemi@tux.org>
1229 * Removed the fsuid check (see the comment above) --
1231 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1232 (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1233 la->la_mode &= ~S_ISGID;
1234 la->la_valid |= LA_MODE;
1238 /* For both Size-on-MDS case and truncate case,
1239 * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1240 * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1241 * For SOM case, it is true, the MAY_WRITE perm has been checked
1242 * when open, no need check again. For truncate case, it is false,
1243 * the MAY_WRITE perm should be checked here. */
1244 if (ma->ma_attr_flags & MDS_SOM) {
1245 /* For the "Size-on-MDS" setattr update, merge coming
1246 * attributes with the set in the inode. BUG 10641 */
1247 if ((la->la_valid & LA_ATIME) &&
1248 (la->la_atime <= tmp_la->la_atime))
1249 la->la_valid &= ~LA_ATIME;
1251 /* OST attributes do not have a priority over MDS attributes,
1252 * so drop times if ctime is equal. */
1253 if ((la->la_valid & LA_CTIME) &&
1254 (la->la_ctime <= tmp_la->la_ctime))
1255 la->la_valid &= ~(LA_MTIME | LA_CTIME);
1257 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1258 if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1259 (uc->mu_fsuid == tmp_la->la_uid)) &&
1260 !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1261 rc = mdd_permission_internal_locked(env, obj,
1268 if (la->la_valid & LA_CTIME) {
1269 /* The pure setattr, it has the priority over what is
1270 * already set, do not drop it if ctime is equal. */
1271 if (la->la_ctime < tmp_la->la_ctime)
1272 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1280 /** Store a data change changelog record
1281 * If this fails, we must fail the whole transaction; we don't
1282 * want the change to commit without the log entry.
1283 * \param mdd_obj - mdd_object of change
1284 * \param handle - transacion handle
1286 static int mdd_changelog_data_store(const struct lu_env *env,
1287 struct mdd_device *mdd,
1288 enum changelog_rec_type type,
1290 struct mdd_object *mdd_obj,
1291 struct thandle *handle)
1293 const struct lu_fid *tfid = mdo2fid(mdd_obj);
1294 struct llog_changelog_rec *rec;
1295 struct thandle *th = NULL;
/* Early exits: changelog disabled, or this record type is masked. */
1301 if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1303 if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1306 LASSERT(mdd_obj != NULL);
1307 LASSERT(handle != NULL);
/* Time-only events (MTIME..ATIME) since the changelog start are
 * coalesced: skip if this object was already logged recently. */
1309 if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1310 cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1311 /* Don't need multiple updates in this log */
1312 /* Don't check under lock - no big deal if we get an extra
1317 reclen = llog_data_len(sizeof(*rec));
1318 buf = mdd_buf_alloc(env, reclen);
1319 if (buf->lb_buf == NULL)
1321 rec = (struct llog_changelog_rec *)buf->lb_buf;
1323 rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1324 rec->cr.cr_type = (__u32)type;
1325 rec->cr.cr_tfid = *tfid;
1326 rec->cr.cr_namelen = 0;
1327 mdd_obj->mod_cltime = cfs_time_current_64();
1329 rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1332 mdd_trans_stop(env, mdd, rc, th);
1335 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1336 rc, type, PFID(tfid));
/* Record a standalone changelog entry for \a obj of the given \a type,
 * inside its own transaction (declare, start, store, stop). */
1343 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1344 int flags, struct md_object *obj)
1346 struct thandle *handle;
1347 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1348 struct mdd_device *mdd = mdo2mdd(obj);
1352 handle = mdd_trans_create(env, mdd);
1354 return(PTR_ERR(handle));
1356 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1360 rc = mdd_trans_start(env, mdd, handle);
1364 rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1368 mdd_trans_stop(env, mdd, rc, handle);
1374 * Should be called with write lock held.
1376 * \see mdd_lma_set_locked().
/* Build and write the LMA (Lustre Metadata Attributes) xattr for
 * \a mdd_obj from the HSM/SOM data in \a ma, merging with the on-disk
 * LMA when only part of the data is supplied. */
1378 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1379 const struct md_attr *ma, struct thandle *handle)
1381 struct mdd_thread_info *info = mdd_env_info(env);
1383 struct lustre_mdt_attrs *lma =
1384 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1385 int lmasize = sizeof(struct lustre_mdt_attrs);
1390 /* Either HSM or SOM part is not valid, we need to read it before */
/* NOTE(review): "(!ma->ma_valid) & (MA_HSM | MA_SOM)" applies logical
 * NOT before the bitwise AND, so this is non-zero only when
 * ma_valid == 0 AND bit 0 is set in (MA_HSM | MA_SOM). The comment
 * above suggests the intent was "!(ma->ma_valid & (MA_HSM | MA_SOM))"
 * or a check that not both bits are set — confirm against upstream
 * before relying on this path. */
1391 if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1392 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1396 lustre_lma_swab(lma);
1398 memset(lma, 0, lmasize);
/* Merge HSM flags into the LMA when supplied by the caller. */
1402 if (ma->ma_valid & MA_HSM) {
1403 lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1404 lma->lma_compat |= LMAC_HSM;
/* Merge Size-on-MDS data; an invalid ioepoch clears the SOM bit. */
1408 if (ma->ma_valid & MA_SOM) {
1409 LASSERT(ma->ma_som != NULL);
1410 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1411 lma->lma_compat &= ~LMAC_SOM;
1413 lma->lma_compat |= LMAC_SOM;
1414 lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
1415 lma->lma_som_size = ma->ma_som->msd_size;
1416 lma->lma_som_blocks = ma->ma_som->msd_blocks;
1417 lma->lma_som_mountid = ma->ma_som->msd_mountid;
/* Copy FID then swab everything to the on-disk (little-endian) format. */
1422 memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1424 lustre_lma_swab(lma);
1425 buf = mdd_buf_get(env, lma, lmasize);
1426 rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1432 * Save LMA extended attributes with data from \a ma.
1434 * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1435 * not, LMA EA will be first read from disk, modified and write back.
/* Locking wrapper: take the object write lock around __mdd_lma_set(). */
1438 static int mdd_lma_set_locked(const struct lu_env *env,
1439 struct mdd_object *mdd_obj,
1440 const struct md_attr *ma, struct thandle *handle)
1444 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1445 rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1446 mdd_write_unlock(env, mdd_obj);
1450 /* Precedence for choosing record type when multiple
1451 * attributes change: setattr > mtime > ctime > atime
1452 * (ctime changes when mtime does, plus chmod/chown.
1453 * atime and ctime are independent.) */
1454 static int mdd_attr_set_changelog(const struct lu_env *env,
1455 struct md_object *obj, struct thandle *handle,
1458 struct mdd_device *mdd = mdo2mdd(obj);
/* Build a candidate-type bitmask from the changed attributes, then
 * mask off any types the changelog is not recording. */
1461 bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1462 bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1463 bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1464 bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1465 bits = bits & mdd->mdd_cl.mc_mask;
1469 /* The record type is the lowest non-masked set bit */
1470 while (bits && ((bits & 1) == 0)) {
1475 /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1476 return mdd_changelog_data_store(env, mdd, type, (int)valid,
1477 md2mdd_obj(obj), handle);
/* Declare (reserve credits for) everything mdd_attr_set() may do in one
 * transaction: the attr update, a changelog record, optional LOV/LMA
 * xattr updates, and - for chown/chgrp on striped files - per-stripe
 * unlink-style llog records. \a lmm may be NULL when no LOV EA exists. */
1480 static int mdd_declare_attr_set(const struct lu_env *env,
1481 struct mdd_device *mdd,
1482 struct mdd_object *obj,
1483 const struct md_attr *ma,
1484 struct lov_mds_md *lmm,
1485 struct thandle *handle)
1487 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1490 rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1494 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1498 if (ma->ma_valid & MA_LOV) {
1500 buf->lb_len = ma->ma_lmm_size;
1501 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1507 if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1509 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1510 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1516 /* basically the log is the same as in unlink case */
/* Reject LOV EAs with an unknown magic before declaring llog records. */
1518 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1519 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1520 CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1521 mdd->mdd_obd_dev->obd_name,
1522 le32_to_cpu(lmm->lmm_magic),
1523 PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
/* Default to the full OST count; use the EA's stripe count if sane. */
1527 stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
1528 if ((int)le32_to_cpu(lmm->lmm_stripe_count) >= 0)
1529 stripe = le32_to_cpu(lmm->lmm_stripe_count);
1531 for (i = 0; i < stripe; i++) {
1532 rc = mdd_declare_llog_record(env, mdd,
1533 sizeof(struct llog_unlink_rec),
1543 /* set attr and LOV EA at once, return updated attr */
/* Main .moo_attr_set entry point: fixes up the requested attributes,
 * handles quota transfer on chown/chgrp, updates inode attrs, LOV and
 * HSM/SOM EAs in one transaction, and records a changelog entry. */
1544 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1545 const struct md_attr *ma)
1547 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1548 struct mdd_device *mdd = mdo2mdd(obj);
1549 struct thandle *handle;
1550 struct lov_mds_md *lmm = NULL;
1551 struct llog_cookie *logcookies = NULL;
1552 int rc, lmm_size = 0, cookie_size = 0;
1553 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1554 struct obd_device *obd = mdd->mdd_obd_dev;
1555 struct mds_obd *mds = &obd->u.mds;
1556 #ifdef HAVE_QUOTA_SUPPORT
1557 unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1558 unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1559 int quota_opc = 0, block_count = 0;
1560 int inode_pending[MAXQUOTAS] = { 0, 0 };
1561 int block_pending[MAXQUOTAS] = { 0, 0 };
/* Work on a copy so permission/time fix-ups don't mutate the caller's ma. */
1565 *la_copy = ma->ma_attr;
1566 rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1570 /* setattr on "close" only change atime, or do nothing */
1571 if (ma->ma_valid == MA_INODE &&
1572 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
/* chown/chgrp on a regular file: fetch the LOV EA so ownership of the
 * OST objects can be changed (via llog) as well. */
1575 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1576 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1577 lmm_size = mdd_lov_mdsize(env, mdd);
1578 lmm = mdd_max_lmm_get(env, mdd);
1582 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1589 handle = mdd_trans_create(env, mdd);
1591 RETURN(PTR_ERR(handle));
1593 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1594 lmm_size > 0 ? lmm : NULL, handle);
1598 /* permission changes may require sync operation */
1599 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1600 handle->th_sync = !!mdd->mdd_sync_permission;
1602 rc = mdd_trans_start(env, mdd, handle);
1606 /* permission changes may require sync operation */
/* NOTE(review): th_sync is set from mdd_sync_permission both before and
 * after mdd_trans_start (lines 1599-1600 and 1607-1608) — looks like a
 * merge artifact; confirm which one upstream kept. */
1607 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1608 handle->th_sync |= mdd->mdd_sync_permission;
1610 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1611 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1612 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1614 #ifdef HAVE_QUOTA_SUPPORT
/* Ownership change: pre-acquire inode and block quota for the new
 * owner; the old owner's quota is released after the transaction. */
1615 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1616 struct obd_export *exp = md_quota(env)->mq_exp;
1617 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1619 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1621 quota_opc = FSFILT_OP_SETATTR;
1622 mdd_quota_wrapper(la_copy, qnids);
1623 mdd_quota_wrapper(la_tmp, qoids);
1624 /* get file quota for new owner */
1625 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1626 qnids, inode_pending, 1, NULL, 0,
1628 block_count = (la_tmp->la_blocks + 7) >> 3;
1631 mdd_data_get(env, mdd_obj, &data);
1632 /* get block quota for new owner */
1633 lquota_chkquota(mds_quota_interface_ref, obd,
1634 exp, qnids, block_pending,
1636 LQUOTA_FLAGS_BLK, data, 1);
1642 if (la_copy->la_valid & LA_FLAGS) {
1643 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1646 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1647 } else if (la_copy->la_valid) { /* setattr */
1648 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1650 /* journal chown/chgrp in llog, just like unlink */
1651 if (rc == 0 && lmm_size){
1652 cookie_size = mdd_lov_cookiesize(env, mdd);
1653 logcookies = mdd_max_cookie_get(env, mdd);
1654 if (logcookies == NULL)
1655 GOTO(cleanup, rc = -ENOMEM);
1657 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1658 logcookies, cookie_size) <= 0)
/* Caller supplied a new LOV EA: validate and store it. */
1663 if (rc == 0 && ma->ma_valid & MA_LOV) {
1666 mode = mdd_object_type(mdd_obj);
1667 if (S_ISREG(mode) || S_ISDIR(mode)) {
1668 rc = mdd_lsm_sanity_check(env, mdd_obj);
1672 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1673 ma->ma_lmm_size, handle, 1);
1677 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1680 mode = mdd_object_type(mdd_obj);
1682 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1687 rc = mdd_attr_set_changelog(env, obj, handle,
1688 ma->ma_attr.la_valid);
1690 mdd_trans_stop(env, mdd, rc, handle);
/* Propagate UID/GID change to the OST objects asynchronously. */
1691 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1692 /*set obd attr, if needed*/
1693 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1696 #ifdef HAVE_QUOTA_SUPPORT
1698 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1700 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1702 /* Trigger dqrel/dqacq for original owner and new owner.
1703 * If failed, the next call for lquota_chkquota will
1705 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
/* Set an xattr under the object write lock, within an already-started
 * transaction \a handle. */
1712 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1713 const struct lu_buf *buf, const char *name, int fl,
1714 struct thandle *handle)
1719 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1720 rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1721 mdd_write_unlock(env, obj);
/* Permission check for xattr set/del: reject immutable/append-only
 * objects, and require ownership or CAP_FOWNER. */
1726 static int mdd_xattr_sanity_check(const struct lu_env *env,
1727 struct mdd_object *obj)
1729 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1730 struct md_ucred *uc = md_ucred(env);
1734 if (mdd_is_immutable(obj) || mdd_is_append(obj))
1737 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
/* Non-owner without CAP_FOWNER may not modify xattrs. */
1741 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1742 !mdd_capable(uc, CFS_CAP_FOWNER))
/* Declare an xattr-set operation; also reserve changelog credits when
 * the xattr is in the "user." namespace (the only recorded kind here). */
1748 static int mdd_declare_xattr_set(const struct lu_env *env,
1749 struct mdd_device *mdd,
1750 struct mdd_object *obj,
1751 const struct lu_buf *buf,
1753 struct thandle *handle)
1758 rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1762 /* Only record user xattr changes */
1763 if ((strncmp("user.", name, 5) == 0))
1764 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1770 * The caller should guarantee to update the object ctime
1771 * after xattr_set if needed.
/* .moo_xattr_set entry point: sanity-check, run the xattr update inside
 * its own transaction, and changelog user/ACL xattr changes. */
1773 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1774 const struct lu_buf *buf, const char *name,
1777 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1778 struct mdd_device *mdd = mdo2mdd(obj);
1779 struct thandle *handle;
1783 rc = mdd_xattr_sanity_check(env, mdd_obj);
1787 handle = mdd_trans_create(env, mdd);
1789 RETURN(PTR_ERR(handle));
1791 /* security-related changes may require sync */
1792 if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1793 mdd->mdd_sync_permission == 1)
1794 handle->th_sync = 1;
1796 rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1800 rc = mdd_trans_start(env, mdd, handle);
1804 /* security-related changes may require sync */
1805 if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1806 handle->th_sync |= mdd->mdd_sync_permission;
1808 rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1810 /* Only record system & user xattr changes */
1811 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1812 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1813 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1814 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1815 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1816 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1817 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1821 mdd_trans_stop(env, mdd, rc, handle);
/* Declare an xattr-delete operation; reserve changelog credits for
 * "user." xattrs, mirroring mdd_declare_xattr_set(). */
1826 static int mdd_declare_xattr_del(const struct lu_env *env,
1827 struct mdd_device *mdd,
1828 struct mdd_object *obj,
1830 struct thandle *handle)
1834 rc = mdo_declare_xattr_del(env, obj, name, handle);
1838 /* Only record user xattr changes */
1839 if ((strncmp("user.", name, 5) == 0))
1840 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1846 * The caller should guarantee to update the object ctime
1847 * after xattr_set if needed.
/* .moo_xattr_del entry point: remove an xattr under the object write
 * lock in its own transaction; changelog user/ACL xattr removals. */
1849 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1852 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1853 struct mdd_device *mdd = mdo2mdd(obj);
1854 struct thandle *handle;
1858 rc = mdd_xattr_sanity_check(env, mdd_obj);
1862 handle = mdd_trans_create(env, mdd);
1864 RETURN(PTR_ERR(handle));
1866 rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1870 rc = mdd_trans_start(env, mdd, handle);
1874 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1875 rc = mdo_xattr_del(env, mdd_obj, name, handle,
1876 mdd_object_capa(env, mdd_obj));
1877 mdd_write_unlock(env, mdd_obj);
1879 /* Only record system & user xattr changes */
1880 if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1881 sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1882 strncmp(POSIX_ACL_XATTR_ACCESS, name,
1883 sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1884 strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1885 sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1886 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1890 mdd_trans_stop(env, mdd, rc, handle);
1895 /* partial unlink */
/* .moo_ref_del: drop a link count on the object without touching any
 * parent directory entry (the "partial" half of a cross-MDS unlink).
 * Per the XXX below this path is effectively dead code. */
1896 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1899 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1900 struct mdd_object *mdd_obj = md2mdd_obj(obj);
1901 struct mdd_device *mdd = mdo2mdd(obj);
1902 struct thandle *handle;
1903 #ifdef HAVE_QUOTA_SUPPORT
1904 struct obd_device *obd = mdd->mdd_obd_dev;
1905 struct mds_obd *mds = &obd->u.mds;
1906 unsigned int qids[MAXQUOTAS] = { 0, 0 };
1912 /* XXX: this code won't be used ever:
1913 * DNE uses slightly different approach */
1917 * Check -ENOENT early here because we need to get object type
1918 * to calculate credits before transaction start
1920 if (!mdd_object_exists(mdd_obj))
1923 LASSERT(mdd_object_exists(mdd_obj) > 0);
1925 handle = mdd_trans_create(env, mdd);
1929 rc = mdd_trans_start(env, mdd, handle);
1931 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1933 rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1937 __mdd_ref_del(env, mdd_obj, handle, 0);
/* Directories carry an extra nlink for "."; drop it too. */
1939 if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1941 __mdd_ref_del(env, mdd_obj, handle, 1);
1944 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1945 la_copy->la_ctime = ma->ma_attr.la_ctime;
1947 la_copy->la_valid = LA_CTIME;
1948 rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1952 rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1953 #ifdef HAVE_QUOTA_SUPPORT
/* Last link gone and not open: release the owner's inode quota. */
1954 if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1955 ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1956 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1957 mdd_quota_wrapper(&ma->ma_attr, qids);
1964 mdd_write_unlock(env, mdd_obj);
1965 mdd_trans_stop(env, mdd, rc, handle);
1966 #ifdef HAVE_QUOTA_SUPPORT
1968 /* Trigger dqrel on the owner of child. If failed,
1969 * the next call for lquota_chkquota will process it */
1970 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1976 /* partial operation */
/* Validate the file type requested for a partial object create;
 * dispatches on the S_IFMT bits of the requested mode. */
1977 static int mdd_oc_sanity_check(const struct lu_env *env,
1978 struct mdd_object *obj,
1984 switch (ma->ma_attr.la_mode & S_IFMT) {
/* .moo_object_create: create the object body without inserting a name
 * (the "partial" half of a cross-MDS create); handles quota, optional
 * LMV slave EA, remote ACL init, and returns the resulting attrs.
 * Per the XXX below this path is effectively dead code. */
2001 static int mdd_object_create(const struct lu_env *env,
2002 struct md_object *obj,
2003 const struct md_op_spec *spec,
2007 struct mdd_device *mdd = mdo2mdd(obj);
2008 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2009 const struct lu_fid *pfid = spec->u.sp_pfid;
2010 struct thandle *handle;
2011 #ifdef HAVE_QUOTA_SUPPORT
2012 struct obd_device *obd = mdd->mdd_obd_dev;
2013 struct obd_export *exp = md_quota(env)->mq_exp;
2014 struct mds_obd *mds = &obd->u.mds;
2015 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2016 int quota_opc = 0, block_count = 0;
2017 int inode_pending[MAXQUOTAS] = { 0, 0 };
2018 int block_pending[MAXQUOTAS] = { 0, 0 };
2023 /* XXX: this code won't be used ever:
2024 * DNE uses slightly different approach */
2027 #ifdef HAVE_QUOTA_SUPPORT
/* Pre-acquire inode (and, per-type, block) quota for the new child. */
2028 if (mds->mds_quota) {
2029 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2030 mdd_quota_wrapper(&ma->ma_attr, qids);
2031 /* get file quota for child */
2032 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2033 qids, inode_pending, 1, NULL, 0,
2035 switch (ma->ma_attr.la_mode & S_IFMT) {
2044 /* get block quota for child */
2046 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2047 qids, block_pending, block_count,
2048 NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2052 handle = mdd_trans_create(env, mdd);
2054 GOTO(out_pending, rc = PTR_ERR(handle));
2056 rc = mdd_trans_start(env, mdd, handle);
2058 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2059 rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2063 rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2067 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2068 /* If creating the slave object, set slave EA here. */
2069 int lmv_size = spec->u.sp_ea.eadatalen;
2070 struct lmv_stripe_md *lmv;
2072 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2073 LASSERT(lmv != NULL && lmv_size > 0);
2075 rc = __mdd_xattr_set(env, mdd_obj,
2076 mdd_buf_get_const(env, lmv, lmv_size),
2077 XATTR_NAME_LMV, 0, handle);
2081 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2084 #ifdef CONFIG_FS_POSIX_ACL
/* Remote create carries the parent's default ACL; apply it and let it
 * adjust the new object's mode. */
2085 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2086 struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2088 buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2089 buf->lb_len = spec->u.sp_ea.eadatalen;
2090 if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2091 rc = __mdd_acl_init(env, mdd_obj, buf,
2092 &ma->ma_attr.la_mode,
2097 ma->ma_attr.la_valid |= LA_MODE;
2100 pfid = spec->u.sp_ea.fid;
2103 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2109 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2110 mdd_write_unlock(env, mdd_obj);
2112 mdd_trans_stop(env, mdd, rc, handle);
2114 #ifdef HAVE_QUOTA_SUPPORT
2116 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2118 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2120 /* Trigger dqacq on the owner of child. If failed,
2121 * the next call for lquota_chkquota will process it. */
2122 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/* .moo_ref_add: bump the object's link count without inserting a name
 * (partial link for cross-MDS operation); also updates ctime.
 * Per the XXX below this path is effectively dead code. */
2130 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2131 const struct md_attr *ma)
2133 struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2134 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2135 struct mdd_device *mdd = mdo2mdd(obj);
2136 struct thandle *handle;
2140 /* XXX: this code won't be used ever:
2141 * DNE uses slightly different approach */
2144 handle = mdd_trans_create(env, mdd);
2148 rc = mdd_trans_start(env, mdd, handle);
2150 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2151 rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2153 __mdd_ref_add(env, mdd_obj, handle);
2154 mdd_write_unlock(env, mdd_obj);
2156 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2157 la_copy->la_ctime = ma->ma_attr.la_ctime;
2159 la_copy->la_valid = LA_CTIME;
2160 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
/* NOTE(review): the transaction is stopped with result 0 rather than
 * rc, so a ctime-update failure is not reflected in the commit — verify
 * this is intentional against upstream. */
2163 mdd_trans_stop(env, mdd, 0, handle);
2169 * do NOT or the MAY_*'s, you'll get the weakest
/* Translate MDS open flags into MAY_* access-mode bits for the
 * permission check; owner-override suppresses the check entirely. */
2171 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2175 /* Sadly, NFSD reopens a file repeatedly during operation, so the
2176 * "acc_mode = 0" allowance for newly-created files isn't honoured.
2177 * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2178 * owner can write to a file even if it is marked readonly to hide
2179 * its brokenness. (bug 5781) */
2180 if (flags & MDS_OPEN_OWNEROVERRIDE) {
2181 struct md_ucred *uc = md_ucred(env);
2183 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2184 (la->la_uid == uc->mu_fsuid))
2188 if (flags & FMODE_READ)
2190 if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2192 if (flags & MDS_FMODE_EXEC)
/* Validate an open request: reject dead objects, symlinks, writable
 * directory opens; enforce permission, append-only and O_NOATIME rules. */
2197 static int mdd_open_sanity_check(const struct lu_env *env,
2198 struct mdd_object *obj, int flag)
2200 struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2205 if (mdd_is_dead_obj(obj))
2208 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2212 if (S_ISLNK(tmp_la->la_mode))
2215 mode = accmode(env, tmp_la, flag);
2217 if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
/* Skip the permission check if the file was just created by this open. */
2220 if (!(flag & MDS_OPEN_CREATED)) {
2221 rc = mdd_permission_internal(env, obj, tmp_la, mode);
/* Truncate is meaningless on FIFOs, sockets and device nodes. */
2226 if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2227 S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2228 flag &= ~MDS_OPEN_TRUNC;
2230 /* For writing append-only file must open it with append mode. */
2231 if (mdd_is_append(obj)) {
2232 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2234 if (flag & MDS_OPEN_TRUNC)
2240 * Now, flag -- O_NOATIME does not be packed by client.
2242 if (flag & O_NOATIME) {
2243 struct md_ucred *uc = md_ucred(env);
2245 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2246 (uc->mu_valid == UCRED_NEW)) &&
2247 (uc->mu_fsuid != tmp_la->la_uid) &&
2248 !mdd_capable(uc, CFS_CAP_FOWNER))
/* .moo_open: sanity-check the open and bump the open count under the
 * object write lock. */
2256 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2259 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2262 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2264 rc = mdd_open_sanity_check(env, mdd_obj, flags);
2266 mdd_obj->mod_count++;
2268 mdd_write_unlock(env, mdd_obj);
/* Declare the two steps of object destruction: the unlink llog record
 * and the destroy itself. */
2272 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2273 struct md_attr *ma, struct thandle *handle)
2277 rc = mdd_declare_unlink_log(env, obj, ma, handle);
2281 return mdo_declare_destroy(env, obj, handle);
2284 /* return md_attr back,
2285 * if it is last unlink then return lov ea + llog cookie*/
/* Destroy the object body; for regular files first fetch the LOV EA and
 * write the unlink llog record so OST objects can be cleaned up. */
2286 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2287 struct md_attr *ma, struct thandle *handle)
2292 if (S_ISREG(mdd_object_type(obj))) {
2293 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2294 * Caller must be ready for that. */
2296 rc = __mdd_lmm_get(env, obj, ma);
2297 if ((ma->ma_valid & MA_LOV))
2298 rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2303 rc = mdo_destroy(env, obj, handle);
/* Declare the last-close path: orphan-index removal plus full object
 * destruction. */
2308 static int mdd_declare_close(const struct lu_env *env,
2309 struct mdd_object *obj,
2311 struct thandle *handle)
2315 rc = orph_declare_index_delete(env, obj, handle);
2319 return mdd_declare_object_kill(env, obj, ma, handle);
2323 * No permission check is needed.
/* .moo_close: drop the open count; on last close of an orphaned/dead
 * object, remove it from the orphan index, destroy its OSS objects,
 * release quota, and record a CL_CLOSE changelog entry for writes. */
2325 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2326 struct md_attr *ma, int mode)
2328 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2329 struct mdd_device *mdd = mdo2mdd(obj);
2330 struct thandle *handle = NULL;
2334 #ifdef HAVE_QUOTA_SUPPORT
2335 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2336 struct mds_obd *mds = &obd->u.mds;
2337 unsigned int qids[MAXQUOTAS] = { 0, 0 };
/* MDS_KEEP_ORPHAN: just drop the count, keeping the orphan entry. */
2342 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2343 mdd_obj->mod_count--;
2345 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2346 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2347 "list\n", PFID(mdd_object_fid(mdd_obj)));
2351 /* check without any lock */
/* Probable last close of an orphan/dead object: prepare a transaction
 * up front (declared before taking the write lock). */
2352 if (mdd_obj->mod_count == 1 &&
2353 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2355 handle = mdd_trans_create(env, mdo2mdd(obj));
2357 RETURN(PTR_ERR(handle));
2359 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2363 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2367 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2372 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
/* Raced with another close/unlink: retry the lockless check above. */
2373 if (handle == NULL && mdd_obj->mod_count == 1 &&
2374 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2375 mdd_write_unlock(env, mdd_obj);
2379 /* release open count */
2380 mdd_obj->mod_count --;
2382 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2383 /* remove link to object from orphan index */
2384 LASSERT(handle != NULL);
2385 rc = __mdd_orphan_del(env, mdd_obj, handle);
2387 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2388 "list, OSS objects to be destroyed.\n",
2389 PFID(mdd_object_fid(mdd_obj)));
2391 CERROR("Object "DFID" can not be deleted from orphan "
2392 "list, maybe cause OST objects can not be "
2393 "destroyed (err: %d).\n",
2394 PFID(mdd_object_fid(mdd_obj)), rc);
2395 /* If object was not deleted from orphan list, do not
2396 * destroy OSS objects, which will be done when next
2402 rc = mdd_iattr_get(env, mdd_obj, ma);
2403 /* Object maybe not in orphan list originally, it is rare case for
2404 * mdd_finish_unlink() failure. */
2405 if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2406 #ifdef HAVE_QUOTA_SUPPORT
2407 if (mds->mds_quota) {
2408 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2409 mdd_quota_wrapper(&ma->ma_attr, qids);
2412 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2413 if (ma->ma_valid & MA_FLAGS &&
2414 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2415 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2417 if (handle == NULL) {
2418 handle = mdd_trans_create(env, mdo2mdd(obj));
2420 GOTO(out, rc = PTR_ERR(handle));
2422 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2427 rc = mdd_declare_changelog_store(env, mdd,
2432 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2437 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2443 CERROR("Error when prepare to delete Object "DFID" , "
2444 "which will cause OST objects can not be "
2445 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2451 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2453 mdd_write_unlock(env, mdd_obj);
/* Record CL_CLOSE for writes, unless this is a recovery reopen. */
2456 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2457 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2458 if (handle == NULL) {
2459 handle = mdd_trans_create(env, mdo2mdd(obj));
/* NOTE(review): "rc = IS_ERR(handle)" stores 1, not an errno; the
 * sibling paths use PTR_ERR(handle) — likely should be PTR_ERR here
 * too. Confirm against upstream. */
2461 GOTO(stop, rc = IS_ERR(handle));
2463 rc = mdd_declare_changelog_store(env, mdd, NULL,
2468 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2473 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2479 mdd_trans_stop(env, mdd, rc, handle);
2480 #ifdef HAVE_QUOTA_SUPPORT
2482 /* Trigger dqrel on the owner of child. If failed,
2483 * the next call for lquota_chkquota will process it */
2484 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2491 * Permission check is done when open,
2492 * no need check again.
/* readpage is only valid on directories that support index iteration. */
2494 static int mdd_readpage_sanity_check(const struct lu_env *env,
2495 struct mdd_object *obj)
2497 struct dt_object *next = mdd_object_child(obj);
2501 if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
/* Fill one lu_dirpage with directory entries pulled from the dt
 * iterator \a it, packing lu_dirents until \a nob is exhausted.
 * Records the hash range covered and marks the final entry. */
2509 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2510 struct lu_dirpage *dp, int nob,
2511 const struct dt_it_ops *iops, struct dt_it *it,
2517 struct lu_dirent *ent;
2518 struct lu_dirent *last = NULL;
/* The page header occupies the start of the buffer. */
2521 memset(area, 0, sizeof (*dp));
2522 area += sizeof (*dp);
2523 nob -= sizeof (*dp);
2530 len = iops->key_size(env, it);
2532 /* IAM iterator can return record with zero len. */
2536 hash = iops->store(env, it);
2537 if (unlikely(first)) {
2539 dp->ldp_hash_start = cpu_to_le64(hash);
2542 /* calculate max space required for lu_dirent */
2543 recsize = lu_dirent_calc_size(len, attr);
2545 if (nob >= recsize) {
2546 result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2547 if (result == -ESTALE)
2552 /* osd might not able to pack all attributes,
2553 * so recheck rec length */
2554 recsize = le16_to_cpu(ent->lde_reclen);
/* Out of space: done, unless no entry fit at all (-EINVAL). */
2556 result = (last != NULL) ? 0 :-EINVAL;
2560 ent = (void *)ent + recsize;
2564 result = iops->next(env, it);
2565 if (result == -ESTALE)
2567 } while (result == 0);
2570 dp->ldp_hash_end = cpu_to_le64(hash);
/* Entries at the boundary hash may collide across pages. */
2572 if (last->lde_hash == dp->ldp_hash_end)
2573 dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2574 last->lde_reclen = 0; /* end mark */
/* Iterate the directory index starting at rdpg->rp_hash and fill the
 * caller-supplied pages with lu_dirpages; returns bytes produced. */
2579 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2580 const struct lu_rdpg *rdpg)
2583 struct dt_object *next = mdd_object_child(obj);
2584 const struct dt_it_ops *iops;
2586 struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2592 LASSERT(rdpg->rp_pages != NULL);
2593 LASSERT(next->do_index_ops != NULL);
2595 if (rdpg->rp_count <= 0)
2599 * iterate through directory and fill pages from @rdpg
2601 iops = &next->do_index_ops->dio_it;
2602 it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2606 rc = iops->load(env, it, rdpg->rp_hash);
2610 * Iterator didn't find record with exactly the key requested.
2612 * It is currently either
2614 * - positioned above record with key less than
2615 * requested---skip it.
2617 * - or not positioned at all (is in IAM_IT_SKEWED
2618 * state)---position it on the next item.
2620 rc = iops->next(env, it);
2625 * At this point and across for-loop:
2627 * rc == 0 -> ok, proceed.
2628 * rc > 0 -> end of directory.
2631 for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2632 i++, nob -= CFS_PAGE_SIZE) {
2633 struct lu_dirpage *dp;
2635 LASSERT(i < rdpg->rp_npages);
2636 pg = rdpg->rp_pages[i];
2638 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2641 rc = mdd_dir_page_build(env, mdd, dp,
2642 min_t(int, nob, LU_PAGE_SIZE),
2643 iops, it, rdpg->rp_attrs);
/* End of directory reached: stamp the terminal hash on the last page. */
2648 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2650 } else if (rc < 0) {
2651 CWARN("build page failed: %d!\n", rc);
2654 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
/* Advance to the next LU page inside the (larger) CFS page. */
2655 dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2656 if ((unsigned long)dp & ~CFS_PAGE_MASK)
2663 struct lu_dirpage *dp;
/* No entries at all: return a single empty page to the client. */
2665 dp = cfs_kmap(rdpg->rp_pages[0]);
2666 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2669 * No pages were processed, mark this for first page
2672 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2675 cfs_kunmap(rdpg->rp_pages[0]);
2677 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2680 iops->fini(env, it);
/* .moo_readpage: read-lock the directory, validate it, and either fill
 * pages via __mdd_readpage() or return one empty page for dead objects
 * (POSIX: a removed directory yields no entries, not even "." / ".."). */
2685 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2686 const struct lu_rdpg *rdpg)
2688 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2692 LASSERT(mdd_object_exists(mdd_obj));
2694 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2695 rc = mdd_readpage_sanity_check(env, mdd_obj);
2697 GOTO(out_unlock, rc);
2699 if (mdd_is_dead_obj(mdd_obj)) {
2701 struct lu_dirpage *dp;
2704 * According to POSIX, please do not return any entry to client:
2705 * even dot and dotdot should not be returned.
2707 CWARN("readdir from dead object: "DFID"\n",
2708 PFID(mdd_object_fid(mdd_obj)));
2710 if (rdpg->rp_count <= 0)
2711 GOTO(out_unlock, rc = -EFAULT);
2712 LASSERT(rdpg->rp_pages != NULL);
2714 pg = rdpg->rp_pages[0];
2715 dp = (struct lu_dirpage*)cfs_kmap(pg);
2716 memset(dp, 0 , sizeof(struct lu_dirpage));
2717 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2718 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2719 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2721 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2724 rc = __mdd_readpage(env, mdd_obj, rdpg);
2728 mdd_read_unlock(env, mdd_obj);
/* .moo_object_sync: delegate the sync straight to the underlying dt
 * object. */
2732 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2734 struct mdd_object *mdd_obj = md2mdd_obj(obj);
2735 struct dt_object *next;
2737 LASSERT(mdd_object_exists(mdd_obj));
2738 next = mdd_object_child(mdd_obj);
2739 return next->do_ops->do_object_sync(env, next);
2742 const struct md_object_operations mdd_obj_ops = {
2743 .moo_permission = mdd_permission,
2744 .moo_attr_get = mdd_attr_get,
2745 .moo_attr_set = mdd_attr_set,
2746 .moo_xattr_get = mdd_xattr_get,
2747 .moo_xattr_set = mdd_xattr_set,
2748 .moo_xattr_list = mdd_xattr_list,
2749 .moo_xattr_del = mdd_xattr_del,
2750 .moo_object_create = mdd_object_create,
2751 .moo_ref_add = mdd_ref_add,
2752 .moo_ref_del = mdd_ref_del,
2753 .moo_open = mdd_open,
2754 .moo_close = mdd_close,
2755 .moo_readpage = mdd_readpage,
2756 .moo_readlink = mdd_readlink,
2757 .moo_changelog = mdd_changelog,
2758 .moo_capa_get = mdd_capa_get,
2759 .moo_object_sync = mdd_object_sync,
2760 .moo_path = mdd_path,
2761 .moo_file_lock = mdd_file_lock,
2762 .moo_file_unlock = mdd_file_unlock,