/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/mdd/mdd_object.c
 *
 * Lustre Metadata Server (mdd) routines
 *
 * Author: Wang Di <wangdi@clusterfs.com>
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS

#include <linux/module.h>
#include <linux/jbd.h>
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lprocfs_status.h>
/* fid_be_to_cpu(), fid_cpu_to_be(). */
#include <lustre_fid.h>

#include <lustre_param.h>
#include <linux/ldiskfs_fs.h>
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>

#include "mdd_internal.h"
static const struct lu_object_operations mdd_lu_obj_ops;

static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name);

int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
                 void **data)
{
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        mdo_data_get(env, obj, data);
        return 0;
}
int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
               struct lu_attr *la, struct lustre_capa *capa)
{
        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
                 PFID(mdd_object_fid(obj)));
        return mdo_attr_get(env, obj, la, capa);
}
static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
{
        obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);

        if (flags & LUSTRE_APPEND_FL)
                obj->mod_flags |= APPEND_OBJ;

        if (flags & LUSTRE_IMMUTABLE_FL)
                obj->mod_flags |= IMMUTE_OBJ;
}
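
/*
 * Illustrative sketch (not part of the original code): after, e.g., a
 * "chattr +a" reaches the MDS, the translated in-core state is visible
 * through the mdd_is_append()/mdd_is_immutable() accessors used below:
 *
 *      mdd_flags_xlate(obj, LUSTRE_APPEND_FL);
 *      LASSERT(mdd_is_append(obj));
 *      LASSERT(!mdd_is_immutable(obj));
 */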
struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
{
        struct mdd_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
        LASSERT(info != NULL);
        return info;
}
struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = area;
        buf->lb_len = len;
        return buf;
}
void mdd_buf_put(struct lu_buf *buf)
{
        if (buf == NULL || buf->lb_buf == NULL)
                return;
        if (buf->lb_vmalloc)
                OBD_VFREE(buf->lb_buf, buf->lb_len);
        else
                OBD_FREE(buf->lb_buf, buf->lb_len);
        buf->lb_buf = NULL;
}
const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                       const void *area, ssize_t len)
{
        struct lu_buf *buf;

        buf = &mdd_env_info(env)->mti_buf;
        buf->lb_buf = (void *)area;
        buf->lb_len = len;
        return buf;
}
#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;

        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
                if (buf->lb_vmalloc)
                        OBD_VFREE(buf->lb_buf, buf->lb_len);
                else
                        OBD_FREE(buf->lb_buf, buf->lb_len);
                buf->lb_buf = NULL;
        }
        if (buf->lb_buf == NULL) {
                buf->lb_len = len;
                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
                        buf->lb_vmalloc = 0;
                }
                if (buf->lb_buf == NULL) {
                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
                        buf->lb_vmalloc = 1;
                }
                if (buf->lb_buf == NULL)
                        buf->lb_len = 0;
        }
        return buf;
}
/** Increase the size of the \a mti_big_buf.
 * Preserves the old data in the buffer; on error the old buffer
 * remains unchanged.
 * \retval 0 or -ENOMEM
 */
int mdd_buf_grow(const struct lu_env *env, ssize_t len)
{
        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
        struct lu_buf buf;

        LASSERT(len >= oldbuf->lb_len);
        if (len > BUF_VMALLOC_SIZE) {
                OBD_VMALLOC(buf.lb_buf, len);
                buf.lb_vmalloc = 1;
        } else {
                OBD_ALLOC(buf.lb_buf, len);
                buf.lb_vmalloc = 0;
        }
        if (buf.lb_buf == NULL)
                return -ENOMEM;

        buf.lb_len = len;
        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);

        if (oldbuf->lb_vmalloc)
                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
        else
                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);

        memcpy(oldbuf, &buf, sizeof(buf));
        return 0;
}
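
/*
 * Usage sketch, for illustration only (error handling elided): grow the
 * thread-local big buffer while keeping the bytes already written to it.
 *
 *      struct lu_buf *buf = mdd_buf_alloc(env, 512);
 *      ...fill buf->lb_buf with 512 bytes...
 *      if (mdd_buf_grow(env, 1024) == 0)
 *              buf = &mdd_env_info(env)->mti_big_buf;
 *      ...same 512 bytes, now with room for 1024...
 */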
struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
                                       struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int                     max_cookie_size;

        max_cookie_size = mdd_lov_cookiesize(env, mdd);
        if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
                if (mti->mti_max_cookie)
                        OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
                mti->mti_max_cookie = NULL;
                mti->mti_max_cookie_size = 0;
        }
        if (unlikely(mti->mti_max_cookie == NULL)) {
                OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
                if (likely(mti->mti_max_cookie != NULL))
                        mti->mti_max_cookie_size = max_cookie_size;
        }
        if (likely(mti->mti_max_cookie != NULL))
                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
        return mti->mti_max_cookie;
}
struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
                                   struct mdd_device *mdd)
{
        struct mdd_thread_info *mti = mdd_env_info(env);
        int                     max_lmm_size;

        max_lmm_size = mdd_lov_mdsize(env, mdd);
        if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
                if (mti->mti_max_lmm)
                        OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
                mti->mti_max_lmm = NULL;
                mti->mti_max_lmm_size = 0;
        }
        if (unlikely(mti->mti_max_lmm == NULL)) {
                OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
                if (likely(mti->mti_max_lmm != NULL))
                        mti->mti_max_lmm_size = max_lmm_size;
        }
        return mti->mti_max_lmm;
}
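
/*
 * Note: mdd_max_cookie_get() and mdd_max_lmm_get() implement the same
 * grow-only per-thread cache: the mdd_thread_info buffer is reallocated
 * only when the current maximum (cookie or LOV EA size) exceeds what is
 * already cached, so repeated calls in the common case cost no
 * allocation.  The returned memory belongs to the lu_env and must not be
 * freed by the caller.
 */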
struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                   const struct lu_object_header *hdr,
                                   struct lu_device *d)
{
        struct mdd_object *mdd_obj;

        OBD_ALLOC_PTR(mdd_obj);
        if (mdd_obj != NULL) {
                struct lu_object *o;

                o = mdd2lu_obj(mdd_obj);
                lu_object_init(o, NULL, d);
                mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
                mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
                mdd_obj->mod_count = 0;
                o->lo_ops = &mdd_lu_obj_ops;
                return o;
        } else {
                return NULL;
        }
}
static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
                           const struct lu_object_conf *unused)
{
        struct mdd_device *d = lu2mdd_dev(o->lo_dev);
        struct mdd_object *mdd_obj = lu2mdd_obj(o);
        struct lu_object  *below;
        struct lu_device  *under;
        ENTRY;

        mdd_obj->mod_cltime = 0;
        under = &d->mdd_child->dd_lu_dev;
        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
        mdd_pdlock_init(mdd_obj);
        if (below == NULL)
                RETURN(-ENOMEM);

        lu_object_add(o, below);

        RETURN(0);
}
static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
{
        if (lu_object_exists(o))
                return mdd_get_flags(env, lu2mdd_obj(o));
        else
                return 0;
}
static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj(o);

        lu_object_fini(o);
        OBD_FREE_PTR(mdd);
}
static int mdd_object_print(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
{
        struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
                    "valid=%x, cltime="LPU64", flags=%lx)",
                    mdd, mdd->mod_count, mdd->mod_valid,
                    mdd->mod_cltime, mdd->mod_flags);
}
static const struct lu_object_operations mdd_lu_obj_ops = {
        .loo_object_init    = mdd_object_init,
        .loo_object_start   = mdd_object_start,
        .loo_object_free    = mdd_object_free,
        .loo_object_print   = mdd_object_print,
};
struct mdd_object *mdd_object_find(const struct lu_env *env,
                                   struct mdd_device *d,
                                   const struct lu_fid *f)
{
        return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
}
static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
                        const char *path, struct lu_fid *fid)
{
        struct lu_buf *buf;
        struct lu_fid *f = &mdd_env_info(env)->mti_fid;
        struct mdd_object *obj;
        struct lu_name *lname = &mdd_env_info(env)->mti_name;
        char *name;
        int rc = 0;
        ENTRY;

        /* temp buffer for path element */
        buf = mdd_buf_alloc(env, PATH_MAX);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);

        lname->ln_name = name = buf->lb_buf;
        lname->ln_namelen = 0;
        *f = mdd->mdd_root_fid;

        while (1) {
                while (*path == '/')
                        path++;
                if (*path == '\0')
                        break;
                while (*path != '/' && *path != '\0') {
                        *name++ = *path++;
                        lname->ln_namelen++;
                }

                *name = '\0';
                /* find obj corresponding to fid */
                obj = mdd_object_find(env, mdd, f);
                if (obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(obj))
                        GOTO(out, rc = PTR_ERR(obj));
                /* get child fid from parent and name */
                rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
                mdd_object_put(env, obj);
                if (rc)
                        break;

                /* reset temp buffer for the next path element */
                name = buf->lb_buf;
                lname->ln_namelen = 0;
        }

        if (!rc)
                *fid = *f;
out:
        RETURN(rc);
}
/** The maximum depth that fid2path() will search.
 * This is limited only because we want to store the fids for
 * historical path lookup purposes.
 */
#define MAX_PATH_DEPTH 100

/** mdd_path() lookup structure. */
struct path_lookup_info {
        __u64                pli_recno;        /**< history point */
        __u64                pli_currec;       /**< current record */
        struct lu_fid        pli_fid;
        struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
        struct mdd_object   *pli_mdd_obj;
        char                *pli_path;         /**< full path */
        int                  pli_pathlen;
        int                  pli_linkno;       /**< which hardlink to follow */
        int                  pli_fidcount;     /**< number of \a pli_fids */
};
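
/*
 * Assembly sketch (illustration only): mdd_path_current() below builds
 * the path from the END of pli_path backwards, one link EA hop at a
 * time, e.g. for /a/b/c in a 16-byte buffer:
 *
 *      [ ?  ?  ?  ?  ?  ?  ?  ?  ?  /  a  /  b  /  c  \0 ]
 *                                   ^ptr
 *
 * and finally memmove()s the finished string to the buffer start.
 */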
static int mdd_path_current(const struct lu_env *env,
                            struct path_lookup_info *pli)
{
        struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
        struct mdd_object *mdd_obj;
        struct lu_buf     *buf = NULL;
        struct link_ea_header *leh;
        struct link_ea_entry  *lee;
        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
        struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
        char *ptr;
        int reclen;
        int rc;
        ENTRY;

        ptr = pli->pli_path + pli->pli_pathlen - 1;
        *ptr = 0;
        --ptr;
        pli->pli_fidcount = 0;
        pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);

        while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
                mdd_obj = mdd_object_find(env, mdd,
                                          &pli->pli_fids[pli->pli_fidcount]);
                if (mdd_obj == NULL)
                        GOTO(out, rc = -EREMOTE);
                if (IS_ERR(mdd_obj))
                        GOTO(out, rc = PTR_ERR(mdd_obj));
                rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
                if (rc <= 0) {
                        mdd_object_put(env, mdd_obj);
                        if (rc == -1)
                                rc = -EREMOTE;
                        else if (rc == 0)
                                /* Do I need to error out here? */
                                rc = -ENOENT;
                        GOTO(out, rc);
                }

                /* Get parent fid and object name */
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
                buf = mdd_links_get(env, mdd_obj);
                mdd_read_unlock(env, mdd_obj);
                mdd_object_put(env, mdd_obj);
                if (IS_ERR(buf))
                        GOTO(out, rc = PTR_ERR(buf));

                leh = buf->lb_buf;
                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);

                /* If set, use link #linkno for path lookup, otherwise use
                   link #0.  Only do this for the final path element. */
                if ((pli->pli_fidcount == 0) &&
                    (pli->pli_linkno < leh->leh_reccount)) {
                        int count;
                        for (count = 0; count < pli->pli_linkno; count++) {
                                lee = (struct link_ea_entry *)
                                      ((char *)lee + reclen);
                                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
                        }
                        if (pli->pli_linkno < leh->leh_reccount - 1)
                                /* indicate to user there are more links */
                                pli->pli_linkno++;
                }

                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= pli->pli_path)
                        GOTO(out, rc = -EOVERFLOW);
                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
                *(--ptr) = '/';

                /* Store the parent fid for historic lookup */
                if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
                        GOTO(out, rc = -EOVERFLOW);
                pli->pli_fids[pli->pli_fidcount] = *tmpfid;
        }

        /* Verify that our path hasn't changed since we started the lookup.
           Record the current index, and verify the path resolves to the
           same fid.  If it does, then the path is correct as of this index. */
        spin_lock(&mdd->mdd_cl.mc_lock);
        pli->pli_currec = mdd->mdd_cl.mc_index;
        spin_unlock(&mdd->mdd_cl.mc_lock);
        rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
        if (rc) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
                GOTO(out, rc = -EAGAIN);
        }
        if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
                CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
                       " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
                       PFID(&pli->pli_fid));
                GOTO(out, rc = -EAGAIN);
        }

        memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);

        EXIT;
out:
        if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
                /* if we vmalloced a large buffer drop it */
                mdd_buf_put(buf);

        return rc;
}
static int mdd_path_historic(const struct lu_env *env,
                             struct path_lookup_info *pli)
{
        return 0;
}
/* Returns the full path to this fid, as of changelog record recno. */
static int mdd_path(const struct lu_env *env, struct md_object *obj,
                    char *path, int pathlen, __u64 *recno, int *linkno)
{
        struct path_lookup_info *pli;
        int tries = 3;
        int rc = -EAGAIN;
        ENTRY;

        if (pathlen < 3)
                RETURN(-EOVERFLOW);

        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
                path[0] = '/';
                path[1] = '\0';
                RETURN(0);
        }

        OBD_ALLOC_PTR(pli);
        if (pli == NULL)
                RETURN(-ENOMEM);

        pli->pli_mdd_obj = md2mdd_obj(obj);
        pli->pli_recno = *recno;
        pli->pli_path = path;
        pli->pli_pathlen = pathlen;
        pli->pli_linkno = *linkno;

        /* Retry multiple times in case file is being moved */
        while (tries-- && rc == -EAGAIN)
                rc = mdd_path_current(env, pli);

        /* For historical path lookup, the current links may not have existed
         * at "recno" time.  We must switch over to earlier links/parents
         * by using the changelog records.  If the earlier parent doesn't
         * exist, we must search back through the changelog to reconstruct
         * its parents, then check if it exists, etc.
         * We may ignore this problem for the initial implementation and
         * state that an "original" hardlink must still exist for us to find
         * historic path name. */
        if (pli->pli_recno != -1) {
                rc = mdd_path_historic(env, pli);
        } else {
                *recno = pli->pli_currec;
                /* Return next link index to caller */
                *linkno = pli->pli_linkno;
        }

        OBD_FREE_PTR(pli);

        RETURN(rc);
}
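
/*
 * Caller contract, shown with a hypothetical caller (illustration only):
 * pass *recno == -1 to request the current path; on return *recno holds
 * the changelog index the path was valid at and *linkno the next
 * hardlink index, so callers can iterate over all links:
 *
 *      __u64 recno = -1;
 *      int   linkno = 0;
 *      rc = mdd_path(env, obj, path, sizeof(path), &recno, &linkno);
 */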
int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
{
        struct lu_attr *la = &mdd_env_info(env)->mti_la;
        int rc;
        ENTRY;

        rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
        if (rc == 0) {
                mdd_flags_xlate(obj, la->la_flags);
                if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
                        obj->mod_flags |= MNLINK_OBJ;
        }
        RETURN(rc);
}
/* get only inode attributes */
int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
                  struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (ma->ma_valid & MA_INODE)
                RETURN(0);

        rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
                        mdd_object_capa(env, mdd_obj));
        if (rc == 0)
                ma->ma_valid |= MA_INODE;
        RETURN(rc);
}
int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
                       int *size)
{
        struct lov_desc *ldesc;
        struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
        ENTRY;

        ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
        LASSERT(ldesc != NULL);

        if (!lmm)
                RETURN(0);

        lmm->lmm_magic = LOV_MAGIC_V1;
        lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
        lmm->lmm_pattern = ldesc->ld_pattern;
        lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
        lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
        *size = sizeof(struct lov_mds_md);

        RETURN(sizeof(struct lov_mds_md));
}
/* get lov EA only */
static int __mdd_lmm_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        ENTRY;

        if (ma->ma_valid & MA_LOV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
                        XATTR_NAME_LOV);

        if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
                rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
                                        &ma->ma_lmm_size);

        if (rc > 0) {
                ma->ma_valid |= MA_LOV;
                rc = 0;
        }
        RETURN(rc);
}
int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                       struct md_attr *ma)
{
        int rc;
        ENTRY;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = __mdd_lmm_get(env, mdd_obj, ma);
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}
/* get lmv EA only */
static int __mdd_lmv_get(const struct lu_env *env,
                         struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        ENTRY;

        if (ma->ma_valid & MA_LMV)
                RETURN(0);

        rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
                        XATTR_NAME_LMV);
        if (rc > 0) {
                ma->ma_valid |= MA_LMV;
                rc = 0;
        }
        RETURN(rc);
}
static int mdd_attr_get_internal(const struct lu_env *env,
                                 struct mdd_object *mdd_obj,
                                 struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (ma->ma_need & MA_INODE)
                rc = mdd_iattr_get(env, mdd_obj, ma);

        if (rc == 0 && ma->ma_need & MA_LOV) {
                if (S_ISREG(mdd_object_type(mdd_obj)) ||
                    S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmm_get(env, mdd_obj, ma);
        }
        if (rc == 0 && ma->ma_need & MA_LMV) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = __mdd_lmv_get(env, mdd_obj, ma);
        }
#ifdef CONFIG_FS_POSIX_ACL
        if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
                if (S_ISDIR(mdd_object_type(mdd_obj)))
                        rc = mdd_def_acl_get(env, mdd_obj, ma);
        }
#endif
        CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
               rc, ma->ma_valid);
        RETURN(rc);
}
int mdd_attr_get_internal_locked(const struct lu_env *env,
                                 struct mdd_object *mdd_obj, struct md_attr *ma)
{
        int rc;
        int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);

        if (needlock)
                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_attr_get_internal(env, mdd_obj, ma);
        if (needlock)
                mdd_read_unlock(env, mdd_obj);
        return rc;
}
/*
 * No permission check is needed.
 */
static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
                        struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int                rc;
        ENTRY;

        rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_xattr_get(const struct lu_env *env,
                         struct md_object *obj, struct lu_buf *buf,
                         const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_get(env, mdd_obj, buf, name,
                           mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
/*
 * Permission check is done on open; no need to check again.
 */
static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
                        struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object  *next;
        loff_t             pos = 0;
        int                rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        next = mdd_object_child(mdd_obj);
        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
                                         mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
                          struct lu_buf *buf)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
        mdd_read_unlock(env, mdd_obj);

        RETURN(rc);
}
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct md_attr *ma,
                               struct thandle *handle,
                               const struct md_op_spec *spec)
{
        struct lu_attr *attr = &ma->ma_attr;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
        const struct dt_index_features *feat = spec->sp_feat;
        int rc;
        ENTRY;

        if (!mdd_object_exists(c)) {
                struct dt_object *next = mdd_object_child(c);
                LASSERT(next);

                if (feat != &dt_directory_features && feat != NULL)
                        dof->dof_type = DFT_INDEX;
                else
                        dof->dof_type = dt_mode_to_dft(attr->la_mode);

                dof->u.dof_idx.di_feat = feat;

                /* @hint will be initialized by underlying device. */
                next->do_ops->do_ah_init(env, hint,
                                         p ? mdd_object_child(p) : NULL,
                                         attr->la_mode & S_IFMT);

                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
                LASSERT(ergo(rc == 0, mdd_object_exists(c)));
        } else
                rc = -EEXIST;

        RETURN(rc);
}
/**
 * Make sure the ctime only moves forward.
 */
static inline int mdd_attr_check(const struct lu_env *env,
                                 struct mdd_object *obj,
                                 struct lu_attr *attr)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int rc;
        ENTRY;

        if (attr->la_valid & LA_CTIME) {
                rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
                if (rc)
                        RETURN(rc);

                if (attr->la_ctime < tmp_la->la_ctime)
                        attr->la_valid &= ~(LA_MTIME | LA_CTIME);
                else if (attr->la_valid == LA_CTIME &&
                         attr->la_ctime == tmp_la->la_ctime)
                        attr->la_valid &= ~LA_CTIME;
        }
        RETURN(0);
}
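
/*
 * Worked example of the rule above (illustrative): if the object's ctime
 * is 100, an incoming LA_MTIME|LA_CTIME set carrying ctime 99 is dropped
 * entirely, while a pure LA_CTIME set carrying exactly 100 is reduced to
 * a no-op; only ctime 101 or later is applied.
 */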
int mdd_attr_set_internal(const struct lu_env *env,
                          struct mdd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *handle,
                          int needacl)
{
        int rc;
        ENTRY;

        rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
#ifdef CONFIG_FS_POSIX_ACL
        if (!rc && (attr->la_valid & LA_MODE) && needacl)
                rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
#endif
        RETURN(rc);
}
int mdd_attr_check_set_internal(const struct lu_env *env,
                                struct mdd_object *obj,
                                struct lu_attr *attr,
                                struct thandle *handle,
                                int needacl)
{
        int rc;
        ENTRY;

        rc = mdd_attr_check(env, obj, attr);
        if (rc)
                RETURN(rc);

        if (attr->la_valid)
                rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        RETURN(rc);
}
static int mdd_attr_set_internal_locked(const struct lu_env *env,
                                        struct mdd_object *obj,
                                        struct lu_attr *attr,
                                        struct thandle *handle,
                                        int needacl)
{
        int rc;
        ENTRY;

        needacl = needacl && (attr->la_valid & LA_MODE);

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
        mdd_write_unlock(env, obj);

        RETURN(rc);
}
int mdd_attr_check_set_internal_locked(const struct lu_env *env,
                                       struct mdd_object *obj,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
                                       int needacl)
{
        int rc;
        ENTRY;

        needacl = needacl && (attr->la_valid & LA_MODE);

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
        mdd_write_unlock(env, obj);

        RETURN(rc);
}
int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
                    const struct lu_buf *buf, const char *name,
                    int fl, struct thandle *handle)
{
        struct lustre_capa *capa = mdd_object_capa(env, obj);
        int rc = -EINVAL;
        ENTRY;

        if (buf->lb_buf && buf->lb_len > 0)
                rc = mdo_xattr_set(env, obj, buf, name, fl, handle, capa);
        else if (buf->lb_buf == NULL && buf->lb_len == 0)
                rc = mdo_xattr_del(env, obj, name, handle, capa);

        RETURN(rc);
}
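
/*
 * Calling convention sketch (illustrative): a non-empty buffer sets or
 * replaces the xattr, a completely empty lu_buf deletes it, anything
 * else is rejected with -EINVAL:
 *
 *      struct lu_buf del = { .lb_buf = NULL, .lb_len = 0 };
 *      rc = __mdd_xattr_set(env, obj, &del, XATTR_NAME_LOV, 0, handle);
 */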
/*
 * This gives the same functionality as the code between
 * sys_chmod and inode_setattr
 * chown_common and inode_setattr
 * utimes and inode_setattr
 * This API is ported from mds_fix_attr but removes some unnecessary stuff.
 */
static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        struct lu_attr *la, const struct md_attr *ma)
{
        struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
        struct md_ucred  *uc         = md_ucred(env);
        int               rc;
        ENTRY;

        if (!la->la_valid)
                RETURN(0);

        /* Do not permit change file type */
        if (la->la_valid & LA_TYPE)
                RETURN(-EPERM);

        /* They should not be processed by setattr */
        if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                RETURN(-EPERM);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (la->la_valid == LA_CTIME) {
                if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
                        /* This is only for set ctime when rename's source is
                         * on remote MDS. */
                        rc = mdd_may_delete(env, NULL, obj,
                                            (struct md_attr *)ma, 1, 0);
                if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
                        la->la_valid &= ~LA_CTIME;
                RETURN(rc);
        }

        if (la->la_valid == LA_ATIME) {
                /* This is atime only set for read atime update on close. */
                if (la->la_atime <= tmp_la->la_atime +
                                    mdd_obj2mdd_dev(obj)->mdd_atime_diff)
                        la->la_valid &= ~LA_ATIME;
                RETURN(0);
        }

        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
                unsigned int oldflags = 0;
                unsigned int newflags = la->la_flags &
                                (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);

                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                /* XXX: the IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
                if (mdd_is_immutable(obj))
                        oldflags |= LUSTRE_IMMUTABLE_FL;
                if (mdd_is_append(obj))
                        oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                        RETURN(-EPERM);

                if (!S_ISDIR(tmp_la->la_mode))
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;
        }

        if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(ma->ma_attr_flags & MDS_PERM_BYPASS))
                RETURN(-EPERM);

        /* Check for setting the obj time. */
        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
                if ((uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal_locked(env, obj, tmp_la,
                                                            MAY_WRITE,
                                                            MOR_TGT_CHILD);
                        if (rc)
                                RETURN(rc);
                }
        }

        /* Make sure a caller can chmod. */
        if (la->la_valid & LA_MODE) {
                /* Bypass la_valid == LA_MODE,
                 * this is for changing file with SUID or SGID. */
                if ((la->la_valid & ~LA_MODE) &&
                    !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);

                if (la->la_mode == (umode_t) -1)
                        la->la_mode = tmp_la->la_mode;
                else
                        la->la_mode = (la->la_mode & S_IALLUGO) |
                                      (tmp_la->la_mode & ~S_IALLUGO);

                /* Also check the setgid bit! */
                if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
                                       la->la_gid : tmp_la->la_gid) &&
                    !mdd_capable(uc, CFS_CAP_FSETID))
                        la->la_mode &= ~S_ISGID;
        } else {
                la->la_mode = tmp_la->la_mode;
        }

        /* Make sure a caller can chown. */
        if (la->la_valid & LA_UID) {
                if (la->la_uid == (uid_t) -1)
                        la->la_uid = tmp_la->la_uid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     (la->la_uid != tmp_la->la_uid)) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* If the user or group of a non-directory has been
                 * changed by a non-root user, remove the setuid bit.
                 * 19981026 David C Niemi <niemi@tux.org>
                 *
                 * Changed this to apply to all users, including root,
                 * to avoid some races. This is the behavior we had in
                 * 2.0. The check for non-root was definitely wrong
                 * for 2.2 anyway, as it should have been using
                 * CAP_FSETID rather than fsuid -- 19990830 SD. */
                if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
                    !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISUID;
                        la->la_valid |= LA_MODE;
                }
        }

        /* Make sure caller can chgrp. */
        if (la->la_valid & LA_GID) {
                if (la->la_gid == (gid_t) -1)
                        la->la_gid = tmp_la->la_gid;
                if (((uc->mu_fsuid != tmp_la->la_uid) ||
                     ((la->la_gid != tmp_la->la_gid) &&
                      !lustre_in_group_p(uc, la->la_gid))) &&
                    !mdd_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);

                /* Likewise, if the user or group of a non-directory
                 * has been changed by a non-root user, remove the
                 * setgid bit UNLESS there is no group execute bit
                 * (this would be a file marked for mandatory
                 * locking).  19981026 David C Niemi <niemi@tux.org>
                 *
                 * Removed the fsuid check (see the comment above) --
                 * 19990830 SD. */
                if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
                     (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
                        la->la_mode &= ~S_ISGID;
                        la->la_valid |= LA_MODE;
                }
        }

        /* For both the Size-on-MDS case and the truncate case,
         * "la->la_valid & (LA_SIZE | LA_BLOCKS)" is true.
         * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
         * For the SOM case it is true: the MAY_WRITE perm has been checked
         * at open time, no need to check again.  For the truncate case it
         * is false: the MAY_WRITE perm should be checked here. */
        if (ma->ma_attr_flags & MDS_SOM) {
                /* For the "Size-on-MDS" setattr update, merge coming
                 * attributes with the set in the inode. BUG 10641 */
                if ((la->la_valid & LA_ATIME) &&
                    (la->la_atime <= tmp_la->la_atime))
                        la->la_valid &= ~LA_ATIME;

                /* OST attributes do not have a priority over MDS attributes,
                 * so drop times if ctime is equal. */
                if ((la->la_valid & LA_CTIME) &&
                    (la->la_ctime <= tmp_la->la_ctime))
                        la->la_valid &= ~(LA_MTIME | LA_CTIME);
        } else {
                if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
                        if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
                              (uc->mu_fsuid == tmp_la->la_uid)) &&
                            !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
                                rc = mdd_permission_internal_locked(env, obj,
                                                            tmp_la, MAY_WRITE,
                                                            MOR_TGT_CHILD);
                                if (rc)
                                        RETURN(rc);
                        }
                }
                if (la->la_valid & LA_CTIME) {
                        /* The pure setattr, it has the priority over what is
                         * already set, do not drop it if ctime is equal. */
                        if (la->la_ctime < tmp_la->la_ctime)
                                la->la_valid &= ~(LA_ATIME | LA_MTIME |
                                                  LA_CTIME);
                }
        }

        RETURN(0);
}
/** Store a data change changelog record.
 * If this fails, we must fail the whole transaction; we don't
 * want the change to commit without the log entry.
 * \param mdd_obj - mdd_object of change
 * \param handle - transaction handle
 */
static int mdd_changelog_data_store(const struct lu_env *env,
                                    struct mdd_device *mdd,
                                    enum changelog_rec_type type,
                                    struct mdd_object *mdd_obj,
                                    struct thandle *handle)
{
        const struct lu_fid *tfid = mdo2fid(mdd_obj);
        struct llog_changelog_rec *rec;
        struct lu_buf *buf;
        int reclen;
        int rc;

        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
                RETURN(0);

        LASSERT(handle != NULL);
        LASSERT(mdd_obj != NULL);

        if ((type == CL_SETATTR) &&
            cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
                /* Don't need multiple updates in this log */
                /* Don't check under lock - no big deal if we get an extra
                   entry */
                RETURN(0);
        }

        reclen = llog_data_len(sizeof(*rec));
        buf = mdd_buf_alloc(env, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = (struct llog_changelog_rec *)buf->lb_buf;

        rec->cr_flags = CLF_VERSION;
        rec->cr_type = (__u32)type;
        rec->cr_tfid = *tfid;
        rec->cr_namelen = 0;
        mdd_obj->mod_cltime = cfs_time_current_64();

        rc = mdd_changelog_llog_write(mdd, rec, handle);
        if (rc < 0) {
                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
                       rc, type, PFID(tfid));
                RETURN(-EFAULT);
        }

        RETURN(0);
}
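
/*
 * Note on the mod_cltime check above: once one CL_SETATTR record has been
 * written for an object after the changelog was (re)started, further
 * setattr records for the same object are suppressed.  The comparison is
 * done without taking mc_lock on purpose; the worst case is one redundant
 * record, which changelog consumers must tolerate anyway.
 */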
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                        const struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        struct lov_mds_md *lmm = NULL;
        struct llog_cookie *logcookies = NULL;
        int  rc, lmm_size = 0, cookie_size = 0;
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        ENTRY;

        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                    MDD_TXN_ATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
        /*TODO: add lock here*/
        /* start a log journal handle if needed */
        if (S_ISREG(mdd_object_type(mdd_obj)) &&
            ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
                lmm_size = mdd_lov_mdsize(env, mdd);
                lmm = mdd_max_lmm_get(env, mdd);
                if (lmm == NULL)
                        GOTO(cleanup, rc = -ENOMEM);

                rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                       XATTR_NAME_LOV);
                if (rc < 0)
                        GOTO(cleanup, rc);
        }

        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                       ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);

        *la_copy = ma->ma_attr;
        rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
        if (rc)
                GOTO(cleanup, rc);

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;

                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                if (!rc) {
                        void *data = NULL;
                        quota_opc = FSFILT_OP_SETATTR;
                        mdd_quota_wrapper(la_copy, qnids);
                        mdd_quota_wrapper(la_tmp, qoids);
                        /* get file quota for new owner */
                        lquota_chkquota(mds_quota_interface_ref, obd, qnids,
                                        inode_pending, 1, NULL, 0, NULL, 0);
                        block_count = (la_tmp->la_blocks + 7) >> 3;
                        if (block_count) {
                                mdd_data_get(env, mdd_obj, &data);
                                /* get block quota for new owner */
                                lquota_chkquota(mds_quota_interface_ref, obd,
                                                qnids, block_pending,
                                                block_count, NULL,
                                                LQUOTA_FLAGS_BLK, data, 1);
                        }
                }
        }
#endif

        if (la_copy->la_valid & LA_FLAGS) {
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                  handle, 1);
                if (rc == 0)
                        mdd_flags_xlate(mdd_obj, la_copy->la_flags);
        } else if (la_copy->la_valid) {            /* setattr */
                rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                  handle, 1);
                /* journal chown/chgrp in llog, just like unlink */
                if (rc == 0 && lmm_size) {
                        cookie_size = mdd_lov_cookiesize(env, mdd);
                        logcookies = mdd_max_cookie_get(env, mdd);
                        if (logcookies == NULL)
                                GOTO(cleanup, rc = -ENOMEM);

                        if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
                                            logcookies, cookie_size) <= 0)
                                logcookies = NULL;
                }
        }

        if (rc == 0 && ma->ma_valid & MA_LOV) {
                umode_t mode;

                mode = mdd_object_type(mdd_obj);
                if (S_ISREG(mode) || S_ISDIR(mode)) {
                        rc = mdd_lsm_sanity_check(env, mdd_obj);
                        if (rc)
                                GOTO(cleanup, rc);

                        rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
                                            ma->ma_lmm_size, handle, 1);
                }
        }
cleanup:
        if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
                rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
                                              handle);
        mdd_trans_stop(env, mdd, rc, handle);
        if (rc == 0 && (lmm != NULL && lmm_size > 0)) {
                /* set obd attr, if needed */
                rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
                                           logcookies);
        }
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
                                      block_pending, 1);
                /* Trigger dqrel/dqacq for original owner and new owner.
                 * If failed, the next call for lquota_chkquota will
                 * process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
                              quota_opc);
        }
#endif
        RETURN(rc);
}
int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
                      const struct lu_buf *buf, const char *name, int fl,
                      struct thandle *handle)
{
        int rc;
        ENTRY;

        mdd_write_lock(env, obj, MOR_TGT_CHILD);
        rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
        mdd_write_unlock(env, obj);

        RETURN(rc);
}
static int mdd_xattr_sanity_check(const struct lu_env *env,
                                  struct mdd_object *obj)
{
        struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
        struct md_ucred *uc     = md_ucred(env);
        int rc;
        ENTRY;

        if (mdd_is_immutable(obj) || mdd_is_append(obj))
                RETURN(-EPERM);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if ((uc->mu_fsuid != tmp_la->la_uid) &&
            !mdd_capable(uc, CFS_CAP_FOWNER))
                RETURN(-EPERM);

        RETURN(rc);
}
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                         const struct lu_buf *buf, const char *name,
                         int fl)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int  rc;
        ENTRY;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);

        /* Only record user xattr changes */
        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
            (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
                                              handle);
        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/**
 * The caller should guarantee to update the object ctime
 * after xattr_set if needed.
 */
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
                  const char *name)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int  rc;
        ENTRY;

        rc = mdd_xattr_sanity_check(env, mdd_obj);
        if (rc)
                RETURN(rc);

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdo_xattr_del(env, mdd_obj, name, handle,
                           mdd_object_capa(env, mdd_obj));
        mdd_write_unlock(env, mdd_obj);

        /* Only record user xattr changes */
        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
            (strncmp("user.", name, 5) == 0))
                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
                                              handle);

        mdd_trans_stop(env, mdd, rc, handle);

        RETURN(rc);
}
/* partial unlink */
static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
                       struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        int rc;
        ENTRY;

        /*
         * Check -ENOENT early here because we need to get object type
         * to calculate credits before transaction start
         */
        if (!mdd_object_exists(mdd_obj))
                RETURN(-ENOENT);

        LASSERT(mdd_object_exists(mdd_obj) > 0);

        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
        if (rc)
                RETURN(rc);

        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
        if (rc)
                GOTO(cleanup, rc);

        __mdd_ref_del(env, mdd_obj, handle, 0);

        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                /* unlink dot */
                __mdd_ref_del(env, mdd_obj, handle, 1);
        }

        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la_copy->la_ctime = ma->ma_attr.la_ctime;

        la_copy->la_valid = LA_CTIME;
        rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
        if (rc)
                GOTO(cleanup, rc);

        rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
        }
#endif

        EXIT;
cleanup:
        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        return rc;
}
/* partial operation */
static int mdd_oc_sanity_check(const struct lu_env *env,
                               struct mdd_object *obj,
                               struct md_attr *ma)
{
        int rc;
        ENTRY;

        switch (ma->ma_attr.la_mode & S_IFMT) {
        case S_IFREG:
        case S_IFDIR:
        case S_IFLNK:
        case S_IFCHR:
        case S_IFBLK:
        case S_IFIFO:
        case S_IFSOCK:
                rc = 0;
                break;
        default:
                rc = -EINVAL;
                break;
        }
        RETURN(rc);
}
static int mdd_object_create(const struct lu_env *env,
                             struct md_object *obj,
                             const struct md_op_spec *spec,
                             struct md_attr *ma)
{
        struct mdd_device *mdd = mdo2mdd(obj);
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        const struct lu_fid *pfid = spec->u.sp_pfid;
        struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdd->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0, block_count = 0;
        int inode_pending[MAXQUOTAS] = { 0, 0 };
        int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
        int rc = 0;
        ENTRY;

#ifdef HAVE_QUOTA_SUPPORT
        if (mds->mds_quota) {
                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
                mdd_quota_wrapper(&ma->ma_attr, qids);
                /* get file quota for child */
                lquota_chkquota(mds_quota_interface_ref, obd, qids,
                                inode_pending, 1, NULL, 0, NULL, 0);
                switch (ma->ma_attr.la_mode & S_IFMT) {
                case S_IFLNK:
                case S_IFDIR:
                        block_count = 2;
                        break;
                default:
                        block_count = 1;
                        break;
                }
                /* get block quota for child */
                if (block_count)
                        lquota_chkquota(mds_quota_interface_ref, obd, qids,
                                        block_pending, block_count, NULL,
                                        LQUOTA_FLAGS_BLK, NULL, 0);
        }
#endif

        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                GOTO(out_pending, rc = PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_oc_sanity_check(env, mdd_obj, ma);
        if (rc)
                GOTO(unlock, rc);

        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
        if (rc)
                GOTO(unlock, rc);

        if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
                /* If creating the slave object, set slave EA here. */
                int lmv_size = spec->u.sp_ea.eadatalen;
                struct lmv_stripe_md *lmv;

                lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
                LASSERT(lmv != NULL && lmv_size > 0);

                rc = __mdd_xattr_set(env, mdd_obj,
                                     mdd_buf_get_const(env, lmv, lmv_size),
                                     XATTR_NAME_LMV, 0, handle);
                if (rc)
                        GOTO(unlock, rc);

                rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
                                           handle, 0);
        } else {
#ifdef CONFIG_FS_POSIX_ACL
                if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
                        struct lu_buf *buf = &mdd_env_info(env)->mti_buf;

                        buf->lb_buf = (void *)spec->u.sp_ea.eadata;
                        buf->lb_len = spec->u.sp_ea.eadatalen;
                        if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
                                rc = __mdd_acl_init(env, mdd_obj, buf,
                                                    &ma->ma_attr.la_mode,
                                                    handle);
                                if (rc)
                                        GOTO(unlock, rc);
                                else
                                        ma->ma_attr.la_valid |= LA_MODE;
                        }

                        pfid = spec->u.sp_ea.fid;
                }
#endif
                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
                                           spec);
        }
        EXIT;
unlock:
        if (rc == 0)
                rc = mdd_attr_get_internal(env, mdd_obj, ma);
        mdd_write_unlock(env, mdd_obj);

        mdd_trans_stop(env, mdd, rc, handle);
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc) {
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      inode_pending, 0);
                lquota_pending_commit(mds_quota_interface_ref, obd, qids,
                                      block_pending, 1);
                /* Trigger dqacq on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it. */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
        }
#endif
        RETURN(rc);
}
static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
                       const struct md_attr *ma)
{
        struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle *handle;
        int rc;
        ENTRY;

        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
        handle = mdd_trans_start(env, mdd);
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
        if (rc == 0)
                __mdd_ref_add(env, mdd_obj, handle);
        mdd_write_unlock(env, mdd_obj);
        if (rc == 0) {
                LASSERT(ma->ma_attr.la_valid & LA_CTIME);
                la_copy->la_ctime = ma->ma_attr.la_ctime;

                la_copy->la_valid = LA_CTIME;
                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
                                                        handle, 0);
        }
        mdd_trans_stop(env, mdd, 0, handle);

        RETURN(rc);
}
/*
 * Do NOT simply OR the MAY_* flags together; you would get the weakest
 * access mode.
 */
int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
{
        int res = 0;

        /* Sadly, NFSD reopens a file repeatedly during operation, so the
         * "acc_mode = 0" allowance for newly-created files isn't honoured.
         * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
         * owner can write to a file even if it is marked readonly to hide
         * its brokenness. (bug 5781) */
        if (flags & MDS_OPEN_OWNEROVERRIDE) {
                struct md_ucred *uc = md_ucred(env);

                if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
                    (la->la_uid == uc->mu_fsuid))
                        return 0;
        }

        if (flags & FMODE_READ)
                res |= MAY_READ;
        if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
                res |= MAY_WRITE;
        if (flags & MDS_FMODE_EXEC)
                res |= MAY_EXEC;
        return res;
}
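
/*
 * For illustration: an append open maps to both read and write access
 * here, which mdd_open_sanity_check() below then checks against the file
 * mode:
 *
 *      int mode = accmode(env, tmp_la, FMODE_READ | MDS_OPEN_APPEND);
 *      ...mode == (MAY_READ | MAY_WRITE)...
 */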
static int mdd_open_sanity_check(const struct lu_env *env,
                                 struct mdd_object *obj, int flag)
{
        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
        int mode, rc;
        ENTRY;

        /* EEXIST check */
        if (mdd_is_dead_obj(obj))
                RETURN(-ENOENT);

        rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
        if (rc)
                RETURN(rc);

        if (S_ISLNK(tmp_la->la_mode))
                RETURN(-ELOOP);

        mode = accmode(env, tmp_la, flag);

        if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
                RETURN(-EISDIR);

        if (!(flag & MDS_OPEN_CREATED)) {
                rc = mdd_permission_internal(env, obj, tmp_la, mode);
                if (rc)
                        RETURN(rc);
        }

        if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
            S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
                flag &= ~MDS_OPEN_TRUNC;

        /* For writing an append-only file, it must be opened in append mode. */
        if (mdd_is_append(obj)) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                        RETURN(-EPERM);
                if (flag & MDS_OPEN_TRUNC)
                        RETURN(-EPERM);
        }

        /*
         * Currently the client does not pack the O_NOATIME flag.
         */
        if (flag & O_NOATIME) {
                struct md_ucred *uc = md_ucred(env);

                if (uc && ((uc->mu_valid == UCRED_OLD) ||
                           (uc->mu_valid == UCRED_NEW)) &&
                    (uc->mu_fsuid != tmp_la->la_uid) &&
                    !mdd_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
        }

        RETURN(0);
}
static int mdd_open(const struct lu_env *env, struct md_object *obj,
                    int flags)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc = 0;

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);

        rc = mdd_open_sanity_check(env, mdd_obj, flags);
        if (rc == 0)
                mdd_obj->mod_count++;

        mdd_write_unlock(env, mdd_obj);
        return rc;
}
/* return md_attr back,
 * if it is last unlink then return lov ea + llog cookie */
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
                    struct md_attr *ma)
{
        int rc = 0;
        ENTRY;

        if (S_ISREG(mdd_object_type(obj))) {
                /* Return LOV & COOKIES unconditionally here. We clean
                 * everything up.  Caller must be ready for that. */
                rc = __mdd_lmm_get(env, obj, ma);
                if ((ma->ma_valid & MA_LOV))
                        rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
                                            obj, ma);
        }
        RETURN(rc);
}
/*
 * No permission check is needed.
 */
static int mdd_close(const struct lu_env *env, struct md_object *obj,
                     struct md_attr *ma)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct mdd_device *mdd = mdo2mdd(obj);
        struct thandle    *handle;
        int rc;
        int reset = 1;
#ifdef HAVE_QUOTA_SUPPORT
        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
        struct mds_obd *mds = &obd->u.mds;
        unsigned int qids[MAXQUOTAS] = { 0, 0 };
        int quota_opc = 0;
#endif
        ENTRY;

        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
        if (rc)
                RETURN(rc);
        handle = mdd_trans_start(env, mdo2mdd(obj));
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));

        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
        /* release open count */
        mdd_obj->mod_count--;

        if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
                /* remove link to object from orphan index */
                rc = __mdd_orphan_del(env, mdd_obj, handle);
                if (rc == 0) {
                        CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                               "list, OSS objects to be destroyed.\n",
                               PFID(mdd_object_fid(mdd_obj)));
                } else {
                        CERROR("Object "DFID" can not be deleted from orphan "
                               "list, maybe cause OST objects can not be "
                               "destroyed (err: %d).\n",
                               PFID(mdd_object_fid(mdd_obj)), rc);
                        /* If object was not deleted from orphan list, do not
                         * destroy OSS objects, which will be done when next
                         * recovery. */
                        GOTO(out, rc);
                }
        }

        rc = mdd_iattr_get(env, mdd_obj, ma);
        /* Object maybe not in orphan list originally, it is rare case for
         * mdd_finish_unlink() failure. */
        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
                if (mds->mds_quota) {
                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
                        mdd_quota_wrapper(&ma->ma_attr, qids);
                }
#endif
                /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
                if (ma->ma_valid & MA_FLAGS &&
                    ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
                        rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
                } else {
                        rc = mdd_object_kill(env, mdd_obj, ma);
                        if (rc == 0)
                                reset = 0;
                }

                if (rc != 0)
                        CERROR("Error when prepare to delete Object "DFID" , "
                               "which will cause OST objects can not be "
                               "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
        }
        EXIT;

out:
        if (reset)
                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);

        mdd_write_unlock(env, mdd_obj);
        mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
        if (quota_opc)
                /* Trigger dqrel on the owner of child. If failed,
                 * the next call for lquota_chkquota will process it */
                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
                              quota_opc);
#endif
        return rc;
}
/*
 * Permission check is done on open; no need to check again.
 */
static int mdd_readpage_sanity_check(const struct lu_env *env,
                                     struct mdd_object *obj)
{
        struct dt_object *next = mdd_object_child(obj);
        int rc;
        ENTRY;

        if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
                rc = 0;
        else
                rc = -ENOTDIR;

        RETURN(rc);
}
static int mdd_append_attrs(const struct lu_env *env,
                            struct mdd_device *mdd,
                            __u32 attr,
                            const struct dt_it_ops *iops,
                            struct dt_it *it,
                            struct lu_dirent *ent)
{
        struct mdd_thread_info *info = mdd_env_info(env);
        struct lu_fid          *fid  = &info->mti_fid2;
        int                     len  = le16_to_cpu(ent->lde_namelen);
        const unsigned          align = sizeof(struct luda_type) - 1;
        struct lu_fid_pack     *pack;
        struct mdd_object      *obj;
        struct luda_type       *lt;
        int rc = 0;

        if (attr & LUDA_FID) {
                pack = (struct lu_fid_pack *)iops->rec(env, it);
                if (IS_ERR(pack)) {
                        rc = PTR_ERR(pack);
                        ent->lde_attrs = 0;
                        goto out;
                }
                rc = fid_unpack(pack, fid);
                if (rc != 0) {
                        ent->lde_attrs = 0;
                        goto out;
                }

                fid_cpu_to_le(&ent->lde_fid, fid);
                ent->lde_attrs = LUDA_FID;
        }

        /* check if file type is required */
        if (attr & LUDA_TYPE) {
                if (!(attr & LUDA_FID)) {
                        CERROR("wrong attr : [%x]\n", attr);
                        rc = -EINVAL;
                        goto out;
                }

                obj = mdd_object_find(env, mdd, fid);
                if (obj == NULL) /* remote object */
                        goto out;

                if (IS_ERR(obj)) {
                        rc = PTR_ERR(obj);
                        goto out;
                }

                if (mdd_object_exists(obj) == +1) {
                        len = (len + align) & ~align;

                        lt = (void *)ent->lde_name + len;
                        lt->lt_type = cpu_to_le16(mdd_object_type(obj));

                        ent->lde_attrs |= LUDA_TYPE;
                }
                mdd_object_put(env, obj);
        }
out:
        ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
        return rc;
}
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
                              int first, void *area, int nob,
                              const struct dt_it_ops *iops, struct dt_it *it,
                              __u64 *start, __u64 *end,
                              struct lu_dirent **last, __u32 attr)
{
        int                     result;
        __u64                   hash = 0;
        struct lu_dirent       *ent;

        if (first) {
                memset(area, 0, sizeof (struct lu_dirpage));
                area += sizeof (struct lu_dirpage);
                nob  -= sizeof (struct lu_dirpage);
        }

        ent = area;
        do {
                char  *name;
                int    len;
                int    recsize;

                len = iops->key_size(env, it);

                /* IAM iterator can return record with zero len. */
                if (len == 0)
                        goto next;

                name = (char *)iops->key(env, it);
                hash = iops->store(env, it);

                if (unlikely(first)) {
                        first = 0;
                        *start = hash;
                }

                /* calculate max space required for lu_dirent */
                recsize = lu_dirent_calc_size(len, attr);

                CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n",
                       name, ent, nob, hash, len, len, len, name);

                if (nob >= recsize) {
                        ent->lde_hash    = cpu_to_le64(hash);
                        ent->lde_namelen = cpu_to_le16(len);
                        ent->lde_reclen  = cpu_to_le16(recsize);
                        memcpy(ent->lde_name, name, len);

                        result = mdd_append_attrs(env, mdd, attr, iops, it, ent);
                        if (result == -ESTALE)
                                goto next;
                        if (result != 0)
                                goto out;
                } else {
                        /*
                         * record doesn't fit into page, enlarge previous one.
                         */
                        if (*last) {
                                (*last)->lde_reclen =
                                        cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
                                                    nob);
                                result = 0;
                        } else
                                result = -EINVAL;

                        goto out;
                }
                *last = ent;
                ent = (void *)ent + recsize;
                nob -= recsize;

next:
                result = iops->next(env, it);
                if (result == -ESTALE)
                        goto next;
        } while (result == 0);

out:
        *end = hash;
        return result;
}
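
/*
 * Resulting page layout (illustrative), as consumed by the client-side
 * readdir code: a struct lu_dirpage header followed by packed lu_dirent
 * records; the final entry on a page is widened to absorb the unused
 * tail, or has lde_reclen == 0 to terminate the walk:
 *
 *      [ lu_dirpage | lu_dirent "a" | lu_dirent "bb" | ... | tail ]
 */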
static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
                          const struct lu_rdpg *rdpg)
{
        struct dt_object        *next = mdd_object_child(obj);
        const struct dt_it_ops  *iops;
        struct page             *pg;
        struct lu_dirent        *last = NULL;
        struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
        struct dt_it            *it;
        __u64                    hash_start;
        __u64                    hash_end = 0;
        int                      i;
        int                      rc;
        int                      nob;

        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(next->do_index_ops != NULL);

        if (rdpg->rp_count <= 0)
                return -EFAULT;

        /*
         * iterate through directory and fill pages from @rdpg
         */
        iops = &next->do_index_ops->dio_it;
        it = iops->init(env, next, mdd_object_capa(env, obj));
        if (it == NULL)
                return -ENOMEM;

        rc = iops->load(env, it, rdpg->rp_hash);

        if (rc == 0)
                /*
                 * Iterator didn't find record with exactly the key requested.
                 *
                 * It is currently either
                 *
                 *     - positioned above record with key less than
                 *     requested---skip it.
                 *
                 *     - or not positioned at all (is in IAM_IT_SKEWED
                 *     state)---position it on the next item.
                 */
                rc = iops->next(env, it);
        else if (rc > 0)
                rc = 0;

        /*
         * At this point and across for-loop:
         *
         *  rc == 0 -> ok, proceed.
         *  rc >  0 -> end of directory.
         *  rc <  0 -> error.
         */
        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
             i++, nob -= CFS_PAGE_SIZE) {
                LASSERT(i < rdpg->rp_npages);
                pg = rdpg->rp_pages[i];
                rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
                                        min_t(int, nob, CFS_PAGE_SIZE), iops,
                                        it, &hash_start, &hash_end, &last,
                                        rdpg->rp_attrs);
                if (rc != 0 || i == rdpg->rp_npages - 1) {
                        if (last)
                                last->lde_reclen = 0;
                }
                cfs_kunmap(pg);
        }
        if (rc > 0) {
                /*
                 * end of directory.
                 */
                hash_end = DIR_END_OFF;
                rc = 0;
        }
        if (rc == 0) {
                struct lu_dirpage *dp;

                dp = cfs_kmap(rdpg->rp_pages[0]);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(hash_end);
                if (i == 0)
                        /*
                         * No pages were processed, mark this.
                         */
                        dp->ldp_flags |= LDF_EMPTY;

                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(rdpg->rp_pages[0]);
        }
        iops->put(env, it);
        iops->fini(env, it);

        return rc;
}
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 const struct lu_rdpg *rdpg)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        int rc;
        ENTRY;

        LASSERT(mdd_object_exists(mdd_obj));

        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
        rc = mdd_readpage_sanity_check(env, mdd_obj);
        if (rc)
                GOTO(out_unlock, rc);

        if (mdd_is_dead_obj(mdd_obj)) {
                struct page *pg;
                struct lu_dirpage *dp;

                /*
                 * According to POSIX, please do not return any entry to client:
                 * even dot and dotdot should not be returned.
                 */
                CWARN("readdir from dead object: "DFID"\n",
                      PFID(mdd_object_fid(mdd_obj)));

                if (rdpg->rp_count <= 0)
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);

                pg = rdpg->rp_pages[0];
                dp = (struct lu_dirpage *)cfs_kmap(pg);
                memset(dp, 0, sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
                dp->ldp_flags |= LDF_EMPTY;
                dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
                cfs_kunmap(pg);
                GOTO(out_unlock, rc = 0);
        }

        rc = __mdd_readpage(env, mdd_obj, rdpg);

        EXIT;
out_unlock:
        mdd_read_unlock(env, mdd_obj);
        return rc;
}
static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);
        struct dt_object *next;

        LASSERT(mdd_object_exists(mdd_obj));
        next = mdd_object_child(mdd_obj);
        return next->do_ops->do_object_sync(env, next);
}
static dt_obj_version_t mdd_version_get(const struct lu_env *env,
                                        struct md_object *obj)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));
        return do_version_get(env, mdd_object_child(mdd_obj));
}
static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
                            dt_obj_version_t version)
{
        struct mdd_object *mdd_obj = md2mdd_obj(obj);

        LASSERT(mdd_object_exists(mdd_obj));
        return do_version_set(env, mdd_object_child(mdd_obj), version);
}
const struct md_object_operations mdd_obj_ops = {
        .moo_permission    = mdd_permission,
        .moo_attr_get      = mdd_attr_get,
        .moo_attr_set      = mdd_attr_set,
        .moo_xattr_get     = mdd_xattr_get,
        .moo_xattr_set     = mdd_xattr_set,
        .moo_xattr_list    = mdd_xattr_list,
        .moo_xattr_del     = mdd_xattr_del,
        .moo_object_create = mdd_object_create,
        .moo_ref_add       = mdd_ref_add,
        .moo_ref_del       = mdd_ref_del,
        .moo_open          = mdd_open,
        .moo_close         = mdd_close,
        .moo_readpage      = mdd_readpage,
        .moo_readlink      = mdd_readlink,
        .moo_capa_get      = mdd_capa_get,
        .moo_object_sync   = mdd_object_sync,
        .moo_version_get   = mdd_version_get,
        .moo_version_set   = mdd_version_set,
        .moo_path          = mdd_path,
};