Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <obd_support.h>
53 #include <lprocfs_status.h>
54 /* fid_be_cpu(), fid_cpu_to_be(). */
55 #include <lustre_fid.h>
56
57 #include <lustre_param.h>
58 #include <linux/ldiskfs_fs.h>
59 #include <lustre_mds.h>
60 #include <lustre/lustre_idl.h>
61
62 #include "mdd_internal.h"
63
64 static const struct lu_object_operations mdd_lu_obj_ops;
65
66 static int mdd_xattr_get(const struct lu_env *env,
67                          struct md_object *obj, struct lu_buf *buf,
68                          const char *name);
69
70 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
71                struct lu_attr *la, struct lustre_capa *capa)
72 {
73         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
74                  PFID(mdd_object_fid(obj)));
75         return mdo_attr_get(env, obj, la, capa);
76 }
77
78 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
79 {
80         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
81
82         if (flags & LUSTRE_APPEND_FL)
83                 obj->mod_flags |= APPEND_OBJ;
84
85         if (flags & LUSTRE_IMMUTABLE_FL)
86                 obj->mod_flags |= IMMUTE_OBJ;
87 }
88
89 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
90 {
91         struct mdd_thread_info *info;
92
93         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
94         LASSERT(info != NULL);
95         return info;
96 }
97
98 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
99 {
100         struct lu_buf *buf;
101
102         buf = &mdd_env_info(env)->mti_buf;
103         buf->lb_buf = area;
104         buf->lb_len = len;
105         return buf;
106 }
107
108 void mdd_buf_put(struct lu_buf *buf)
109 {
110         if (buf == NULL || buf->lb_buf == NULL)
111                 return;
112         if (buf->lb_vmalloc)
113                 OBD_VFREE(buf->lb_buf, buf->lb_len);
114         else
115                 OBD_FREE(buf->lb_buf, buf->lb_len);
116         buf->lb_buf = NULL;
117 }
118
119 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
120                                        const void *area, ssize_t len)
121 {
122         struct lu_buf *buf;
123
124         buf = &mdd_env_info(env)->mti_buf;
125         buf->lb_buf = (void *)area;
126         buf->lb_len = len;
127         return buf;
128 }
129
130 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
131 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
132 {
133         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
134
135         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
136                 if (buf->lb_vmalloc)
137                         OBD_VFREE(buf->lb_buf, buf->lb_len);
138                 else
139                         OBD_FREE(buf->lb_buf, buf->lb_len);
140                 buf->lb_buf = NULL;
141         }
142         if (buf->lb_buf == NULL) {
143                 buf->lb_len = len;
144                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
145                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
146                         buf->lb_vmalloc = 0;
147                 }
148                 if (buf->lb_buf == NULL) {
149                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
150                         buf->lb_vmalloc = 1;
151                 }
152                 if (buf->lb_buf == NULL)
153                         buf->lb_len = 0;
154         }
155         return buf;
156 }
157
158 /* preserve old data */
159 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
160 {
161         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
162         struct lu_buf buf;
163
164         LASSERT(len >= oldbuf->lb_len);
165         if (len > BUF_VMALLOC_SIZE) {
166                 OBD_VMALLOC(buf.lb_buf, len);
167                 buf.lb_vmalloc = 1;
168         } else {
169                 OBD_ALLOC(buf.lb_buf, len);
170                 buf.lb_vmalloc = 0;
171         }
172         if (buf.lb_buf == NULL)
173                 return -ENOMEM;
174
175         buf.lb_len = len;
176         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
177
178         if (oldbuf->lb_vmalloc)
179                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
180         else
181                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
182
183         memcpy(oldbuf, &buf, sizeof(buf));
184
185         return 0;
186 }
187
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189                                        struct mdd_device *mdd)
190 {
191         struct mdd_thread_info *mti = mdd_env_info(env);
192         int                     max_cookie_size;
193
194         max_cookie_size = mdd_lov_cookiesize(env, mdd);
195         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196                 if (mti->mti_max_cookie)
197                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
198                 mti->mti_max_cookie = NULL;
199                 mti->mti_max_cookie_size = 0;
200         }
201         if (unlikely(mti->mti_max_cookie == NULL)) {
202                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
203                 if (likely(mti->mti_max_cookie != NULL))
204                         mti->mti_max_cookie_size = max_cookie_size;
205         }
206         if (likely(mti->mti_max_cookie != NULL))
207                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
208         return mti->mti_max_cookie;
209 }
210
211 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
212                                    struct mdd_device *mdd)
213 {
214         struct mdd_thread_info *mti = mdd_env_info(env);
215         int                     max_lmm_size;
216
217         max_lmm_size = mdd_lov_mdsize(env, mdd);
218         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
219                 if (mti->mti_max_lmm)
220                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
221                 mti->mti_max_lmm = NULL;
222                 mti->mti_max_lmm_size = 0;
223         }
224         if (unlikely(mti->mti_max_lmm == NULL)) {
225                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
226                 if (unlikely(mti->mti_max_lmm != NULL))
227                         mti->mti_max_lmm_size = max_lmm_size;
228         }
229         return mti->mti_max_lmm;
230 }
231
232 struct lu_object *mdd_object_alloc(const struct lu_env *env,
233                                    const struct lu_object_header *hdr,
234                                    struct lu_device *d)
235 {
236         struct mdd_object *mdd_obj;
237
238         OBD_ALLOC_PTR(mdd_obj);
239         if (mdd_obj != NULL) {
240                 struct lu_object *o;
241
242                 o = mdd2lu_obj(mdd_obj);
243                 lu_object_init(o, NULL, d);
244                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
245                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
246                 mdd_obj->mod_count = 0;
247                 o->lo_ops = &mdd_lu_obj_ops;
248                 return o;
249         } else {
250                 return NULL;
251         }
252 }
253
254 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
255                            const struct lu_object_conf *_)
256 {
257         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
258         struct mdd_object *mdd_obj = lu2mdd_obj(o);
259         struct lu_object  *below;
260         struct lu_device  *under;
261         ENTRY;
262
263         mdd_obj->mod_cltime = 0;
264         under = &d->mdd_child->dd_lu_dev;
265         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
266         mdd_pdlock_init(mdd_obj);
267         if (below == NULL)
268                 RETURN(-ENOMEM);
269
270         lu_object_add(o, below);
271
272         RETURN(0);
273 }
274
275 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
276 {
277         if (lu_object_exists(o))
278                 return mdd_get_flags(env, lu2mdd_obj(o));
279         else
280                 return 0;
281 }
282
283 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
284 {
285         struct mdd_object *mdd = lu2mdd_obj(o);
286
287         lu_object_fini(o);
288         OBD_FREE_PTR(mdd);
289 }
290
291 static int mdd_object_print(const struct lu_env *env, void *cookie,
292                             lu_printer_t p, const struct lu_object *o)
293 {
294         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
295 }
296
297 static const struct lu_object_operations mdd_lu_obj_ops = {
298         .loo_object_init    = mdd_object_init,
299         .loo_object_start   = mdd_object_start,
300         .loo_object_free    = mdd_object_free,
301         .loo_object_print   = mdd_object_print,
302 };
303
304 struct mdd_object *mdd_object_find(const struct lu_env *env,
305                                    struct mdd_device *d,
306                                    const struct lu_fid *f)
307 {
308         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
309 }
310
311 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
312                         const char *path, struct lu_fid *fid)
313 {
314         struct lu_buf *buf;
315         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
316         struct mdd_object *obj;
317         struct lu_name *lname = &mdd_env_info(env)->mti_name;
318         char *name;
319         int rc = 0;
320         ENTRY;
321
322         /* temp buffer for path element */
323         buf = mdd_buf_alloc(env, PATH_MAX);
324         if (buf->lb_buf == NULL)
325                 RETURN(-ENOMEM);
326
327         lname->ln_name = name = buf->lb_buf;
328         lname->ln_namelen = 0;
329         *f = mdd->mdd_root_fid;
330
331         while(1) {
332                 while (*path == '/')
333                         path++;
334                 if (*path == '\0')
335                         break;
336                 while (*path != '/' && *path != '\0') {
337                         *name = *path;
338                         path++;
339                         name++;
340                         lname->ln_namelen++;
341                 }
342
343                 *name = '\0';
344                 /* find obj corresponding to fid */
345                 obj = mdd_object_find(env, mdd, f);
346                 if (obj == NULL)
347                         GOTO(out, rc = -EREMOTE);
348                 if (IS_ERR(obj))
349                         GOTO(out, rc = -PTR_ERR(obj));
350                 /* get child fid from parent and name */
351                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
352                 mdd_object_put(env, obj);
353                 if (rc)
354                         break;
355
356                 name = buf->lb_buf;
357                 lname->ln_namelen = 0;
358         }
359
360         if (!rc)
361                 *fid = *f;
362 out:
363         RETURN(rc);
364 }
365
366 /** The maximum depth that fid2path() will search.
367  * This is limited only because we want to store the fids for
368  * historical path lookup purposes.
369  */
370 #define MAX_PATH_DEPTH 100
371
372 /** mdd_path() lookup structure. */
373 struct path_lookup_info {
374         __u64                pli_recno;        /**< history point */
375         struct lu_fid        pli_fid;
376         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377         struct mdd_object   *pli_mdd_obj;
378         char                *pli_path;         /**< full path */
379         int                  pli_pathlen;
380         int                  pli_linkno;       /**< which hardlink to follow */
381         int                  pli_fidcount;     /**< number of \a pli_fids */
382 };
383
384 static int mdd_path_current(const struct lu_env *env,
385                             struct path_lookup_info *pli)
386 {
387         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388         struct mdd_object *mdd_obj;
389         struct lu_buf     *buf = NULL;
390         struct link_ea_header *leh;
391         struct link_ea_entry  *lee;
392         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
394         char *ptr;
395         int reclen;
396         int rc;
397         ENTRY;
398
399         ptr = pli->pli_path + pli->pli_pathlen - 1;
400         *ptr = 0;
401         --ptr;
402         pli->pli_fidcount = 0;
403         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
404
405         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406                 mdd_obj = mdd_object_find(env, mdd,
407                                           &pli->pli_fids[pli->pli_fidcount]);
408                 if (mdd_obj == NULL)
409                         GOTO(out, rc = -EREMOTE);
410                 if (IS_ERR(mdd_obj))
411                         GOTO(out, rc = -PTR_ERR(mdd_obj));
412                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
413                 if (rc <= 0) {
414                         mdd_object_put(env, mdd_obj);
415                         if (rc == -1)
416                                 rc = -EREMOTE;
417                         else if (rc == 0)
418                                 /* Do I need to error out here? */
419                                 rc = -ENOENT;
420                         GOTO(out, rc);
421                 }
422
423                 /* Get parent fid and object name */
424                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425                 buf = mdd_links_get(env, mdd_obj);
426                 if (IS_ERR(buf))
427                         GOTO(out, rc = PTR_ERR(buf));
428                 mdd_read_unlock(env, mdd_obj);
429                 mdd_object_put(env, mdd_obj);
430                 if (rc < 0)
431                         GOTO(out, rc);
432
433                 leh = buf->lb_buf;
434                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
435                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
436
437                 /* If set, use link #linkno for path lookup, otherwise use
438                    link #0.  Only do this for the final path element. */
439                 if ((pli->pli_fidcount == 0) &&
440                     (pli->pli_linkno < leh->leh_reccount)) {
441                         int count;
442                         for (count = 0; count < pli->pli_linkno; count++) {
443                                 lee = (struct link_ea_entry *)
444                                      ((char *)lee + reclen);
445                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
446                         }
447                         if (pli->pli_linkno < leh->leh_reccount - 1)
448                                 /* indicate to user there are more links */
449                                 pli->pli_linkno++;
450                 }
451
452                 /* Pack the name in the end of the buffer */
453                 ptr -= tmpname->ln_namelen;
454                 if (ptr - 1 <= pli->pli_path)
455                         GOTO(out, rc = -EOVERFLOW);
456                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
457                 *(--ptr) = '/';
458
459                 /* Store the parent fid for historic lookup */
460                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
461                         GOTO(out, rc = -EOVERFLOW);
462                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
463         }
464
465         /* Verify that our path hasn't changed since we started the lookup */
466         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
467         if (rc) {
468                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
469                 GOTO (out, rc = -EAGAIN);
470         }
471         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
472                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
473                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
474                        PFID(&pli->pli_fid));
475                 GOTO(out, rc = -EAGAIN);
476         }
477
478         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
479
480         EXIT;
481 out:
482         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
483                 /* if we vmalloced a large buffer drop it */
484                 mdd_buf_put(buf);
485
486         return rc;
487 }
488
489 /* Returns the full path to this fid, as of changelog record recno. */
490 static int mdd_path(const struct lu_env *env, struct md_object *obj,
491                     char *path, int pathlen, __u64 recno, int *linkno)
492 {
493         struct path_lookup_info *pli;
494         int tries = 3;
495         int rc = -EAGAIN;
496         ENTRY;
497
498         if (pathlen < 3)
499                 RETURN(-EOVERFLOW);
500
501         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
502                 path[0] = '/';
503                 path[1] = '\0';
504                 RETURN(0);
505         }
506
507         OBD_ALLOC_PTR(pli);
508         if (pli == NULL)
509                 RETURN(-ENOMEM);
510
511         pli->pli_mdd_obj = md2mdd_obj(obj);
512         pli->pli_recno = recno;
513         pli->pli_path = path;
514         pli->pli_pathlen = pathlen;
515         pli->pli_linkno = *linkno;
516
517         /* Retry multiple times in case file is being moved */
518         while (tries-- && rc == -EAGAIN)
519                 rc = mdd_path_current(env, pli);
520
521 #if 0   /* We need old path names only for replication */
522         /* For historical path lookup, the current links may not have existed
523          * at "recno" time.  We must switch over to earlier links/parents
524          * by using the changelog records.  If the earlier parent doesn't
525          * exist, we must search back through the changelog to reconstruct
526          * its parents, then check if it exists, etc.
527          * We may ignore this problem for the initial implementation and
528          * state that an "original" hardlink must still exist for us to find
529          * historic path name. */
530         if (pli->pli_recno != -1)
531                 rc = mdd_path_historic(env, pli);
532 #endif
533
534         /* return next link index to caller */
535         *linkno = pli->pli_linkno;
536
537         OBD_FREE_PTR(pli);
538
539         RETURN (rc);
540 }
541
542 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
543 {
544         struct lu_attr *la = &mdd_env_info(env)->mti_la;
545         int rc;
546
547         ENTRY;
548         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
549         if (rc == 0) {
550                 mdd_flags_xlate(obj, la->la_flags);
551                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
552                         obj->mod_flags |= MNLINK_OBJ;
553         }
554         RETURN(rc);
555 }
556
557 /* get only inode attributes */
558 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
559                   struct md_attr *ma)
560 {
561         int rc = 0;
562         ENTRY;
563
564         if (ma->ma_valid & MA_INODE)
565                 RETURN(0);
566
567         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
568                           mdd_object_capa(env, mdd_obj));
569         if (rc == 0)
570                 ma->ma_valid |= MA_INODE;
571         RETURN(rc);
572 }
573
574 static int mdd_get_default_md(struct mdd_object *mdd_obj,
575                 struct lov_mds_md *lmm, int *size)
576 {
577         struct lov_desc *ldesc;
578         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
579         ENTRY;
580
581         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
582         LASSERT(ldesc != NULL);
583
584         if (!lmm)
585                 RETURN(0);
586
587         lmm->lmm_magic = LOV_MAGIC_V1;
588         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
589         lmm->lmm_pattern = ldesc->ld_pattern;
590         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
591         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
592         *size = sizeof(struct lov_mds_md);
593
594         RETURN(sizeof(struct lov_mds_md));
595 }
596
597 /* get lov EA only */
598 static int __mdd_lmm_get(const struct lu_env *env,
599                          struct mdd_object *mdd_obj, struct md_attr *ma)
600 {
601         int rc;
602         ENTRY;
603
604         if (ma->ma_valid & MA_LOV)
605                 RETURN(0);
606
607         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
608                         XATTR_NAME_LOV);
609
610         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
611                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
612                                 &ma->ma_lmm_size);
613         }
614
615         if (rc > 0) {
616                 ma->ma_valid |= MA_LOV;
617                 rc = 0;
618         }
619         RETURN(rc);
620 }
621
622 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
623                        struct md_attr *ma)
624 {
625         int rc;
626         ENTRY;
627
628         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
629         rc = __mdd_lmm_get(env, mdd_obj, ma);
630         mdd_read_unlock(env, mdd_obj);
631         RETURN(rc);
632 }
633
634 /* get lmv EA only*/
635 static int __mdd_lmv_get(const struct lu_env *env,
636                          struct mdd_object *mdd_obj, struct md_attr *ma)
637 {
638         int rc;
639         ENTRY;
640
641         if (ma->ma_valid & MA_LMV)
642                 RETURN(0);
643
644         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
645                         XATTR_NAME_LMV);
646         if (rc > 0) {
647                 ma->ma_valid |= MA_LMV;
648                 rc = 0;
649         }
650         RETURN(rc);
651 }
652
653 static int mdd_attr_get_internal(const struct lu_env *env,
654                                  struct mdd_object *mdd_obj,
655                                  struct md_attr *ma)
656 {
657         int rc = 0;
658         ENTRY;
659
660         if (ma->ma_need & MA_INODE)
661                 rc = mdd_iattr_get(env, mdd_obj, ma);
662
663         if (rc == 0 && ma->ma_need & MA_LOV) {
664                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
665                     S_ISDIR(mdd_object_type(mdd_obj)))
666                         rc = __mdd_lmm_get(env, mdd_obj, ma);
667         }
668         if (rc == 0 && ma->ma_need & MA_LMV) {
669                 if (S_ISDIR(mdd_object_type(mdd_obj)))
670                         rc = __mdd_lmv_get(env, mdd_obj, ma);
671         }
672 #ifdef CONFIG_FS_POSIX_ACL
673         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
674                 if (S_ISDIR(mdd_object_type(mdd_obj)))
675                         rc = mdd_def_acl_get(env, mdd_obj, ma);
676         }
677 #endif
678         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
679                rc, ma->ma_valid);
680         RETURN(rc);
681 }
682
683 int mdd_attr_get_internal_locked(const struct lu_env *env,
684                                  struct mdd_object *mdd_obj, struct md_attr *ma)
685 {
686         int rc;
687         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
688
689         if (needlock)
690                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
691         rc = mdd_attr_get_internal(env, mdd_obj, ma);
692         if (needlock)
693                 mdd_read_unlock(env, mdd_obj);
694         return rc;
695 }
696
697 /*
698  * No permission check is needed.
699  */
700 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
701                         struct md_attr *ma)
702 {
703         struct mdd_object *mdd_obj = md2mdd_obj(obj);
704         int                rc;
705
706         ENTRY;
707         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
708         RETURN(rc);
709 }
710
711 /*
712  * No permission check is needed.
713  */
714 static int mdd_xattr_get(const struct lu_env *env,
715                          struct md_object *obj, struct lu_buf *buf,
716                          const char *name)
717 {
718         struct mdd_object *mdd_obj = md2mdd_obj(obj);
719         int rc;
720
721         ENTRY;
722
723         LASSERT(mdd_object_exists(mdd_obj));
724
725         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
726         rc = mdo_xattr_get(env, mdd_obj, buf, name,
727                            mdd_object_capa(env, mdd_obj));
728         mdd_read_unlock(env, mdd_obj);
729
730         RETURN(rc);
731 }
732
733 /*
734  * Permission check is done when open,
735  * no need check again.
736  */
737 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
738                         struct lu_buf *buf)
739 {
740         struct mdd_object *mdd_obj = md2mdd_obj(obj);
741         struct dt_object  *next;
742         loff_t             pos = 0;
743         int                rc;
744         ENTRY;
745
746         LASSERT(mdd_object_exists(mdd_obj));
747
748         next = mdd_object_child(mdd_obj);
749         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
750         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
751                                          mdd_object_capa(env, mdd_obj));
752         mdd_read_unlock(env, mdd_obj);
753         RETURN(rc);
754 }
755
756 /*
757  * No permission check is needed.
758  */
759 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
760                           struct lu_buf *buf)
761 {
762         struct mdd_object *mdd_obj = md2mdd_obj(obj);
763         int rc;
764
765         ENTRY;
766
767         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
768         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
769         mdd_read_unlock(env, mdd_obj);
770
771         RETURN(rc);
772 }
773
774 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
775                                struct mdd_object *c, struct md_attr *ma,
776                                struct thandle *handle,
777                                const struct md_op_spec *spec)
778 {
779         struct lu_attr *attr = &ma->ma_attr;
780         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
781         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
782         const struct dt_index_features *feat = spec->sp_feat;
783         int rc;
784         ENTRY;
785
786         if (!mdd_object_exists(c)) {
787                 struct dt_object *next = mdd_object_child(c);
788                 LASSERT(next);
789
790                 if (feat != &dt_directory_features && feat != NULL)
791                         dof->dof_type = DFT_INDEX;
792                 else
793                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
794
795                 dof->u.dof_idx.di_feat = feat;
796
797                 /* @hint will be initialized by underlying device. */
798                 next->do_ops->do_ah_init(env, hint,
799                                          p ? mdd_object_child(p) : NULL,
800                                          attr->la_mode & S_IFMT);
801
802                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
803                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
804         } else
805                 rc = -EEXIST;
806
807         RETURN(rc);
808 }
809
810 /**
811  * Make sure the ctime is increased only.
812  */
813 static inline int mdd_attr_check(const struct lu_env *env,
814                                  struct mdd_object *obj,
815                                  struct lu_attr *attr)
816 {
817         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
818         int rc;
819         ENTRY;
820
821         if (attr->la_valid & LA_CTIME) {
822                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
823                 if (rc)
824                         RETURN(rc);
825
826                 if (attr->la_ctime < tmp_la->la_ctime)
827                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
828                 else if (attr->la_valid == LA_CTIME &&
829                          attr->la_ctime == tmp_la->la_ctime)
830                         attr->la_valid &= ~LA_CTIME;
831         }
832         RETURN(0);
833 }
834
835 int mdd_attr_set_internal(const struct lu_env *env,
836                           struct mdd_object *obj,
837                           struct lu_attr *attr,
838                           struct thandle *handle,
839                           int needacl)
840 {
841         int rc;
842         ENTRY;
843
844         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
845 #ifdef CONFIG_FS_POSIX_ACL
846         if (!rc && (attr->la_valid & LA_MODE) && needacl)
847                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
848 #endif
849         RETURN(rc);
850 }
851
852 int mdd_attr_check_set_internal(const struct lu_env *env,
853                                 struct mdd_object *obj,
854                                 struct lu_attr *attr,
855                                 struct thandle *handle,
856                                 int needacl)
857 {
858         int rc;
859         ENTRY;
860
861         rc = mdd_attr_check(env, obj, attr);
862         if (rc)
863                 RETURN(rc);
864
865         if (attr->la_valid)
866                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
867         RETURN(rc);
868 }
869
870 static int mdd_attr_set_internal_locked(const struct lu_env *env,
871                                         struct mdd_object *obj,
872                                         struct lu_attr *attr,
873                                         struct thandle *handle,
874                                         int needacl)
875 {
876         int rc;
877         ENTRY;
878
879         needacl = needacl && (attr->la_valid & LA_MODE);
880         if (needacl)
881                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
882         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
883         if (needacl)
884                 mdd_write_unlock(env, obj);
885         RETURN(rc);
886 }
887
888 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
889                                        struct mdd_object *obj,
890                                        struct lu_attr *attr,
891                                        struct thandle *handle,
892                                        int needacl)
893 {
894         int rc;
895         ENTRY;
896
897         needacl = needacl && (attr->la_valid & LA_MODE);
898         if (needacl)
899                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
900         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
901         if (needacl)
902                 mdd_write_unlock(env, obj);
903         RETURN(rc);
904 }
905
906 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
907                     const struct lu_buf *buf, const char *name,
908                     int fl, struct thandle *handle)
909 {
910         struct lustre_capa *capa = mdd_object_capa(env, obj);
911         int rc = -EINVAL;
912         ENTRY;
913
914         if (buf->lb_buf && buf->lb_len > 0)
915                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
916         else if (buf->lb_buf == NULL && buf->lb_len == 0)
917                 rc = mdo_xattr_del(env, obj, name, handle, capa);
918
919         RETURN(rc);
920 }
921
922 /*
923  * This gives the same functionality as the code between
924  * sys_chmod and inode_setattr
925  * chown_common and inode_setattr
926  * utimes and inode_setattr
927  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
928  */
929 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
930                         struct lu_attr *la, const struct md_attr *ma)
931 {
932         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
933         struct md_ucred  *uc         = md_ucred(env);
934         int               rc;
935         ENTRY;
936
937         if (!la->la_valid)
938                 RETURN(0);
939
940         /* Do not permit change file type */
941         if (la->la_valid & LA_TYPE)
942                 RETURN(-EPERM);
943
944         /* They should not be processed by setattr */
945         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
946                 RETURN(-EPERM);
947
948         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
949         if (rc)
950                 RETURN(rc);
951
952         if (la->la_valid == LA_CTIME) {
953                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
954                         /* This is only for set ctime when rename's source is
955                          * on remote MDS. */
956                         rc = mdd_may_delete(env, NULL, obj,
957                                             (struct md_attr *)ma, 1, 0);
958                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
959                         la->la_valid &= ~LA_CTIME;
960                 RETURN(rc);
961         }
962
963         if (la->la_valid == LA_ATIME) {
964                 /* This is atime only set for read atime update on close. */
965                 if (la->la_atime <= tmp_la->la_atime +
966                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
967                         la->la_valid &= ~LA_ATIME;
968                 RETURN(0);
969         }
970
971         /* Check if flags change. */
972         if (la->la_valid & LA_FLAGS) {
973                 unsigned int oldflags = 0;
974                 unsigned int newflags = la->la_flags &
975                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
976
977                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
978                     !mdd_capable(uc, CFS_CAP_FOWNER))
979                         RETURN(-EPERM);
980
981                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
982                  * only be changed by the relevant capability. */
983                 if (mdd_is_immutable(obj))
984                         oldflags |= LUSTRE_IMMUTABLE_FL;
985                 if (mdd_is_append(obj))
986                         oldflags |= LUSTRE_APPEND_FL;
987                 if ((oldflags ^ newflags) &&
988                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
989                         RETURN(-EPERM);
990
991                 if (!S_ISDIR(tmp_la->la_mode))
992                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
993         }
994
995         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
996             (la->la_valid & ~LA_FLAGS) &&
997             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
998                 RETURN(-EPERM);
999
1000         /* Check for setting the obj time. */
1001         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1002             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1003                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1004                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1005                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1006                                                             MAY_WRITE,
1007                                                             MOR_TGT_CHILD);
1008                         if (rc)
1009                                 RETURN(rc);
1010                 }
1011         }
1012
1013         /* Make sure a caller can chmod. */
1014         if (la->la_valid & LA_MODE) {
1015                 /* Bypass la_vaild == LA_MODE,
1016                  * this is for changing file with SUID or SGID. */
1017                 if ((la->la_valid & ~LA_MODE) &&
1018                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1019                     (uc->mu_fsuid != tmp_la->la_uid) &&
1020                     !mdd_capable(uc, CFS_CAP_FOWNER))
1021                         RETURN(-EPERM);
1022
1023                 if (la->la_mode == (umode_t) -1)
1024                         la->la_mode = tmp_la->la_mode;
1025                 else
1026                         la->la_mode = (la->la_mode & S_IALLUGO) |
1027                                       (tmp_la->la_mode & ~S_IALLUGO);
1028
1029                 /* Also check the setgid bit! */
1030                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1031                                        la->la_gid : tmp_la->la_gid) &&
1032                     !mdd_capable(uc, CFS_CAP_FSETID))
1033                         la->la_mode &= ~S_ISGID;
1034         } else {
1035                la->la_mode = tmp_la->la_mode;
1036         }
1037
1038         /* Make sure a caller can chown. */
1039         if (la->la_valid & LA_UID) {
1040                 if (la->la_uid == (uid_t) -1)
1041                         la->la_uid = tmp_la->la_uid;
1042                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1043                     (la->la_uid != tmp_la->la_uid)) &&
1044                     !mdd_capable(uc, CFS_CAP_CHOWN))
1045                         RETURN(-EPERM);
1046
1047                 /* If the user or group of a non-directory has been
1048                  * changed by a non-root user, remove the setuid bit.
1049                  * 19981026 David C Niemi <niemi@tux.org>
1050                  *
1051                  * Changed this to apply to all users, including root,
1052                  * to avoid some races. This is the behavior we had in
1053                  * 2.0. The check for non-root was definitely wrong
1054                  * for 2.2 anyway, as it should have been using
1055                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1056                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1057                     !S_ISDIR(tmp_la->la_mode)) {
1058                         la->la_mode &= ~S_ISUID;
1059                         la->la_valid |= LA_MODE;
1060                 }
1061         }
1062
1063         /* Make sure caller can chgrp. */
1064         if (la->la_valid & LA_GID) {
1065                 if (la->la_gid == (gid_t) -1)
1066                         la->la_gid = tmp_la->la_gid;
1067                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1068                     ((la->la_gid != tmp_la->la_gid) &&
1069                     !lustre_in_group_p(uc, la->la_gid))) &&
1070                     !mdd_capable(uc, CFS_CAP_CHOWN))
1071                         RETURN(-EPERM);
1072
1073                 /* Likewise, if the user or group of a non-directory
1074                  * has been changed by a non-root user, remove the
1075                  * setgid bit UNLESS there is no group execute bit
1076                  * (this would be a file marked for mandatory
1077                  * locking).  19981026 David C Niemi <niemi@tux.org>
1078                  *
1079                  * Removed the fsuid check (see the comment above) --
1080                  * 19990830 SD. */
1081                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1082                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1083                         la->la_mode &= ~S_ISGID;
1084                         la->la_valid |= LA_MODE;
1085                 }
1086         }
1087
1088         /* For both Size-on-MDS case and truncate case,
1089          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1090          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1091          * For SOM case, it is true, the MAY_WRITE perm has been checked
1092          * when open, no need check again. For truncate case, it is false,
1093          * the MAY_WRITE perm should be checked here. */
1094         if (ma->ma_attr_flags & MDS_SOM) {
1095                 /* For the "Size-on-MDS" setattr update, merge coming
1096                  * attributes with the set in the inode. BUG 10641 */
1097                 if ((la->la_valid & LA_ATIME) &&
1098                     (la->la_atime <= tmp_la->la_atime))
1099                         la->la_valid &= ~LA_ATIME;
1100
1101                 /* OST attributes do not have a priority over MDS attributes,
1102                  * so drop times if ctime is equal. */
1103                 if ((la->la_valid & LA_CTIME) &&
1104                     (la->la_ctime <= tmp_la->la_ctime))
1105                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1106         } else {
1107                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1108                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1109                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1110                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1111                                 rc = mdd_permission_internal_locked(env, obj,
1112                                                             tmp_la, MAY_WRITE,
1113                                                             MOR_TGT_CHILD);
1114                                 if (rc)
1115                                         RETURN(rc);
1116                         }
1117                 }
1118                 if (la->la_valid & LA_CTIME) {
1119                         /* The pure setattr, it has the priority over what is
1120                          * already set, do not drop it if ctime is equal. */
1121                         if (la->la_ctime < tmp_la->la_ctime)
1122                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1123                                                   LA_CTIME);
1124                 }
1125         }
1126
1127         RETURN(0);
1128 }
1129
1130 /** Store a data change changelog record
1131  * If this fails, we must fail the whole transaction; we don't
1132  * want the change to commit without the log entry.
1133  * \param mdd_obj - mdd_object of change
1134  * \param handle - transacion handle
1135  */
1136 static int mdd_changelog_data_store(const struct lu_env     *env,
1137                                     struct mdd_device       *mdd,
1138                                     enum changelog_rec_type type,
1139                                     struct mdd_object       *mdd_obj,
1140                                     struct thandle          *handle)
1141 {
1142         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1143         struct llog_changelog_rec *rec;
1144         struct lu_buf *buf;
1145         int reclen;
1146         int rc;
1147
1148         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1149                 RETURN(0);
1150
1151         LASSERT(handle != NULL);
1152         LASSERT(mdd_obj != NULL);
1153
1154         if ((type == CL_SETATTR) &&
1155             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1156                 /* Don't need multiple updates in this log */
1157                 /* Don't check under lock - no big deal if we get an extra
1158                    entry */
1159                 RETURN(0);
1160         }
1161
1162         reclen = llog_data_len(sizeof(*rec));
1163         buf = mdd_buf_alloc(env, reclen);
1164         if (buf->lb_buf == NULL)
1165                 RETURN(-ENOMEM);
1166         rec = (struct llog_changelog_rec *)buf->lb_buf;
1167
1168         rec->cr_flags = CLF_VERSION;
1169         rec->cr_type = (__u32)type;
1170         rec->cr_tfid = *tfid;
1171         rec->cr_namelen = 0;
1172         mdd_obj->mod_cltime = cfs_time_current_64();
1173
1174         rc = mdd_changelog_llog_write(mdd, rec, handle);
1175         if (rc < 0) {
1176                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1177                        rc, type, PFID(tfid));
1178                 return -EFAULT;
1179         }
1180
1181         return 0;
1182 }
1183
1184 /* set attr and LOV EA at once, return updated attr */
1185 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1186                         const struct md_attr *ma)
1187 {
1188         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1189         struct mdd_device *mdd = mdo2mdd(obj);
1190         struct thandle *handle;
1191         struct lov_mds_md *lmm = NULL;
1192         struct llog_cookie *logcookies = NULL;
1193         int  rc, lmm_size = 0, cookie_size = 0;
1194         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1195 #ifdef HAVE_QUOTA_SUPPORT
1196         struct obd_device *obd = mdd->mdd_obd_dev;
1197         struct mds_obd *mds = &obd->u.mds;
1198         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1199         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1200         int quota_opc = 0, block_count = 0;
1201         int inode_pending = 0, block_pending = 0;
1202 #endif
1203         ENTRY;
1204
1205         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1206                                     MDD_TXN_ATTR_SET_OP);
1207         handle = mdd_trans_start(env, mdd);
1208         if (IS_ERR(handle))
1209                 RETURN(PTR_ERR(handle));
1210         /*TODO: add lock here*/
1211         /* start a log jounal handle if needed */
1212         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1213             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1214                 lmm_size = mdd_lov_mdsize(env, mdd);
1215                 lmm = mdd_max_lmm_get(env, mdd);
1216                 if (lmm == NULL)
1217                         GOTO(cleanup, rc = -ENOMEM);
1218
1219                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1220                                 XATTR_NAME_LOV);
1221
1222                 if (rc < 0)
1223                         GOTO(cleanup, rc);
1224         }
1225
1226         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1227                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1228                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1229
1230         *la_copy = ma->ma_attr;
1231         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1232         if (rc)
1233                 GOTO(cleanup, rc);
1234
1235 #ifdef HAVE_QUOTA_SUPPORT
1236         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1237                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1238
1239                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1240                 if (!rc) {
1241                         quota_opc = FSFILT_OP_SETATTR;
1242                         mdd_quota_wrapper(la_copy, qnids);
1243                         mdd_quota_wrapper(la_tmp, qoids);
1244                         /* get file quota for new owner */
1245                         lquota_chkquota(mds_quota_interface_ref, obd,
1246                                         qnids[USRQUOTA], qnids[GRPQUOTA], 1,
1247                                         &inode_pending, NULL, 0, NULL, 0);
1248                         block_count = (la_tmp->la_blocks + 7) >> 3;
1249                         if (block_count)
1250                                 /* get block quota for new owner */
1251                                 lquota_chkquota(mds_quota_interface_ref, obd,
1252                                                 qnids[USRQUOTA],
1253                                                 qnids[GRPQUOTA],
1254                                                 block_count, &block_pending,
1255                                                 NULL, LQUOTA_FLAGS_BLK,
1256                                                 NULL, 0);
1257                 }
1258         }
1259 #endif
1260
1261         if (la_copy->la_valid & LA_FLAGS) {
1262                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1263                                                   handle, 1);
1264                 if (rc == 0)
1265                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1266         } else if (la_copy->la_valid) {            /* setattr */
1267                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1268                                                   handle, 1);
1269                 /* journal chown/chgrp in llog, just like unlink */
1270                 if (rc == 0 && lmm_size){
1271                         cookie_size = mdd_lov_cookiesize(env, mdd);
1272                         logcookies = mdd_max_cookie_get(env, mdd);
1273                         if (logcookies == NULL)
1274                                 GOTO(cleanup, rc = -ENOMEM);
1275
1276                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1277                                             logcookies, cookie_size) <= 0)
1278                                 logcookies = NULL;
1279                 }
1280         }
1281
1282         if (rc == 0 && ma->ma_valid & MA_LOV) {
1283                 umode_t mode;
1284
1285                 mode = mdd_object_type(mdd_obj);
1286                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1287                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1288                         if (rc)
1289                                 GOTO(cleanup, rc);
1290
1291                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1292                                             ma->ma_lmm_size, handle, 1);
1293                 }
1294
1295         }
1296 cleanup:
1297         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1298                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1299                                               handle);
1300         mdd_trans_stop(env, mdd, rc, handle);
1301         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1302                 /*set obd attr, if needed*/
1303                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1304                                            logcookies);
1305         }
1306 #ifdef HAVE_QUOTA_SUPPORT
1307         if (quota_opc) {
1308                 if (inode_pending)
1309                         lquota_pending_commit(mds_quota_interface_ref, obd,
1310                                               qnids[USRQUOTA], qnids[GRPQUOTA],
1311                                               inode_pending, 0);
1312                 if (block_pending)
1313                         lquota_pending_commit(mds_quota_interface_ref, obd,
1314                                               qnids[USRQUOTA], qnids[GRPQUOTA],
1315                                               block_pending, 1);
1316                 /* Trigger dqrel/dqacq for original owner and new owner.
1317                  * If failed, the next call for lquota_chkquota will
1318                  * process it. */
1319                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1320                               quota_opc);
1321         }
1322 #endif
1323         RETURN(rc);
1324 }
1325
1326 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1327                       const struct lu_buf *buf, const char *name, int fl,
1328                       struct thandle *handle)
1329 {
1330         int  rc;
1331         ENTRY;
1332
1333         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1334         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1335         mdd_write_unlock(env, obj);
1336
1337         RETURN(rc);
1338 }
1339
1340 static int mdd_xattr_sanity_check(const struct lu_env *env,
1341                                   struct mdd_object *obj)
1342 {
1343         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1344         struct md_ucred *uc     = md_ucred(env);
1345         int rc;
1346         ENTRY;
1347
1348         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1349                 RETURN(-EPERM);
1350
1351         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1352         if (rc)
1353                 RETURN(rc);
1354
1355         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1356             !mdd_capable(uc, CFS_CAP_FOWNER))
1357                 RETURN(-EPERM);
1358
1359         RETURN(rc);
1360 }
1361
1362 /**
1363  * The caller should guarantee to update the object ctime
1364  * after xattr_set if needed.
1365  */
1366 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1367                          const struct lu_buf *buf, const char *name,
1368                          int fl)
1369 {
1370         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1371         struct mdd_device *mdd = mdo2mdd(obj);
1372         struct thandle *handle;
1373         int  rc;
1374         ENTRY;
1375
1376         rc = mdd_xattr_sanity_check(env, mdd_obj);
1377         if (rc)
1378                 RETURN(rc);
1379
1380         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1381         handle = mdd_trans_start(env, mdd);
1382         if (IS_ERR(handle))
1383                 RETURN(PTR_ERR(handle));
1384
1385         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1386
1387         /* Only record user xattr changes */
1388         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1389             (strncmp("user.", name, 5) == 0))
1390                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1391                                               handle);
1392         mdd_trans_stop(env, mdd, rc, handle);
1393
1394         RETURN(rc);
1395 }
1396
1397 /**
1398  * The caller should guarantee to update the object ctime
1399  * after xattr_set if needed.
1400  */
1401 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1402                   const char *name)
1403 {
1404         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1405         struct mdd_device *mdd = mdo2mdd(obj);
1406         struct thandle *handle;
1407         int  rc;
1408         ENTRY;
1409
1410         rc = mdd_xattr_sanity_check(env, mdd_obj);
1411         if (rc)
1412                 RETURN(rc);
1413
1414         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1415         handle = mdd_trans_start(env, mdd);
1416         if (IS_ERR(handle))
1417                 RETURN(PTR_ERR(handle));
1418
1419         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1420         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1421                            mdd_object_capa(env, mdd_obj));
1422         mdd_write_unlock(env, mdd_obj);
1423
1424         /* Only record user xattr changes */
1425         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1426             (strncmp("user.", name, 5) != 0))
1427                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1428                                               handle);
1429
1430         mdd_trans_stop(env, mdd, rc, handle);
1431
1432         RETURN(rc);
1433 }
1434
1435 /* partial unlink */
1436 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1437                        struct md_attr *ma)
1438 {
1439         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1440         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1441         struct mdd_device *mdd = mdo2mdd(obj);
1442         struct thandle *handle;
1443 #ifdef HAVE_QUOTA_SUPPORT
1444         struct obd_device *obd = mdd->mdd_obd_dev;
1445         struct mds_obd *mds = &obd->u.mds;
1446         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1447         int quota_opc = 0;
1448 #endif
1449         int rc;
1450         ENTRY;
1451
1452         /*
1453          * Check -ENOENT early here because we need to get object type
1454          * to calculate credits before transaction start
1455          */
1456         if (!mdd_object_exists(mdd_obj))
1457                 RETURN(-ENOENT);
1458
1459         LASSERT(mdd_object_exists(mdd_obj) > 0);
1460
1461         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1462         if (rc)
1463                 RETURN(rc);
1464
1465         handle = mdd_trans_start(env, mdd);
1466         if (IS_ERR(handle))
1467                 RETURN(-ENOMEM);
1468
1469         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1470
1471         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1472         if (rc)
1473                 GOTO(cleanup, rc);
1474
1475         __mdd_ref_del(env, mdd_obj, handle, 0);
1476
1477         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1478                 /* unlink dot */
1479                 __mdd_ref_del(env, mdd_obj, handle, 1);
1480         }
1481
1482         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1483         la_copy->la_ctime = ma->ma_attr.la_ctime;
1484
1485         la_copy->la_valid = LA_CTIME;
1486         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1487         if (rc)
1488                 GOTO(cleanup, rc);
1489
1490         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1491 #ifdef HAVE_QUOTA_SUPPORT
1492         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1493             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1494                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1495                 mdd_quota_wrapper(&ma->ma_attr, qids);
1496         }
1497 #endif
1498
1499
1500         EXIT;
1501 cleanup:
1502         mdd_write_unlock(env, mdd_obj);
1503         mdd_trans_stop(env, mdd, rc, handle);
1504 #ifdef HAVE_QUOTA_SUPPORT
1505         if (quota_opc)
1506                 /* Trigger dqrel on the owner of child. If failed,
1507                  * the next call for lquota_chkquota will process it */
1508                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1509                               quota_opc);
1510 #endif
1511         return rc;
1512 }
1513
1514 /* partial operation */
1515 static int mdd_oc_sanity_check(const struct lu_env *env,
1516                                struct mdd_object *obj,
1517                                struct md_attr *ma)
1518 {
1519         int rc;
1520         ENTRY;
1521
1522         switch (ma->ma_attr.la_mode & S_IFMT) {
1523         case S_IFREG:
1524         case S_IFDIR:
1525         case S_IFLNK:
1526         case S_IFCHR:
1527         case S_IFBLK:
1528         case S_IFIFO:
1529         case S_IFSOCK:
1530                 rc = 0;
1531                 break;
1532         default:
1533                 rc = -EINVAL;
1534                 break;
1535         }
1536         RETURN(rc);
1537 }
1538
1539 static int mdd_object_create(const struct lu_env *env,
1540                              struct md_object *obj,
1541                              const struct md_op_spec *spec,
1542                              struct md_attr *ma)
1543 {
1544
1545         struct mdd_device *mdd = mdo2mdd(obj);
1546         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1547         const struct lu_fid *pfid = spec->u.sp_pfid;
1548         struct thandle *handle;
1549 #ifdef HAVE_QUOTA_SUPPORT
1550         struct obd_device *obd = mdd->mdd_obd_dev;
1551         struct mds_obd *mds = &obd->u.mds;
1552         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1553         int quota_opc = 0, block_count = 0;
1554         int inode_pending = 0, block_pending = 0;
1555 #endif
1556         int rc = 0;
1557         ENTRY;
1558
1559 #ifdef HAVE_QUOTA_SUPPORT
1560         if (mds->mds_quota) {
1561                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1562                 mdd_quota_wrapper(&ma->ma_attr, qids);
1563                 /* get file quota for child */
1564                 lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
1565                                 qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
1566                                 NULL, 0);
1567                 switch (ma->ma_attr.la_mode & S_IFMT) {
1568                 case S_IFLNK:
1569                 case S_IFDIR:
1570                         block_count = 2;
1571                         break;
1572                 case S_IFREG:
1573                         block_count = 1;
1574                         break;
1575                 }
1576                 /* get block quota for child */
1577                 if (block_count)
1578                         lquota_chkquota(mds_quota_interface_ref, obd,
1579                                         qids[USRQUOTA], qids[GRPQUOTA],
1580                                         block_count, &block_pending, NULL,
1581                                         LQUOTA_FLAGS_BLK, NULL, 0);
1582         }
1583 #endif
1584
1585         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1586         handle = mdd_trans_start(env, mdd);
1587         if (IS_ERR(handle))
1588                 GOTO(out_pending, rc = PTR_ERR(handle));
1589
1590         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1591         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1592         if (rc)
1593                 GOTO(unlock, rc);
1594
1595         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1596         if (rc)
1597                 GOTO(unlock, rc);
1598
1599         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1600                 /* If creating the slave object, set slave EA here. */
1601                 int lmv_size = spec->u.sp_ea.eadatalen;
1602                 struct lmv_stripe_md *lmv;
1603
1604                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1605                 LASSERT(lmv != NULL && lmv_size > 0);
1606
1607                 rc = __mdd_xattr_set(env, mdd_obj,
1608                                      mdd_buf_get_const(env, lmv, lmv_size),
1609                                      XATTR_NAME_LMV, 0, handle);
1610                 if (rc)
1611                         GOTO(unlock, rc);
1612
1613                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1614                                            handle, 0);
1615         } else {
1616 #ifdef CONFIG_FS_POSIX_ACL
1617                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1618                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1619
1620                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1621                         buf->lb_len = spec->u.sp_ea.eadatalen;
1622                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1623                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1624                                                     &ma->ma_attr.la_mode,
1625                                                     handle);
1626                                 if (rc)
1627                                         GOTO(unlock, rc);
1628                                 else
1629                                         ma->ma_attr.la_valid |= LA_MODE;
1630                         }
1631
1632                         pfid = spec->u.sp_ea.fid;
1633                 }
1634 #endif
1635                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1636                                            spec);
1637         }
1638         EXIT;
1639 unlock:
1640         if (rc == 0)
1641                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1642         mdd_write_unlock(env, mdd_obj);
1643
1644         mdd_trans_stop(env, mdd, rc, handle);
1645 out_pending:
1646 #ifdef HAVE_QUOTA_SUPPORT
1647         if (quota_opc) {
1648                 if (inode_pending)
1649                         lquota_pending_commit(mds_quota_interface_ref, obd,
1650                                               qids[USRQUOTA], qids[GRPQUOTA],
1651                                               inode_pending, 0);
1652                 if (block_pending)
1653                         lquota_pending_commit(mds_quota_interface_ref, obd,
1654                                               qids[USRQUOTA], qids[GRPQUOTA],
1655                                               block_pending, 1);
1656                 /* Trigger dqacq on the owner of child. If failed,
1657                  * the next call for lquota_chkquota will process it. */
1658                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1659                               FSFILT_OP_CREATE_PARTIAL_CHILD);
1660         }
1661 #endif
1662         return rc;
1663 }
1664
1665 /* partial link */
1666 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1667                        const struct md_attr *ma)
1668 {
1669         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1670         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1671         struct mdd_device *mdd = mdo2mdd(obj);
1672         struct thandle *handle;
1673         int rc;
1674         ENTRY;
1675
1676         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1677         handle = mdd_trans_start(env, mdd);
1678         if (IS_ERR(handle))
1679                 RETURN(-ENOMEM);
1680
1681         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1682         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1683         if (rc == 0)
1684                 __mdd_ref_add(env, mdd_obj, handle);
1685         mdd_write_unlock(env, mdd_obj);
1686         if (rc == 0) {
1687                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1688                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1689
1690                 la_copy->la_valid = LA_CTIME;
1691                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1692                                                         handle, 0);
1693         }
1694         mdd_trans_stop(env, mdd, 0, handle);
1695
1696         RETURN(rc);
1697 }
1698
1699 /*
1700  * do NOT or the MAY_*'s, you'll get the weakest
1701  */
1702 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1703 {
1704         int res = 0;
1705
1706         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1707          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1708          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1709          * owner can write to a file even if it is marked readonly to hide
1710          * its brokenness. (bug 5781) */
1711         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1712                 struct md_ucred *uc = md_ucred(env);
1713
1714                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1715                     (la->la_uid == uc->mu_fsuid))
1716                         return 0;
1717         }
1718
1719         if (flags & FMODE_READ)
1720                 res |= MAY_READ;
1721         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1722                 res |= MAY_WRITE;
1723         if (flags & MDS_FMODE_EXEC)
1724                 res |= MAY_EXEC;
1725         return res;
1726 }
1727
1728 static int mdd_open_sanity_check(const struct lu_env *env,
1729                                  struct mdd_object *obj, int flag)
1730 {
1731         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1732         int mode, rc;
1733         ENTRY;
1734
1735         /* EEXIST check */
1736         if (mdd_is_dead_obj(obj))
1737                 RETURN(-ENOENT);
1738
1739         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1740         if (rc)
1741                RETURN(rc);
1742
1743         if (S_ISLNK(tmp_la->la_mode))
1744                 RETURN(-ELOOP);
1745
1746         mode = accmode(env, tmp_la, flag);
1747
1748         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1749                 RETURN(-EISDIR);
1750
1751         if (!(flag & MDS_OPEN_CREATED)) {
1752                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1753                 if (rc)
1754                         RETURN(rc);
1755         }
1756
1757         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1758             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1759                 flag &= ~MDS_OPEN_TRUNC;
1760
1761         /* For writing append-only file must open it with append mode. */
1762         if (mdd_is_append(obj)) {
1763                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1764                         RETURN(-EPERM);
1765                 if (flag & MDS_OPEN_TRUNC)
1766                         RETURN(-EPERM);
1767         }
1768
1769 #if 0
1770         /*
1771          * Now, flag -- O_NOATIME does not be packed by client.
1772          */
1773         if (flag & O_NOATIME) {
1774                 struct md_ucred *uc = md_ucred(env);
1775
1776                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1777                     (uc->mu_valid == UCRED_NEW)) &&
1778                     (uc->mu_fsuid != tmp_la->la_uid) &&
1779                     !mdd_capable(uc, CFS_CAP_FOWNER))
1780                         RETURN(-EPERM);
1781         }
1782 #endif
1783
1784         RETURN(0);
1785 }
1786
1787 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1788                     int flags)
1789 {
1790         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1791         int rc = 0;
1792
1793         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1794
1795         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1796         if (rc == 0)
1797                 mdd_obj->mod_count++;
1798
1799         mdd_write_unlock(env, mdd_obj);
1800         return rc;
1801 }
1802
1803 /* return md_attr back,
1804  * if it is last unlink then return lov ea + llog cookie*/
1805 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1806                     struct md_attr *ma)
1807 {
1808         int rc = 0;
1809         ENTRY;
1810
1811         if (S_ISREG(mdd_object_type(obj))) {
1812                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1813                  * Caller must be ready for that. */
1814
1815                 rc = __mdd_lmm_get(env, obj, ma);
1816                 if ((ma->ma_valid & MA_LOV))
1817                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1818                                             obj, ma);
1819         }
1820         RETURN(rc);
1821 }
1822
1823 /*
1824  * No permission check is needed.
1825  */
1826 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1827                      struct md_attr *ma)
1828 {
1829         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1830         struct thandle    *handle;
1831         int rc;
1832         int reset = 1;
1833
1834 #ifdef HAVE_QUOTA_SUPPORT
1835         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1836         struct mds_obd *mds = &obd->u.mds;
1837         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1838         int quota_opc = 0;
1839 #endif
1840         ENTRY;
1841
1842         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1843         if (rc)
1844                 RETURN(rc);
1845         handle = mdd_trans_start(env, mdo2mdd(obj));
1846         if (IS_ERR(handle))
1847                 RETURN(PTR_ERR(handle));
1848
1849         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1850         /* release open count */
1851         mdd_obj->mod_count --;
1852
1853         if (mdd_obj->mod_count == 0) {
1854                 /* remove link to object from orphan index */
1855                 if (mdd_obj->mod_flags & ORPHAN_OBJ)
1856                         __mdd_orphan_del(env, mdd_obj, handle);
1857         }
1858
1859         rc = mdd_iattr_get(env, mdd_obj, ma);
1860         if (rc == 0) {
1861                 if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
1862                         rc = mdd_object_kill(env, mdd_obj, ma);
1863 #ifdef HAVE_QUOTA_SUPPORT
1864                         if (mds->mds_quota) {
1865                                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1866                                 mdd_quota_wrapper(&ma->ma_attr, qids);
1867                         }
1868 #endif
1869                         if (rc == 0)
1870                                 reset = 0;
1871                 }
1872         }
1873
1874         if (reset)
1875                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1876
1877         mdd_write_unlock(env, mdd_obj);
1878         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1879 #ifdef HAVE_QUOTA_SUPPORT
1880         if (quota_opc)
1881                 /* Trigger dqrel on the owner of child. If failed,
1882                  * the next call for lquota_chkquota will process it */
1883                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1884                               quota_opc);
1885 #endif
1886         RETURN(rc);
1887 }
1888
1889 /*
1890  * Permission check is done when open,
1891  * no need check again.
1892  */
1893 static int mdd_readpage_sanity_check(const struct lu_env *env,
1894                                      struct mdd_object *obj)
1895 {
1896         struct dt_object *next = mdd_object_child(obj);
1897         int rc;
1898         ENTRY;
1899
1900         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1901                 rc = 0;
1902         else
1903                 rc = -ENOTDIR;
1904
1905         RETURN(rc);
1906 }
1907
1908 static int mdd_dir_page_build(const struct lu_env *env, int first,
1909                               void *area, int nob, const struct dt_it_ops *iops,
1910                               struct dt_it *it, __u64 *start, __u64 *end,
1911                               struct lu_dirent **last)
1912 {
1913         struct lu_fid          *fid  = &mdd_env_info(env)->mti_fid2;
1914         struct mdd_thread_info *info = mdd_env_info(env);
1915         struct lu_fid_pack     *pack = &info->mti_pack;
1916         int                     result;
1917         struct lu_dirent       *ent;
1918
1919         if (first) {
1920                 memset(area, 0, sizeof (struct lu_dirpage));
1921                 area += sizeof (struct lu_dirpage);
1922                 nob  -= sizeof (struct lu_dirpage);
1923         }
1924
1925         LASSERT(nob > sizeof *ent);
1926
1927         ent  = area;
1928         result = 0;
1929         do {
1930                 char  *name;
1931                 int    len;
1932                 int    recsize;
1933                 __u64  hash;
1934
1935                 name = (char *)iops->key(env, it);
1936                 len  = iops->key_size(env, it);
1937
1938                 pack = (struct lu_fid_pack *)iops->rec(env, it);
1939                 result = fid_unpack(pack, fid);
1940                 if (result != 0)
1941                         break;
1942
1943                 recsize = (sizeof(*ent) + len + 7) & ~7;
1944                 hash = iops->store(env, it);
1945                 *end = hash;
1946
1947                 CDEBUG(D_INFO, "%p %p %d "DFID": "LPU64" (%d) \"%*.*s\"\n",
1948                        name, ent, nob, PFID(fid), hash, len, len, len, name);
1949
1950                 if (nob >= recsize) {
1951                         ent->lde_fid = *fid;
1952                         fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid);
1953                         ent->lde_hash = hash;
1954                         ent->lde_namelen = cpu_to_le16(len);
1955                         ent->lde_reclen  = cpu_to_le16(recsize);
1956                         memcpy(ent->lde_name, name, len);
1957                         if (first && ent == area)
1958                                 *start = hash;
1959                         *last = ent;
1960                         ent = (void *)ent + recsize;
1961                         nob -= recsize;
1962                         result = iops->next(env, it);
1963                 } else {
1964                         /*
1965                          * record doesn't fit into page, enlarge previous one.
1966                          */
1967                         LASSERT(*last != NULL);
1968                         (*last)->lde_reclen =
1969                                 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
1970                                             nob);
1971                         break;
1972                 }
1973         } while (result == 0);
1974
1975         return result;
1976 }
1977
1978 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
1979                           const struct lu_rdpg *rdpg)
1980 {
1981         struct dt_it      *it;
1982         struct dt_object  *next = mdd_object_child(obj);
1983         const struct dt_it_ops  *iops;
1984         struct page       *pg;
1985         struct lu_dirent  *last = NULL;
1986         int i;
1987         int rc;
1988         int nob;
1989         __u64 hash_start;
1990         __u64 hash_end = 0;
1991
1992         LASSERT(rdpg->rp_pages != NULL);
1993         LASSERT(next->do_index_ops != NULL);
1994
1995         if (rdpg->rp_count <= 0)
1996                 return -EFAULT;
1997
1998         /*
1999          * iterate through directory and fill pages from @rdpg
2000          */
2001         iops = &next->do_index_ops->dio_it;
2002         it = iops->init(env, next, mdd_object_capa(env, obj));
2003         if (IS_ERR(it))
2004                 return PTR_ERR(it);
2005
2006         rc = iops->load(env, it, rdpg->rp_hash);
2007
2008         if (rc == 0)
2009                 /*
2010                  * Iterator didn't find record with exactly the key requested.
2011                  *
2012                  * It is currently either
2013                  *
2014                  *     - positioned above record with key less than
2015                  *     requested---skip it.
2016                  *
2017                  *     - or not positioned at all (is in IAM_IT_SKEWED
2018                  *     state)---position it on the next item.
2019                  */
2020                 rc = iops->next(env, it);
2021         else if (rc > 0)
2022                 rc = 0;
2023
2024         /*
2025          * At this point and across for-loop:
2026          *
2027          *  rc == 0 -> ok, proceed.
2028          *  rc >  0 -> end of directory.
2029          *  rc <  0 -> error.
2030          */
2031         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2032              i++, nob -= CFS_PAGE_SIZE) {
2033                 LASSERT(i < rdpg->rp_npages);
2034                 pg = rdpg->rp_pages[i];
2035                 rc = mdd_dir_page_build(env, !i, cfs_kmap(pg),
2036                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2037                                         it, &hash_start, &hash_end, &last);
2038                 if (rc != 0 || i == rdpg->rp_npages - 1)
2039                         last->lde_reclen = 0;
2040                 cfs_kunmap(pg);
2041         }
2042         if (rc > 0) {
2043                 /*
2044                  * end of directory.
2045                  */
2046                 hash_end = DIR_END_OFF;
2047                 rc = 0;
2048         }
2049         if (rc == 0) {
2050                 struct lu_dirpage *dp;
2051
2052                 dp = cfs_kmap(rdpg->rp_pages[0]);
2053                 dp->ldp_hash_start = rdpg->rp_hash;
2054                 dp->ldp_hash_end   = hash_end;
2055                 if (i == 0)
2056                         /*
2057                          * No pages were processed, mark this.
2058                          */
2059                         dp->ldp_flags |= LDF_EMPTY;
2060                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2061                 cfs_kunmap(rdpg->rp_pages[0]);
2062         }
2063         iops->put(env, it);
2064         iops->fini(env, it);
2065
2066         return rc;
2067 }
2068
2069 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2070                         const struct lu_rdpg *rdpg)
2071 {
2072         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2073         int rc;
2074         ENTRY;
2075
2076         LASSERT(mdd_object_exists(mdd_obj));
2077
2078         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2079         rc = mdd_readpage_sanity_check(env, mdd_obj);
2080         if (rc)
2081                 GOTO(out_unlock, rc);
2082
2083         if (mdd_is_dead_obj(mdd_obj)) {
2084                 struct page *pg;
2085                 struct lu_dirpage *dp;
2086
2087                 /*
2088                  * According to POSIX, please do not return any entry to client:
2089                  * even dot and dotdot should not be returned.
2090                  */
2091                 CWARN("readdir from dead object: "DFID"\n",
2092                         PFID(mdd_object_fid(mdd_obj)));
2093
2094                 if (rdpg->rp_count <= 0)
2095                         GOTO(out_unlock, rc = -EFAULT);
2096                 LASSERT(rdpg->rp_pages != NULL);
2097
2098                 pg = rdpg->rp_pages[0];
2099                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2100                 memset(dp, 0 , sizeof(struct lu_dirpage));
2101                 dp->ldp_hash_start = rdpg->rp_hash;
2102                 dp->ldp_hash_end   = DIR_END_OFF;
2103                 dp->ldp_flags |= LDF_EMPTY;
2104                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2105                 cfs_kunmap(pg);
2106                 GOTO(out_unlock, rc = 0);
2107         }
2108
2109         rc = __mdd_readpage(env, mdd_obj, rdpg);
2110
2111         EXIT;
2112 out_unlock:
2113         mdd_read_unlock(env, mdd_obj);
2114         return rc;
2115 }
2116
2117 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2118 {
2119         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2120         struct dt_object *next;
2121
2122         LASSERT(mdd_object_exists(mdd_obj));
2123         next = mdd_object_child(mdd_obj);
2124         return next->do_ops->do_object_sync(env, next);
2125 }
2126
2127 const struct md_object_operations mdd_obj_ops = {
2128         .moo_permission    = mdd_permission,
2129         .moo_attr_get      = mdd_attr_get,
2130         .moo_attr_set      = mdd_attr_set,
2131         .moo_xattr_get     = mdd_xattr_get,
2132         .moo_xattr_set     = mdd_xattr_set,
2133         .moo_xattr_list    = mdd_xattr_list,
2134         .moo_xattr_del     = mdd_xattr_del,
2135         .moo_object_create = mdd_object_create,
2136         .moo_ref_add       = mdd_ref_add,
2137         .moo_ref_del       = mdd_ref_del,
2138         .moo_open          = mdd_open,
2139         .moo_close         = mdd_close,
2140         .moo_readpage      = mdd_readpage,
2141         .moo_readlink      = mdd_readlink,
2142         .moo_capa_get      = mdd_capa_get,
2143         .moo_object_sync   = mdd_object_sync,
2144         .moo_path          = mdd_path,
2145 };