 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/mdd/mdd_object.c
 *
 * Lustre Metadata Server (mdd) routines
 *
 * Author: Wang Di <wangdi@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_MDS

#include <linux/module.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lprocfs_status.h>
/* fid_be_to_cpu(), fid_cpu_to_be(). */
#include <lustre_fid.h>
#include <lustre_param.h>
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>

#include "mdd_internal.h"
static const struct lu_object_operations mdd_lu_obj_ops;
extern cfs_mem_cache_t *mdd_object_kmem;

static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name);

int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
                 void **data)
{
        if (mdd_object_exists(obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
                return -ENOENT;
        }
        mdo_data_get(env, obj, data);
        return 0;
}

int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
               struct lu_attr *la, struct lustre_capa *capa)
{
        if (mdd_object_exists(obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
                return -ENOENT;
        }
        return mdo_attr_get(env, obj, la, capa);
}

static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
{
        obj->mod_flags &= ~(APPEND_OBJ | IMMUTE_OBJ);

        if (flags & LUSTRE_APPEND_FL)
                obj->mod_flags |= APPEND_OBJ;

        if (flags & LUSTRE_IMMUTABLE_FL)
                obj->mod_flags |= IMMUTE_OBJ;
}

struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
{
        struct mdd_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
        LASSERT(info != NULL);
        return info;
}

struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = area;
        buf->lb_len = len;
        return buf;
}

void mdd_buf_put(struct lu_buf *buf)
{
        if (buf == NULL || buf->lb_buf == NULL)
                return;
        OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
        buf->lb_buf = NULL;
        buf->lb_len = 0;
}

const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                       const void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = (void *)area;
        buf->lb_len = len;
        return buf;
}

struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;

        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
                OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
                buf->lb_buf = NULL;
        }
        if (buf->lb_buf == NULL) {
                buf->lb_len = len;
                OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
                if (buf->lb_buf == NULL)
                        buf->lb_len = 0;
        }
        return buf;
}
/** Increase the size of the \a mti_big_buf.
 * Preserves the old data in the buffer; the old buffer remains
 * unchanged on error.
 * \retval 0 or -ENOMEM
 */
int mdd_buf_grow(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
        struct lu_buf buf;

        LASSERT(len >= oldbuf->lb_len);
        OBD_ALLOC_LARGE(buf.lb_buf, len);

        if (buf.lb_buf == NULL)
                return -ENOMEM;

        buf.lb_len = len;
        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);

        OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);

        memcpy(oldbuf, &buf, sizeof(buf));

        return 0;
}
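
/*
 * Illustrative usage sketch (not from this file): a caller streaming a
 * variable-sized EA can retry with a bigger buffer, e.g.
 *
 *	if (rc == -ERANGE)
 *		rc = mdd_buf_grow(env, newlen);
 *
 * On success the first oldbuf->lb_len bytes of mti_big_buf still hold the
 * old data; on -ENOMEM the old buffer is left untouched.
 */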
struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
                                       struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int max_cookie_size;

        max_cookie_size = mdd_lov_cookiesize(env, mdd);
        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
                if (mti->mti_max_cookie)
                        OBD_FREE_LARGE(mti->mti_max_cookie,
                                       mti->mti_max_cookie_size);
                mti->mti_max_cookie = NULL;
                mti->mti_max_cookie_size = 0;
        }
        if (unlikely(mti->mti_max_cookie == NULL)) {
                OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
                if (likely(mti->mti_max_cookie != NULL))
                        mti->mti_max_cookie_size = max_cookie_size;
        }
        if (likely(mti->mti_max_cookie != NULL))
                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
        return mti->mti_max_cookie;
}

struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
{
        struct mdd_thread_info *mti = mdd_env_info(env);

        if (unlikely(mti->mti_max_lmm_size < size)) {
                int rsize = size_roundup_power2(size);

                if (mti->mti_max_lmm_size > 0) {
                        LASSERT(mti->mti_max_lmm);
                        OBD_FREE_LARGE(mti->mti_max_lmm,
                                       mti->mti_max_lmm_size);
                        mti->mti_max_lmm = NULL;
                        mti->mti_max_lmm_size = 0;
                }

                OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
                if (likely(mti->mti_max_lmm != NULL))
                        mti->mti_max_lmm_size = rsize;
        }
        return mti->mti_max_lmm;
}

struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
                                   struct mdd_device *mdd)
{
        int max_lmm_size;

        max_lmm_size = mdd_lov_mdsize(env, mdd);
        return mdd_max_lmm_buffer(env, max_lmm_size);
}

struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                   const struct lu_object_header *hdr,
                                   struct lu_device *d)
{
        struct mdd_object *mdd_obj;

        OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
        if (mdd_obj != NULL) {
                struct lu_object *o;

                o = mdd2lu_obj(mdd_obj);
                lu_object_init(o, NULL, d);
                mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
                mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
                mdd_obj->mod_count = 0;
                o->lo_ops = &mdd_lu_obj_ops;
                return o;
        } else {
                return NULL;
        }
}

static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
                           const struct lu_object_conf *unused)
{
        struct mdd_device *d = lu2mdd_dev(o->lo_dev);
        struct mdd_object *mdd_obj = lu2mdd_obj(o);
        struct lu_object *below;
        struct lu_device *under;

        mdd_obj->mod_cltime = 0;
        under = &d->mdd_child->dd_lu_dev;
        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
        mdd_pdlock_init(mdd_obj);
        if (below == NULL)
                RETURN(-ENOMEM);

        lu_object_add(o, below);

        RETURN(0);
}

static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
{
        if (lu_object_exists(o))
                return mdd_get_flags(env, lu2mdd_obj(o));
        else
                return 0;
}

static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj(o);

        lu_object_fini(o);
        OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
}

static int mdd_object_print(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);

        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
                    "valid=%x, cltime="LPU64", flags=%lx)",
                    mdd, mdd->mod_count, mdd->mod_valid,
                    mdd->mod_cltime, mdd->mod_flags);
}

static const struct lu_object_operations mdd_lu_obj_ops = {
        .loo_object_init  = mdd_object_init,
        .loo_object_start = mdd_object_start,
        .loo_object_free  = mdd_object_free,
        .loo_object_print = mdd_object_print,
};

struct mdd_object *mdd_object_find(const struct lu_env *env,
                                   struct mdd_device *d,
                                   const struct lu_fid *f)
{
        return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
}

static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
                        const char *path, struct lu_fid *fid)
{
        struct lu_fid *f = &mdd_env_info(env)->mti_fid;
        struct mdd_object *obj;
        struct lu_name *lname = &mdd_env_info(env)->mti_name;
        struct lu_buf *buf;
        char *name;
        int rc = 0;

        /* temp buffer for path element */
        buf = mdd_buf_alloc(env, PATH_MAX);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);

        lname->ln_name = name = buf->lb_buf;
        lname->ln_namelen = 0;
        *f = mdd->mdd_root_fid;

        while (*path != '/' && *path != '\0') {

                /* find obj corresponding to fid */
                obj = mdd_object_find(env, mdd, f);
                if (obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(obj))
                        GOTO(out, rc = PTR_ERR(obj));
                /* get child fid from parent and name */
                rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
                mdd_object_put(env, obj);

                lname->ln_namelen = 0;
        }

out:
        RETURN(rc);
}

/** The maximum depth that fid2path() will search.
 * This is limited only because we want to store the fids for
 * historical path lookup purposes.
 */
#define MAX_PATH_DEPTH 100
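
/*
 * Illustrative note: pli_fids[] below records one fid per path component,
 * filled child-to-root by mdd_path_current(), so a lookup deeper than
 * MAX_PATH_DEPTH components fails with -EOVERFLOW.
 */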
/** mdd_path() lookup structure. */
struct path_lookup_info {
        __u64              pli_recno;        /**< history point */
        __u64              pli_currec;       /**< current record */
        struct lu_fid      pli_fid;
        struct lu_fid      pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
        struct mdd_object *pli_mdd_obj;
        char              *pli_path;         /**< full path */
        int                pli_pathlen;
        int                pli_linkno;       /**< which hardlink to follow */
        int                pli_fidcount;     /**< number of \a pli_fids */
};

static int mdd_path_current(const struct lu_env *env,
                            struct path_lookup_info *pli)
{
        struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
        struct mdd_object *mdd_obj;
        struct lu_buf *buf = NULL;
        struct link_ea_header *leh;
        struct link_ea_entry *lee;
        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
        struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid;
        int reclen;
        char *ptr;
        int rc;

        ptr = pli->pli_path + pli->pli_pathlen - 1;

        pli->pli_fidcount = 0;
        pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);

        while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
                mdd_obj = mdd_object_find(env, mdd,
                                          &pli->pli_fids[pli->pli_fidcount]);
                if (mdd_obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(mdd_obj))
                        GOTO(out, rc = PTR_ERR(mdd_obj));
                rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
                if (rc <= 0) {
                        mdd_object_put(env, mdd_obj);
                        if (rc == -1)
                                rc = -EREMOTE;
                        else if (rc == 0)
                                /* Do I need to error out here? */
                                rc = -ENOENT;
                        GOTO(out, rc);
                }

                /* Get parent fid and object name */
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
                buf = mdd_links_get(env, mdd_obj);
                mdd_read_unlock(env, mdd_obj);
                mdd_object_put(env, mdd_obj);
                if (IS_ERR(buf))
                        GOTO(out, rc = PTR_ERR(buf));

                leh = buf->lb_buf;
                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);

                /* If set, use link #linkno for path lookup, otherwise use
                 * link #0.  Only do this for the final path element. */
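                /* Sketch of the link EA layout assumed by this loop (see
                 * mdd_lee_unpack()): a link_ea_header followed by packed
                 * link_ea_entry records, each carrying (reclen, parent fid,
                 * name), so "(char *)lee + reclen" steps to the next
                 * hardlink's entry. */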
                if ((pli->pli_fidcount == 0) &&
                    (pli->pli_linkno < leh->leh_reccount)) {
                        int count;

                        for (count = 0; count < pli->pli_linkno; count++) {
                                lee = (struct link_ea_entry *)
                                      ((char *)lee + reclen);
                                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
                        }
                        if (pli->pli_linkno < leh->leh_reccount - 1)
                                /* indicate to user there are more links */
                                pli->pli_linkno++;
                }

                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= pli->pli_path)
                        GOTO(out, rc = -EOVERFLOW);
                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
                *(--ptr) = '/';

                /* Store the parent fid for historic lookup */
                if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
                        GOTO(out, rc = -EOVERFLOW);
                pli->pli_fids[pli->pli_fidcount] = *tmpfid;
        }

        /* Verify that our path hasn't changed since we started the lookup.
         * Record the current index, and verify the path resolves to the
         * same fid.  If it does, then the path is correct as of this index. */
        cfs_spin_lock(&mdd->mdd_cl.mc_lock);
        pli->pli_currec = mdd->mdd_cl.mc_index;
        cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
        rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
        if (rc) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
                GOTO(out, rc = -EAGAIN);
        }
        if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
                       " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
                       PFID(&pli->pli_fid));
                GOTO(out, rc = -EAGAIN);
        }
        ptr++; /* skip leading / */
        memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);

out:
        if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
                /* if we vmalloc'ed a large buffer drop it */
                mdd_buf_put(buf);

        RETURN(rc);
}

static int mdd_path_historic(const struct lu_env *env,
                             struct path_lookup_info *pli)
{
        RETURN(0);
}

/* Returns the full path to this fid, as of changelog record recno. */
static int mdd_path(const struct lu_env *env, struct md_object *obj,
                    char *path, int pathlen, __u64 *recno, int *linkno)
{
        struct path_lookup_info *pli;
        int tries = 3;
        int rc = -EAGAIN;

        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
                if (pathlen < 2)
                        RETURN(-EOVERFLOW);
                strcpy(path, "/");
                RETURN(0);
        }

        OBD_ALLOC_PTR(pli);
        if (pli == NULL)
                RETURN(-ENOMEM);

        pli->pli_mdd_obj = md2mdd_obj(obj);
        pli->pli_recno = *recno;
        pli->pli_path = path;
        pli->pli_pathlen = pathlen;
        pli->pli_linkno = *linkno;

        /* Retry multiple times in case file is being moved */
        while (tries-- && rc == -EAGAIN)
                rc = mdd_path_current(env, pli);

        /* For historical path lookup, the current links may not have existed
         * at "recno" time.  We must switch over to earlier links/parents
         * by using the changelog records.  If the earlier parent doesn't
         * exist, we must search back through the changelog to reconstruct
         * its parents, then check if it exists, etc.
         * We may ignore this problem for the initial implementation and
         * state that an "original" hardlink must still exist for us to find
         * the historic path name. */
        if (pli->pli_recno != -1) {
                rc = mdd_path_historic(env, pli);
        }

        *recno = pli->pli_currec;
        /* Return next link index to caller */
        *linkno = pli->pli_linkno;
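
        /* Caller-side sketch (hypothetical values): pass in *linkno = 0 to
         * resolve hardlink #0; on return *linkno holds the next link index
         * and *recno the changelog index the path was valid at, so fid2path
         * can enumerate all hardlinks by feeding *linkno back in. */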
int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
{
        struct lu_attr *la = &mdd_env_info(env)->mti_la;
        int rc;

        rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
        if (rc == 0)
                mdd_flags_xlate(obj, la->la_flags);
        RETURN(rc);
}

/* get only inode attributes */
int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                  struct md_attr *ma)
{
        int rc = 0;

        if (ma->ma_valid & MA_INODE)
                RETURN(0);

        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
                        mdd_object_capa(env, mdd_obj));
        if (rc == 0)
                ma->ma_valid |= MA_INODE;
        RETURN(rc);
}

int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
{
        struct lov_desc *ldesc;
        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
        struct lov_user_md *lum = (struct lov_user_md *)lmm;

        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
        LASSERT(ldesc != NULL);

        lum->lmm_magic = LOV_MAGIC_V1;
        lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
        lum->lmm_pattern = ldesc->ld_pattern;
        lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
        lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
        lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;

        RETURN(sizeof(*lum));
}

static int is_rootdir(struct mdd_object *mdd_obj)
{
        const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
        const struct lu_fid *fid = mdo2fid(mdd_obj);

        return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
}

int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
                    struct md_attr *ma)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        int size;
        int rc;

        LASSERT(info != NULL);
        LASSERT(ma->ma_big_lmm_used == 0);

        if (ma->ma_lmm_size == 0) {
                CERROR("No buffer to hold %s xattr of object "DFID"\n",
                       XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
                RETURN(-EINVAL);
        }

        rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
                           mdd_object_capa(env, obj));
        if (rc < 0)
                RETURN(rc);

        /* big_lmm may need to grow */
        size = rc;
        mdd_max_lmm_buffer(env, size);
        if (info->mti_max_lmm == NULL)
                RETURN(-ENOMEM);

        LASSERT(info->mti_max_lmm_size >= size);
        rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
                        XATTR_NAME_LOV);
        if (rc < 0)
                RETURN(rc);

        ma->ma_big_lmm_used = 1;
        ma->ma_valid |= MA_LOV;
        ma->ma_lmm = info->mti_max_lmm;
        ma->ma_lmm_size = size;
        RETURN(rc);
}

/* get lov EA only */
static int __mdd_lmm_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;

        if (ma->ma_valid & MA_LOV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
                        XATTR_NAME_LOV);
        if (rc == -ERANGE)
                rc = mdd_big_lmm_get(env, mdd_obj, ma);
        else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
        if (rc > 0) {
                ma->ma_lmm_size = rc;
                ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
                ma->ma_valid |= MA_LOV | MA_LAY_GEN;
                rc = 0;
        }
        RETURN(rc);
}

/* get the first parent fid from link EA */
static int mdd_pfid_get(const struct lu_env *env,
                        struct mdd_object *mdd_obj, struct md_attr *ma)
{
        struct lu_buf *buf;
        struct link_ea_header *leh;
        struct link_ea_entry *lee;
        struct lu_fid *pfid = &ma->ma_pfid;

        if (ma->ma_valid & MA_PFID)
                RETURN(0);

        buf = mdd_links_get(env, mdd_obj);
        if (IS_ERR(buf))
                RETURN(PTR_ERR(buf));

        leh = buf->lb_buf;
        lee = (struct link_ea_entry *)(leh + 1);
        memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
        fid_be_to_cpu(pfid, pfid);
        ma->ma_valid |= MA_PFID;
        if (buf->lb_len > OBD_ALLOC_BIG)
                /* if we vmalloc'ed a large buffer drop it */
                mdd_buf_put(buf);
        RETURN(0);
}

int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                       struct md_attr *ma)
{
        int rc;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lmm_get(env, mdd_obj, ma);
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}

static int __mdd_lmv_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;

        if (ma->ma_valid & MA_LMV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
                        XATTR_NAME_LMV);
        if (rc > 0) {
                ma->ma_valid |= MA_LMV;
                rc = 0;
        }
        RETURN(rc);
}

static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                         struct md_attr *ma)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lustre_mdt_attrs *lma =
                (struct lustre_mdt_attrs *)info->mti_xattr_buf;
        int lma_size;
        int rc = 0;
        /* If all needed data are already valid, nothing to do */
        if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
            (ma->ma_need & (MA_HSM | MA_SOM)))
                RETURN(0);

        /* Read LMA from disk EA */
        lma_size = sizeof(info->mti_xattr_buf);
        rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
        if (rc <= 0)
                RETURN(rc);

        /* Useless to check LMA incompatibility because this is already done
         * in osd_ea_fid_get(), and this will fail long before this code is
         * called.  So, if we are here, LMA is compatible.
         */

        lustre_lma_swab(lma);

        /* Swab and copy LMA */
        if (ma->ma_need & MA_HSM) {
                if (lma->lma_compat & LMAC_HSM)
                        ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
                else
                        ma->ma_hsm.mh_flags = 0;
                ma->ma_valid |= MA_HSM;
        }

        if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
                LASSERT(ma->ma_som != NULL);
                ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
                ma->ma_som->msd_size    = lma->lma_som_size;
                ma->ma_som->msd_blocks  = lma->lma_som_blocks;
                ma->ma_som->msd_mountid = lma->lma_som_mountid;
                ma->ma_valid |= MA_SOM;
        }

        RETURN(rc);
}
int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
                          struct md_attr *ma)
{
        int rc = 0;

        if (ma->ma_need & MA_INODE)
                rc = mdd_iattr_get(env, mdd_obj, ma);

        if (rc == 0 && ma->ma_need & MA_LOV) {
                if (S_ISREG(mdd_object_type(mdd_obj)) ||
                    S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmm_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
                if (S_ISREG(mdd_object_type(mdd_obj)))
                        rc = mdd_pfid_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & MA_LMV) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmv_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
                if (S_ISREG(mdd_object_type(mdd_obj)))
                        rc = __mdd_lma_get(env, mdd_obj, ma);
        }
#ifdef CONFIG_FS_POSIX_ACL
        if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = mdd_def_acl_get(env, mdd_obj, ma);
        }
#endif
        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
}

int mdd_attr_get_internal_locked(const struct lu_env *env,
                                 struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        int needlock = ma->ma_need &
                       (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);

        if (needlock)
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        if (needlock)
                mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}

/*
 * No permission check is needed.
 */
int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
                 struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;

        rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
        RETURN(rc);
}

/*
 * No permission check is needed.
 */
static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;

        if (mdd_object_exists(mdd_obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(mdd_obj),
                       PFID(mdd_object_fid(mdd_obj)));
                return -ENOENT;
        }

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_get(env, mdd_obj, buf, name,
                           mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}

/*
 * Permission check is done at open time; no need to check again.
 */
static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
                        struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;
        loff_t pos = 0;
        int rc;

        if (mdd_object_exists(mdd_obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(mdd_obj),
                       PFID(mdd_object_fid(mdd_obj)));
                return -ENOENT;
        }

        next = mdd_object_child(mdd_obj);
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
                                         mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}

/*
 * No permission check is needed.
 */
static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
                          struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}

int mdd_declare_object_create_internal(const struct lu_env *env,
                                       struct mdd_object *p,
                                       struct mdd_object *c,
                                       struct md_attr *ma,
                                       struct thandle *handle,
                                       const struct md_op_spec *spec)
{
        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
        const struct dt_index_features *feat = spec->sp_feat;
        int rc;

        if (feat != &dt_directory_features && feat != NULL)
                dof->dof_type = DFT_INDEX;
        else
                dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);

        dof->u.dof_idx.di_feat = feat;

        rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);

        RETURN(rc);
}

int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct md_attr *ma,
                               struct thandle *handle,
                               const struct md_op_spec *spec)
{
        struct lu_attr *attr = &ma->ma_attr;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
        const struct dt_index_features *feat = spec->sp_feat;
        int rc;

        if (!mdd_object_exists(c)) {
                struct dt_object *next = mdd_object_child(c);

                if (feat != &dt_directory_features && feat != NULL)
                        dof->dof_type = DFT_INDEX;
                else
                        dof->dof_type = dt_mode_to_dft(attr->la_mode);

                dof->u.dof_idx.di_feat = feat;

                /* @hint will be initialized by underlying device. */
                next->do_ops->do_ah_init(env, hint,
                                         p ? mdd_object_child(p) : NULL,
                                         attr->la_mode & S_IFMT);
                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
        } else
                rc = -EEXIST;

        RETURN(rc);
}

/*
 * Make sure the ctime only increases.
 */
static inline int mdd_attr_check(const struct lu_env *env,
                                 struct mdd_object *obj,
                                 struct lu_attr *attr)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int rc;

        if (attr->la_valid & LA_CTIME) {
                rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
                if (rc)
                        RETURN(rc);

                if (attr->la_ctime < tmp_la->la_ctime)
                        attr->la_valid &= ~(LA_MTIME | LA_CTIME);
                else if (attr->la_valid == LA_CTIME &&
                         attr->la_ctime == tmp_la->la_ctime)
                        attr->la_valid &= ~LA_CTIME;
        }
        RETURN(0);
}

int mdd_attr_set_internal(const struct lu_env *env,
                          struct mdd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *handle,
                          int needacl)
{
        int rc;

        rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
#ifdef CONFIG_FS_POSIX_ACL
        if (!rc && (attr->la_valid & LA_MODE) && needacl)
                rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
#endif
        RETURN(rc);
}

int mdd_attr_check_set_internal(const struct lu_env *env,
                                struct mdd_object *obj,
                                struct lu_attr *attr,
                                struct thandle *handle,
                                int needacl)
{
        int rc;

        rc = mdd_attr_check(env, obj, attr);
        if (rc)
                RETURN(rc);

        if (attr->la_valid)
                rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        RETURN(rc);
}

static int mdd_attr_set_internal_locked(const struct lu_env *env,
                                        struct mdd_object *obj,
                                        struct lu_attr *attr,
                                        struct thandle *handle,
                                        int needacl)
{
        int rc;

        needacl = needacl && (attr->la_valid & LA_MODE);
        if (needacl)
                mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        if (needacl)
                mdd_write_unlock(env, obj);
        RETURN(rc);
}

int mdd_attr_check_set_internal_locked(const struct lu_env *env,
                                       struct mdd_object *obj,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
                                       int needacl)
{
        int rc;

        needacl = needacl && (attr->la_valid & LA_MODE);
        if (needacl)
                mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
        if (needacl)
                mdd_write_unlock(env, obj);
        RETURN(rc);
}

int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
                    const struct lu_buf *buf, const char *name,
                    int fl, struct thandle *handle)
{
        struct lustre_capa *capa = mdd_object_capa(env, obj);
        int rc = -EINVAL;

        if (buf->lb_buf && buf->lb_len > 0)
                rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
        else if (buf->lb_buf == NULL && buf->lb_len == 0)
                rc = mdo_xattr_del(env, obj, name, handle, capa);

        RETURN(rc);
}

/*
 * This gives the same functionality as the code between
 * sys_chmod and inode_setattr
 * chown_common and inode_setattr
 * utimes and inode_setattr
 * This API is ported from mds_fix_attr, with some unnecessary stuff removed.
 */
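/*
 * Worked example (illustrative): a plain utimes() arrives with
 * la_valid = LA_MTIME | LA_CTIME | LA_ATIME.  If the caller is neither the
 * owner nor CFS_CAP_FOWNER-capable, the MAY_WRITE permission check below
 * decides whether the times may be set at all; a pure atime update that
 * falls within mdd_atime_diff of the on-disk atime is simply dropped.
 */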
static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        struct lu_attr *la, const struct md_attr *ma)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc;
        int rc;

        if (!la->la_valid)
                RETURN(0);

        /* Do not permit changing the file type */
        if (la->la_valid & LA_TYPE)
                RETURN(-EPERM);

        /* These should not be processed by setattr */
        if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                RETURN(-EPERM);

        /* export destroy does not have ->le_ses, but we may want
         * to drop LUSTRE_SOM_FL. */
        if (!env->le_ses)
                RETURN(0);

        uc = md_ucred(env);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (la->la_valid == LA_CTIME) {
                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
                        /* This is only for setting ctime when rename's
                         * source is on a remote MDS. */
                        rc = mdd_may_delete(env, NULL, obj,
                                            (struct md_attr *)ma, 1, 0);
                if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
                        la->la_valid &= ~LA_CTIME;
                RETURN(rc);
        }

        if (la->la_valid == LA_ATIME) {
                /* This is an atime-only update for the read-atime update
                 * on close. */
                if (la->la_atime >= tmp_la->la_atime &&
                    la->la_atime < (tmp_la->la_atime +
                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff))
                        la->la_valid &= ~LA_ATIME;
                RETURN(0);
        }

        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
                unsigned int oldflags = 0;
                unsigned int newflags = la->la_flags &
                                        (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);

                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
                if (mdd_is_immutable(obj))
                        oldflags |= LUSTRE_IMMUTABLE_FL;
                if (mdd_is_append(obj))
                        oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                        RETURN(-EPERM);

                if (!S_ISDIR(tmp_la->la_mode))
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;
        }

        if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(ma->ma_attr_flags & MDS_PERM_BYPASS))
                RETURN(-EPERM);

        /* Check for setting the obj time. */
        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal_locked(env, obj, tmp_la,
                                                            MAY_WRITE,
                                                            MOR_TGT_CHILD);
                        if (rc)
                                RETURN(rc);
                }
        }

        if (la->la_valid & LA_KILL_SUID) {
                la->la_valid &= ~LA_KILL_SUID;
                if ((tmp_la->la_mode & S_ISUID) &&
                    !(la->la_valid & LA_MODE)) {
                        la->la_mode = tmp_la->la_mode;
                        la->la_valid |= LA_MODE;
                }
                la->la_mode &= ~S_ISUID;
        }

        if (la->la_valid & LA_KILL_SGID) {
                la->la_valid &= ~LA_KILL_SGID;
                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
                     (S_ISGID | S_IXGRP)) &&
                    !(la->la_valid & LA_MODE)) {
                        la->la_mode = tmp_la->la_mode;
                        la->la_valid |= LA_MODE;
                }
                la->la_mode &= ~S_ISGID;
        }

        /* Make sure a caller can chmod. */
        if (la->la_valid & LA_MODE) {
                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                if (la->la_mode == (cfs_umode_t)-1)
                        la->la_mode = tmp_la->la_mode;
                else
                        la->la_mode = (la->la_mode & S_IALLUGO) |
                                      (tmp_la->la_mode & ~S_IALLUGO);

                /* Also check the setgid bit! */
                if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
                                       la->la_gid : tmp_la->la_gid) &&
                    !mdd_capable(uc, CFS_CAP_FSETID))
                        la->la_mode &= ~S_ISGID;
        } else {
                la->la_mode = tmp_la->la_mode;
        }

        /* Make sure a caller can chown. */
        if (la->la_valid & LA_UID) {
                if (la->la_uid == (uid_t)-1)
                        la->la_uid = tmp_la->la_uid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     (la->la_uid != tmp_la->la_uid)) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* If the user or group of a non-directory has been
                 * changed by a non-root user, remove the setuid bit.
                 * 19981026 David C Niemi <niemi@tux.org>
                 *
                 * Changed this to apply to all users, including root,
                 * to avoid some races.  This is the behavior we had in
                 * 2.0.  The check for non-root was definitely wrong
                 * for 2.2 anyway, as it should have been using
                 * CAP_FSETID rather than fsuid -- 19990830 SD. */
                if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
                    !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISUID;
                        la->la_valid |= LA_MODE;
                }
        }

        /* Make sure caller can chgrp. */
        if (la->la_valid & LA_GID) {
                if (la->la_gid == (gid_t)-1)
                        la->la_gid = tmp_la->la_gid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     ((la->la_gid != tmp_la->la_gid) &&
                      !lustre_in_group_p(uc, la->la_gid))) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* Likewise, if the user or group of a non-directory
                 * has been changed by a non-root user, remove the
                 * setgid bit UNLESS there is no group execute bit
                 * (this would be a file marked for mandatory
                 * locking).  19981026 David C Niemi <niemi@tux.org>
                 *
                 * Removed the fsuid check (see the comment above) --
                 * 19990830 SD. */
                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
                     (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISGID;
                        la->la_valid |= LA_MODE;
                }
        }

        /* For both the Size-on-MDS case and the truncate case,
         * "la->la_valid & (LA_SIZE | LA_BLOCKS)" is true.
         * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
         * In the SOM case it is set: the MAY_WRITE perm was already checked
         * at open, so there is no need to check again.  In the truncate case
         * it is not set, so the MAY_WRITE perm must be checked here. */
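        /* Example (illustrative): a truncate sends LA_SIZE without MDS_SOM,
         * so the MAY_WRITE check below runs; a Size-on-MDS update sets
         * MDS_SOM and only merges the incoming times with what is already
         * stored. */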
        if (ma->ma_attr_flags & MDS_SOM) {
                /* For the "Size-on-MDS" setattr update, merge the incoming
                 * attributes with those already set in the inode.  BUG 10641 */
                if ((la->la_valid & LA_ATIME) &&
                    (la->la_atime <= tmp_la->la_atime))
                        la->la_valid &= ~LA_ATIME;

                /* OST attributes do not have a priority over MDS attributes,
                 * so drop times if ctime is equal. */
                if ((la->la_valid & LA_CTIME) &&
                    (la->la_ctime <= tmp_la->la_ctime))
                        la->la_valid &= ~(LA_MTIME | LA_CTIME);
        } else {
                if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                        if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
                              (uc->mu_fsuid == tmp_la->la_uid)) &&
                            !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
                                rc = mdd_permission_internal_locked(env, obj,
                                                                    tmp_la,
                                                                    MAY_WRITE,
                                                                    MOR_TGT_CHILD);
                                if (rc)
                                        RETURN(rc);
                        }
                }
                if (la->la_valid & LA_CTIME) {
                        /* A pure setattr has priority over what is already
                         * set; do not drop it if ctime is equal. */
                        if (la->la_ctime < tmp_la->la_ctime)
                                la->la_valid &= ~(LA_ATIME | LA_MTIME |
                                                  LA_CTIME);
                }
        }

        RETURN(0);
}

/** Store a data change changelog record.
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transaction handle
 */
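/*
 * Sketch of the record filled below (fields per the code, values
 * illustrative): cr_type is the changelog_rec_type, cr_tfid the target fid,
 * cr_namelen is 0 since data-change records carry no name, and cr_flags is
 * CLF_VERSION plus the low CLF_FLAGMASK bits of the caller's flags.
 */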
static int mdd_changelog_data_store(const struct lu_env *env,
                                    struct mdd_device *mdd,
                                    enum changelog_rec_type type,
                                    int flags,
                                    struct mdd_object *mdd_obj,
                                    struct thandle *handle)
{
        const struct lu_fid *tfid = mdo2fid(mdd_obj);
        struct llog_changelog_rec *rec;
        struct thandle *th = NULL;
        struct lu_buf *buf;
        int reclen;
        int rc;

        /* Not recording */
        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
                RETURN(0);
        if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
                RETURN(0);

        LASSERT(mdd_obj != NULL);
        LASSERT(handle != NULL);

        if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
            cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
                /* Don't need multiple updates in this log */
                /* Don't check under lock - no big deal if we get an extra
                 * entry */
                RETURN(0);
        }

        reclen = llog_data_len(sizeof(*rec));
        buf = mdd_buf_alloc(env, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = (struct llog_changelog_rec *)buf->lb_buf;

        rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
        rec->cr.cr_type = (__u32)type;
        rec->cr.cr_tfid = *tfid;
        rec->cr.cr_namelen = 0;
        mdd_obj->mod_cltime = cfs_time_current_64();

        rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);

        if (th)
                mdd_trans_stop(env, mdd, rc, th);

        if (rc < 0) {
                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                       rc, type, PFID(tfid));
                return -EFAULT;
        }

        return 0;
}

int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
                  int flags, struct md_object *obj)
{
        struct thandle *handle;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        int rc;

        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
                return PTR_ERR(handle);

        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
        if (rc)
                GOTO(stop, rc);

        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
                GOTO(stop, rc);

        rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
                                      handle);

stop:
        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}

/**
 * Should be called with write lock held.
 *
 * \see mdd_lma_set_locked().
 */
static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
                         const struct md_attr *ma, struct thandle *handle)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lu_buf *buf;
        struct lustre_mdt_attrs *lma =
                (struct lustre_mdt_attrs *)info->mti_xattr_buf;
        int lmasize = sizeof(struct lustre_mdt_attrs);
        int rc = 0;

        /* If either the HSM or the SOM part is not valid, we need to read
         * the on-disk LMA first */
        if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
                if (rc > 0)
                        lustre_lma_swab(lma);
                else
                        memset(lma, 0, lmasize);
        }

        /* Copy HSM data */
        if (ma->ma_valid & MA_HSM) {
                lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
                lma->lma_compat |= LMAC_HSM;
        }

        /* Copy SOM data */
        if (ma->ma_valid & MA_SOM) {
                LASSERT(ma->ma_som != NULL);
                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
                        lma->lma_compat &= ~LMAC_SOM;
                } else {
                        lma->lma_compat |= LMAC_SOM;
                        lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
                        lma->lma_som_size = ma->ma_som->msd_size;
                        lma->lma_som_blocks = ma->ma_som->msd_blocks;
                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
                }
        }

        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));

        lustre_lma_swab(lma);
        buf = mdd_buf_get(env, lma, lmasize);
        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);

        RETURN(rc);
}

/**
 * Save LMA extended attributes with data from \a ma.
 *
 * HSM and Size-on-MDS data will be extracted from \a ma if they are valid;
 * if not, the LMA EA will first be read from disk, modified, and written
 * back.
 */
static int mdd_lma_set_locked(const struct lu_env *env,
                              struct mdd_object *mdd_obj,
                              const struct md_attr *ma, struct thandle *handle)
{
        int rc;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lma_set(env, mdd_obj, ma, handle);
        mdd_write_unlock(env, mdd_obj);
        return rc;
}

/* Precedence for choosing record type when multiple
 * attributes change: setattr > mtime > ctime > atime
 * (ctime changes when mtime does, plus chmod/chown.
 * atime and ctime are independent.) */
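/* Example (assuming the changelog enum orders CL_SETATTR below CL_MTIME,
 * CL_CTIME and CL_ATIME, as the precedence above implies): a chmod sets
 * LA_MODE | LA_CTIME, so both the CL_SETATTR and CL_CTIME bits are set and
 * the lowest-bit scan below records CL_SETATTR. */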
static int mdd_attr_set_changelog(const struct lu_env *env,
                                  struct md_object *obj, struct thandle *handle,
                                  __u64 valid)
{
        struct mdd_device *mdd = mdo2mdd(obj);
        int bits, type = 0;

        bits = (valid & ~(LA_CTIME | LA_MTIME | LA_ATIME)) ? 1 << CL_SETATTR : 0;
        bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
        bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
        bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
        bits = bits & mdd->mdd_cl.mc_mask;
        if (bits == 0)
                return 0;

        /* The record type is the lowest non-masked set bit */
        while (bits && ((bits & 1) == 0)) {
                bits = bits >> 1;
                type++;
        }

        /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
        return mdd_changelog_data_store(env, mdd, type, (int)valid,
                                        md2mdd_obj(obj), handle);
}

static int mdd_declare_attr_set(const struct lu_env *env,
                                struct mdd_device *mdd,
                                struct mdd_object *obj,
                                const struct md_attr *ma,
                                struct lov_mds_md *lmm,
                                struct thandle *handle)
{
        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
        int rc;

        rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
        if (rc)
                return rc;

        rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
        if (rc)
                return rc;

        if (ma->ma_valid & MA_LOV) {
                buf->lb_buf = NULL;
                buf->lb_len = ma->ma_lmm_size;
                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
                                           0, handle);
                if (rc)
                        return rc;
        }

        if (ma->ma_valid & (MA_HSM | MA_SOM)) {
                buf->lb_buf = NULL;
                buf->lb_len = sizeof(struct lustre_mdt_attrs);
                rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
                                           0, handle);
                if (rc)
                        return rc;
        }

#ifdef CONFIG_FS_POSIX_ACL
        if (ma->ma_attr.la_valid & LA_MODE) {
                mdd_read_lock(env, obj, MOR_TGT_CHILD);
                rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_ACL_ACCESS,
                                   BYPASS_CAPA);
                mdd_read_unlock(env, obj);
                if (rc == -EOPNOTSUPP || rc == -ENODATA)
                        rc = 0;
                else if (rc < 0)
                        return rc;

                if (rc != 0) {
                        buf->lb_buf = NULL;
                        buf->lb_len = rc;
                        rc = mdo_declare_xattr_set(env, obj, buf,
                                                   XATTR_NAME_ACL_ACCESS, 0,
                                                   handle);
                        if (rc)
                                return rc;
                }
        }
#endif

        /* basically the log is the same as in unlink case */
        if (lmm != NULL) {
                __u16 stripe;
                int i;

                if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
                    le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
                        CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
                               mdd->mdd_obd_dev->obd_name,
                               le32_to_cpu(lmm->lmm_magic),
                               PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
                        return -EINVAL;
                }

                stripe = le16_to_cpu(lmm->lmm_stripe_count);
                if (stripe == LOV_ALL_STRIPES) {
                        struct lov_desc *ldesc;

                        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
                        LASSERT(ldesc != NULL);
                        stripe = ldesc->ld_tgt_count;
                }

                for (i = 0; i < stripe; i++) {
                        rc = mdd_declare_llog_record(env, mdd,
                                                     sizeof(struct llog_unlink_rec),
                                                     handle);
                        if (rc)
                                return rc;
                }
        }

        return rc;
}

/* set attr and LOV EA at once, return updated attr */
int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 const struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int rc, lmm_size = 0, cookie_size = 0;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
1663 /* setattr on "close" only change atime, or do nothing */
1664 if (ma->ma_valid == MA_INODE &&
1665 ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1668 if (S_ISREG(mdd_object_type(mdd_obj)) &&
1669 ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1670 lmm_size = mdd_lov_mdsize(env, mdd);
1671 lmm = mdd_max_lmm_get(env, mdd);
1675 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1682 handle = mdd_trans_create(env, mdd);
1684 RETURN(PTR_ERR(handle));
1686 rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1687 lmm_size > 0 ? lmm : NULL, handle);
1691 rc = mdd_trans_start(env, mdd, handle);
1695 /* permission changes may require sync operation */
1696 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1697 handle->th_sync |= !!mdd->mdd_sync_permission;
1699 if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1700 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1701 ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1703 #ifdef HAVE_QUOTA_SUPPORT
1704 if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1705 struct obd_export *exp = md_quota(env)->mq_exp;
1706 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1708 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1710 quota_opc = FSFILT_OP_SETATTR;
1711 mdd_quota_wrapper(la_copy, qnids);
1712 mdd_quota_wrapper(la_tmp, qoids);
1713 /* get file quota for new owner */
1714 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1715 qnids, inode_pending, 1, NULL, 0,
1717 block_count = (la_tmp->la_blocks + 7) >> 3;
1720 mdd_data_get(env, mdd_obj, &data);
1721 /* get block quota for new owner */
1722 lquota_chkquota(mds_quota_interface_ref, obd,
1723 exp, qnids, block_pending,
1725 LQUOTA_FLAGS_BLK, data, 1);
1731 if (la_copy->la_valid & LA_FLAGS) {
1732 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1735 mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1736 } else if (la_copy->la_valid) { /* setattr */
1737 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1739 /* journal chown/chgrp in llog, just like unlink */
1740 if (rc == 0 && lmm_size){
1741 cookie_size = mdd_lov_cookiesize(env, mdd);
1742 logcookies = mdd_max_cookie_get(env, mdd);
1743 if (logcookies == NULL)
1744 GOTO(cleanup, rc = -ENOMEM);
1746 if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1747 logcookies, cookie_size) <= 0)
1752 if (rc == 0 && ma->ma_valid & MA_LOV) {
1755 mode = mdd_object_type(mdd_obj);
1756 if (S_ISREG(mode) || S_ISDIR(mode)) {
1757 rc = mdd_lsm_sanity_check(env, mdd_obj);
1761 rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1762 ma->ma_lmm_size, handle, 1);
1766 if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1769 mode = mdd_object_type(mdd_obj);
1771 rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1776 rc = mdd_attr_set_changelog(env, obj, handle,
1777 ma->ma_attr.la_valid);
1779 mdd_trans_stop(env, mdd, rc, handle);
1780 if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1781 /*set obd attr, if needed*/
1782 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1785 #ifdef HAVE_QUOTA_SUPPORT
1787 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1789 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1791 /* Trigger dqrel/dqacq for original owner and new owner.
1792 * If failed, the next call for lquota_chkquota will
1794 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
{
        int rc;

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);

        RETURN(rc);
}

static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc = md_ucred(env);
        int rc;

        if (mdd_is_immutable(obj) || mdd_is_append(obj))
                RETURN(-EPERM);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
                RETURN(-EPERM);

        RETURN(rc);
}

static int mdd_declare_xattr_set(const struct lu_env *env,
                                 struct mdd_device *mdd,
                                 struct mdd_object *obj,
                                 const struct lu_buf *buf,
                                 const char *name,
                                 struct thandle *handle)
{
        int rc;

        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
        if (rc)
                return rc;

        /* Only record user xattr changes */
        if (strncmp("user.", name, 5) == 0)
                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);

        return rc;
}

/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
                         int fl)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
        if (rc)
                GOTO(stop, rc);

        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
                GOTO(stop, rc);

        /* security-related changes may require sync */
        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
                handle->th_sync |= !!mdd->mdd_sync_permission;

        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);

        /* Only record system & user xattr changes */
        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                              handle);

stop:
        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}

static int mdd_declare_xattr_del(const struct lu_env *env,
                                 struct mdd_device *mdd,
                                 struct mdd_object *obj,
                                 const char *name,
                                 struct thandle *handle)
{
        int rc;

        rc = mdo_declare_xattr_del(env, obj, name, handle);
        if (rc)
                return rc;

        /* Only record user xattr changes */
        if (strncmp("user.", name, 5) == 0)
                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);

        return rc;
}

/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                  const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
        if (rc)
                GOTO(stop, rc);

        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
                GOTO(stop, rc);

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_del(env, mdd_obj, name, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);

        /* Only record system & user xattr changes */
        if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
                                  sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_ACCESS, name,
                                  sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
                          strncmp(POSIX_ACL_XATTR_DEFAULT, name,
                                  sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
                                              handle);

stop:
        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}

/* partial unlink */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
                       struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        int rc;

        /* XXX: this code won't ever be used:
         * DNE uses a slightly different approach */

        /*
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
         */
        if (mdd_object_exists(mdd_obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(mdd_obj),
                       PFID(mdd_object_fid(mdd_obj)));
                RETURN(-ENOENT);
        }

        LASSERT(mdd_object_exists(mdd_obj) > 0);

        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        rc = mdd_trans_start(env, mdd, handle);

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        if (rc)
                GOTO(cleanup, rc);

        mdo_ref_del(env, mdd_obj, handle);

        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                /* unlink dot */
                mdo_ref_del(env, mdd_obj, handle);
        }

        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;

        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        if (rc)
                GOTO(cleanup, rc);

        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        }
#endif

cleanup:
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        return rc;
}

/* partial operation */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
                               struct md_attr *ma)
{
        int rc;

        switch (ma->ma_attr.la_mode & S_IFMT) {
        case S_IFREG:
        case S_IFDIR:
        case S_IFLNK:
        case S_IFCHR:
        case S_IFBLK:
        case S_IFIFO:
        case S_IFSOCK:
                rc = 0;
                break;
        default:
                rc = -EINVAL;
                break;
        }
        RETURN(rc);
}

static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
                             struct md_attr *ma)
{
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct obd_export *exp = md_quota(env)->mq_exp;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        int rc = 0;

        /* XXX: this code won't ever be used:
         * DNE uses a slightly different approach */

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                qids, inode_pending, 1, NULL, 0,
                                NULL, 0);
                switch (ma->ma_attr.la_mode & S_IFMT) {
                case S_IFLNK:
                case S_IFDIR:
                        block_count = 2;
                        break;
                default:
                        block_count = 1;
                        break;
                }
                /* get block quota for child */
                if (block_count)
                        lquota_chkquota(mds_quota_interface_ref, obd, exp,
                                        qids, block_pending, block_count,
                                        NULL, LQUOTA_FLAGS_BLK, NULL, 0);
        }
#endif

        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
                GOTO(out_pending, rc = PTR_ERR(handle));

        rc = mdd_trans_start(env, mdd, handle);

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        if (rc)
                GOTO(unlock, rc);

        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (rc)
                GOTO(unlock, rc);

        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;

                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);

                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
                if (rc)
                        GOTO(unlock, rc);

                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
                                           handle, 0);
        } else {
#ifdef CONFIG_FS_POSIX_ACL
                if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;

                        buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                        buf->lb_len = spec->u.sp_ea.eadatalen;
                        if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                                rc = __mdd_acl_init(env, mdd_obj, buf,
                                                    &ma->ma_attr.la_mode,
                                                    handle);
                                if (rc)
                                        GOTO(unlock, rc);
                                else
                                        ma->ma_attr.la_valid |= LA_MODE;
                        }

                        pfid = spec->u.sp_ea.fid;
                }
#endif
                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
                                           spec);
        }

unlock:
        if (rc == 0)
                rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);

        mdd_trans_stop(env, mdd, rc, handle);
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      block_pending, 1);
                /* Trigger dqacq on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
        }
#endif
        return rc;
}

/* partial link */
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;

        /* XXX: this code won't ever be used:
         * DNE uses a slightly different approach */

        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        rc = mdd_trans_start(env, mdd, handle);

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        if (rc == 0)
                mdo_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);
        if (rc == 0) {
                LASSERT(ma->ma_attr.la_valid & LA_CTIME);
                la_copy->la_ctime = ma->ma_attr.la_ctime;

                la_copy->la_valid = LA_CTIME;
                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
                                                        handle, 0);
        }

        mdd_trans_stop(env, mdd, 0, handle);

        RETURN(rc);
}
/*
 * Do NOT OR the MAY_* flags together here - you would get the weakest
 * access mode.
 */
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
{
        int res = 0;

        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);

                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
                        return 0;
        }

        if (flags & FMODE_READ)
                res |= MAY_READ;
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
                res |= MAY_WRITE;
        if (flags & MDS_FMODE_EXEC)
                res |= MAY_EXEC;
        return res;
}
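
/* Example (illustrative): FMODE_READ | FMODE_WRITE maps to
 * MAY_READ | MAY_WRITE, and MDS_OPEN_TRUNC or MDS_OPEN_APPEND alone still
 * implies MAY_WRITE; only MDS_OPEN_OWNEROVERRIDE by the file owner
 * short-circuits to 0 above. */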
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int mode, rc;

        /* EEXIST check */
        if (mdd_is_dead_obj(obj))
                RETURN(-ENOENT);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (S_ISLNK(tmp_la->la_mode))
                RETURN(-ELOOP);

        mode = accmode(env, tmp_la, flag);

        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
                RETURN(-EISDIR);

        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
                if (rc)
                        RETURN(rc);
        }

        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;

        /* To write to an append-only file, it must be opened in append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                        RETURN(-EPERM);
                if (flag & MDS_OPEN_TRUNC)
                        RETURN(-EPERM);
        }

        /*
         * Currently the O_NOATIME flag is not packed by the client.
         */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);

                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
        }

        RETURN(0);
}

static int mdd_open(const struct lu_env *env, struct md_object *obj,
                    int flags)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc = 0;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_open_sanity_check(env, mdd_obj, flags);
        if (rc == 0)
                mdd_obj->mod_count++;

        mdd_write_unlock(env, mdd_obj);
        return rc;
}

int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
                            struct md_attr *ma, struct thandle *handle)
{
        int rc;

        rc = mdd_declare_unlink_log(env, obj, ma, handle);
        if (rc)
                return rc;

        return mdo_declare_destroy(env, obj, handle);
}

/* Return md_attr back;
 * if it is the last unlink, also return the LOV EA and llog cookie. */
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                    struct md_attr *ma, struct thandle *handle)
{
        int rc = 0;

        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here.  We clean
                 * everything up; the caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
                                            obj, ma);
        }

        rc = mdo_destroy(env, obj, handle);

        RETURN(rc);
}

static int mdd_declare_close(const struct lu_env *env,
                             struct mdd_object *obj,
                             struct md_attr *ma,
                             struct thandle *handle)
{
        int rc;

        rc = orph_declare_index_delete(env, obj, handle);
        if (rc)
                return rc;

        return mdd_declare_object_kill(env, obj, ma, handle);
}

/*
 * No permission check is needed.
 */
static int mdd_close(const struct lu_env *env, struct md_object *obj,
                     struct md_attr *ma, int mode)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle = NULL;
        int rc;
        int is_orphan = 0, reset = 1;
2420 #ifdef HAVE_QUOTA_SUPPORT
2421 struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2422 struct mds_obd *mds = &obd->u.mds;
2423 unsigned int qids[MAXQUOTAS] = { 0, 0 };
2428 if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2429 mdd_obj->mod_count--;
2431 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2432 CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2433 "list\n", PFID(mdd_object_fid(mdd_obj)));
2437 /* check without any lock */
2438 if (mdd_obj->mod_count == 1 &&
2439 (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2441 handle = mdd_trans_create(env, mdo2mdd(obj));
2443 RETURN(PTR_ERR(handle));
2445 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2449 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2453 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2458 mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2459 if (handle == NULL && mdd_obj->mod_count == 1 &&
2460 (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2461 mdd_write_unlock(env, mdd_obj);
2465 /* release open count */
2466 mdd_obj->mod_count --;
2468 if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2469 /* remove link to object from orphan index */
2470 LASSERT(handle != NULL);
2471 rc = __mdd_orphan_del(env, mdd_obj, handle);
2473 CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2474 "list, OSS objects to be destroyed.\n",
2475 PFID(mdd_object_fid(mdd_obj)));
2478 CERROR("Object "DFID" can not be deleted from orphan "
2479 "list, maybe cause OST objects can not be "
2480 "destroyed (err: %d).\n",
2481 PFID(mdd_object_fid(mdd_obj)), rc);
2482 /* If object was not deleted from orphan list, do not
2483 * destroy OSS objects, which will be done when next
2489 rc = mdd_iattr_get(env, mdd_obj, ma);
2490 /* Object maybe not in orphan list originally, it is rare case for
2491 * mdd_finish_unlink() failure. */
2492 if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2493 #ifdef HAVE_QUOTA_SUPPORT
2494 if (mds->mds_quota) {
2495 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2496 mdd_quota_wrapper(&ma->ma_attr, qids);
2499 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2500 if (ma->ma_valid & MA_FLAGS &&
2501 ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2502 rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2504 if (handle == NULL) {
2505 handle = mdd_trans_create(env, mdo2mdd(obj));
2507 GOTO(out, rc = PTR_ERR(handle));
2509 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2514 rc = mdd_declare_changelog_store(env, mdd,
2519 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2524 rc = mdd_object_kill(env, mdd_obj, ma, handle);
2530 CERROR("Error when prepare to delete Object "DFID" , "
2531 "which will cause OST objects can not be "
2532 "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
2538 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2540 mdd_write_unlock(env, mdd_obj);
2543 (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2544 !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2545 if (handle == NULL) {
2546 handle = mdd_trans_create(env, mdo2mdd(obj));
2548 GOTO(stop, rc = IS_ERR(handle));
2550 rc = mdd_declare_changelog_store(env, mdd, NULL,
2555 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2560 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2566 mdd_trans_stop(env, mdd, rc, handle);
2567 #ifdef HAVE_QUOTA_SUPPORT
2569 /* Trigger dqrel on the owner of child. If failed,
2570 * the next call for lquota_chkquota will process it */
2571 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
/*
 * Permission is checked at open time; no need to check it again.
 */
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
{
        struct dt_object *next = mdd_object_child(obj);
        int rc;
        ENTRY;

        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
                rc = 0;
        else
                rc = -ENOTDIR;

        RETURN(rc);
}
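
/*
 * Layout of a single lu_dirpage as packed by mdd_dir_page_build() below
 * (reconstructed from the packing logic; see lustre/lustre_idl.h for the
 * authoritative struct definitions):
 *
 *      +-------------------+
 *      | struct lu_dirpage |  ldp_hash_start, ldp_hash_end,
 *      |                   |  ldp_flags (LDF_COLLIDE, LDF_EMPTY)
 *      +-------------------+
 *      | lu_dirent 0       |  lde_reclen bytes each
 *      | lu_dirent 1       |
 *      | ...               |
 *      | last lu_dirent    |  lde_reclen == 0 marks the end of the page
 *      +-------------------+
 */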
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              struct lu_dirpage *dp, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
                              __u32 attr)
{
        void             *area = dp;
        struct lu_dirent *ent;
        struct lu_dirent *last = NULL;
        __u64             hash = 0;
        int               first = 1;
        int               result;

        memset(area, 0, sizeof(*dp));
        area += sizeof(*dp);
        nob  -= sizeof(*dp);

        ent = area;
        do {
                int len;
                int recsize;

                len = iops->key_size(env, it);

                /* The IAM iterator can return a record with zero length;
                 * skip it. */
                if (len == 0)
                        goto next;

                hash = iops->store(env, it);
                if (unlikely(first)) {
                        first = 0;
                        dp->ldp_hash_start = cpu_to_le64(hash);
                }

                /* calculate the max space required for this lu_dirent */
                recsize = lu_dirent_calc_size(len, attr);

                if (nob >= recsize) {
                        result = iops->rec(env, it, (struct dt_rec *)ent, attr);
                        if (result == -ESTALE)
                                goto next;
                        if (result != 0)
                                goto out;

                        /* The OSD might not be able to pack all attributes,
                         * so recheck the actual record length. */
                        recsize = le16_to_cpu(ent->lde_reclen);
                } else {
                        result = (last != NULL) ? 0 : -EINVAL;
                        goto out;
                }
                last = ent;
                ent = (void *)ent + recsize;
                nob -= recsize;

next:
                result = iops->next(env, it);
                if (result == -ESTALE)
                        goto next;
        } while (result == 0);

out:
        dp->ldp_hash_end = cpu_to_le64(hash);
        if (last != NULL) {
                if (last->lde_hash == dp->ldp_hash_end)
                        dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
                last->lde_reclen = 0; /* end mark */
        }
        return result;
}
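
/*
 * When CFS_PAGE_SIZE is larger than LU_PAGE_SIZE (e.g. a 64KB server page
 * versus the 4KB wire page), __mdd_readpage() below packs several
 * lu_dirpages into each CFS page, advancing "dp" by LU_PAGE_SIZE until it
 * crosses the CFS page boundary; nlupgs therefore counts wire (LU) pages,
 * not CFS pages.
 */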
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
{
        struct dt_object        *next = mdd_object_child(obj);
        const struct dt_it_ops  *iops;
        struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
        struct page             *pg;
        struct dt_it            *it;
        unsigned int             nlupgs = 0;
        int                      i, nob, rc;
        ENTRY;

        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);

        if (rdpg->rp_count <= 0)
                RETURN(-EFAULT);

        /* iterate through the directory and fill pages from @rdpg */
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
        if (IS_ERR(it))
                RETURN(PTR_ERR(it));

        rc = iops->load(env, it, rdpg->rp_hash);
        if (rc == 0) {
                /*
                 * The iterator did not find a record with exactly the
                 * requested key: it is either positioned above a record
                 * with a smaller key (skip it), or not positioned at all
                 * (the IAM_IT_SKEWED state), so move on to the next item.
                 */
                rc = iops->next(env, it);
        } else if (rc > 0)
                rc = 0;

        /*
         * At this point and across the for-loop:
         *
         *  rc == 0 -> ok, proceed.
         *  rc >  0 -> end of directory.
         *  rc <  0 -> error.
         */
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                struct lu_dirpage *dp;

                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
                dp = cfs_kmap(pg);
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
repeat:
#endif
                rc = mdd_dir_page_build(env, mdd, dp,
                                        min_t(int, nob, LU_PAGE_SIZE),
                                        iops, it, rdpg->rp_attrs);
                if (rc > 0) {
                        /* end of directory */
                        dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
                        nlupgs++;
                } else if (rc < 0) {
                        CWARN("build page failed: %d!\n", rc);
                } else {
                        nlupgs++;
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
                        if ((unsigned long)dp & ~CFS_PAGE_MASK)
                                goto repeat;
#endif
                }
                cfs_kunmap(pg);
        }

        if (rc >= 0) {
                struct lu_dirpage *dp;

                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                if (nlupgs == 0) {
                        /*
                         * No pages were processed; mark this on the first
                         * page and send it back.
                         */
                        dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                        nlupgs = 1;
                }
                cfs_kunmap(rdpg->rp_pages[0]);

                rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
        }
        iops->put(env, it);
        iops->fini(env, it);

        RETURN(rc);
}
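
/*
 * Note: on success __mdd_readpage() returns the number of bytes packed
 * (nlupgs * LU_PAGE_SIZE, capped at rdpg->rp_count) rather than zero, so
 * callers must treat any positive return value as a byte count and only
 * negative values as errors.
 */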
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        if (mdd_object_exists(mdd_obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(mdd_obj),
                       PFID(mdd_object_fid(mdd_obj)));
                RETURN(-ENOENT);
        }

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
        if (rc)
                GOTO(out_unlock, rc);

        if (mdd_is_dead_obj(mdd_obj)) {
                struct page *pg;
                struct lu_dirpage *dp;

                /*
                 * According to POSIX, do not return any entry to the
                 * client: not even dot and dotdot.
                 */
                CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
                       PFID(mdd_object_fid(mdd_obj)));

                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);

                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage *)cfs_kmap(pg);
                memset(dp, 0, sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                cfs_kunmap(pg);
                GOTO(out_unlock, rc = LU_PAGE_SIZE);
        }

        rc = __mdd_readpage(env, mdd_obj, rdpg);

        EXIT;
out_unlock:
        mdd_read_unlock(env, mdd_obj);
        return rc;
}
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        if (mdd_object_exists(mdd_obj) == 0) {
                CERROR("%s: object "DFID" not found: rc = -2\n",
                       mdd_obj_dev_name(mdd_obj),
                       PFID(mdd_object_fid(mdd_obj)));
                return -ENOENT;
        }
        return dt_object_sync(env, mdd_object_child(mdd_obj));
}
const struct md_object_operations mdd_obj_ops = {
        .moo_permission    = mdd_permission,
        .moo_attr_get      = mdd_attr_get,
        .moo_attr_set      = mdd_attr_set,
        .moo_xattr_get     = mdd_xattr_get,
        .moo_xattr_set     = mdd_xattr_set,
        .moo_xattr_list    = mdd_xattr_list,
        .moo_xattr_del     = mdd_xattr_del,
        .moo_object_create = mdd_object_create,
        .moo_ref_add       = mdd_ref_add,
        .moo_ref_del       = mdd_ref_del,
        .moo_open          = mdd_open,
        .moo_close         = mdd_close,
        .moo_readpage      = mdd_readpage,
        .moo_readlink      = mdd_readlink,
        .moo_changelog     = mdd_changelog,
        .moo_capa_get      = mdd_capa_get,
        .moo_object_sync   = mdd_object_sync,
        .moo_path          = mdd_path,
        .moo_file_lock     = mdd_file_lock,
        .moo_file_unlock   = mdd_file_unlock,
};