Whamcloud - gitweb
LU-848 clio: page writeback support
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
58
59 #include <lustre_param.h>
60 #include <lustre_mds.h>
61 #include <lustre/lustre_idl.h>
62
63 #include "mdd_internal.h"
64
65 static const struct lu_object_operations mdd_lu_obj_ops;
66
67 static int mdd_xattr_get(const struct lu_env *env,
68                          struct md_object *obj, struct lu_buf *buf,
69                          const char *name);
70
71 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
72                  void **data)
73 {
74         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
75                  PFID(mdd_object_fid(obj)));
76         mdo_data_get(env, obj, data);
77         return 0;
78 }
79
80 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
81                struct lu_attr *la, struct lustre_capa *capa)
82 {
83         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
84                  PFID(mdd_object_fid(obj)));
85         return mdo_attr_get(env, obj, la, capa);
86 }
87
88 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
89 {
90         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
91
92         if (flags & LUSTRE_APPEND_FL)
93                 obj->mod_flags |= APPEND_OBJ;
94
95         if (flags & LUSTRE_IMMUTABLE_FL)
96                 obj->mod_flags |= IMMUTE_OBJ;
97 }
98
99 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
100 {
101         struct mdd_thread_info *info;
102
103         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
104         LASSERT(info != NULL);
105         return info;
106 }
107
108 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
109 {
110         struct lu_buf *buf;
111
112         buf = &mdd_env_info(env)->mti_buf;
113         buf->lb_buf = area;
114         buf->lb_len = len;
115         return buf;
116 }
117
118 void mdd_buf_put(struct lu_buf *buf)
119 {
120         if (buf == NULL || buf->lb_buf == NULL)
121                 return;
122         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
123         buf->lb_buf = NULL;
124         buf->lb_len = 0;
125 }
126
127 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
128                                        const void *area, ssize_t len)
129 {
130         struct lu_buf *buf;
131
132         buf = &mdd_env_info(env)->mti_buf;
133         buf->lb_buf = (void *)area;
134         buf->lb_len = len;
135         return buf;
136 }
137
138 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
139 {
140         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
141
142         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
143                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
144                 buf->lb_buf = NULL;
145         }
146         if (buf->lb_buf == NULL) {
147                 buf->lb_len = len;
148                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
149                 if (buf->lb_buf == NULL)
150                         buf->lb_len = 0;
151         }
152         return buf;
153 }
154
155 /** Increase the size of the \a mti_big_buf.
156  * preserves old data in buffer
157  * old buffer remains unchanged on error
158  * \retval 0 or -ENOMEM
159  */
160 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
161 {
162         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
163         struct lu_buf buf;
164
165         LASSERT(len >= oldbuf->lb_len);
166         OBD_ALLOC_LARGE(buf.lb_buf, len);
167
168         if (buf.lb_buf == NULL)
169                 return -ENOMEM;
170
171         buf.lb_len = len;
172         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
173
174         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
175
176         memcpy(oldbuf, &buf, sizeof(buf));
177
178         return 0;
179 }
180
181 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
182                                        struct mdd_device *mdd)
183 {
184         struct mdd_thread_info *mti = mdd_env_info(env);
185         int                     max_cookie_size;
186
187         max_cookie_size = mdd_lov_cookiesize(env, mdd);
188         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
189                 if (mti->mti_max_cookie)
190                         OBD_FREE_LARGE(mti->mti_max_cookie,
191                                        mti->mti_max_cookie_size);
192                 mti->mti_max_cookie = NULL;
193                 mti->mti_max_cookie_size = 0;
194         }
195         if (unlikely(mti->mti_max_cookie == NULL)) {
196                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
197                 if (likely(mti->mti_max_cookie != NULL))
198                         mti->mti_max_cookie_size = max_cookie_size;
199         }
200         if (likely(mti->mti_max_cookie != NULL))
201                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
202         return mti->mti_max_cookie;
203 }
204
205 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
206                                    struct mdd_device *mdd)
207 {
208         struct mdd_thread_info *mti = mdd_env_info(env);
209         int                     max_lmm_size;
210
211         max_lmm_size = mdd_lov_mdsize(env, mdd);
212         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
213                 if (mti->mti_max_lmm)
214                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
215                 mti->mti_max_lmm = NULL;
216                 mti->mti_max_lmm_size = 0;
217         }
218         if (unlikely(mti->mti_max_lmm == NULL)) {
219                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
220                 if (likely(mti->mti_max_lmm != NULL))
221                         mti->mti_max_lmm_size = max_lmm_size;
222         }
223         return mti->mti_max_lmm;
224 }
225
226 struct lu_object *mdd_object_alloc(const struct lu_env *env,
227                                    const struct lu_object_header *hdr,
228                                    struct lu_device *d)
229 {
230         struct mdd_object *mdd_obj;
231
232         OBD_ALLOC_PTR(mdd_obj);
233         if (mdd_obj != NULL) {
234                 struct lu_object *o;
235
236                 o = mdd2lu_obj(mdd_obj);
237                 lu_object_init(o, NULL, d);
238                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
239                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
240                 mdd_obj->mod_count = 0;
241                 o->lo_ops = &mdd_lu_obj_ops;
242                 return o;
243         } else {
244                 return NULL;
245         }
246 }
247
248 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
249                            const struct lu_object_conf *unused)
250 {
251         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
252         struct mdd_object *mdd_obj = lu2mdd_obj(o);
253         struct lu_object  *below;
254         struct lu_device  *under;
255         ENTRY;
256
257         mdd_obj->mod_cltime = 0;
258         under = &d->mdd_child->dd_lu_dev;
259         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
260         mdd_pdlock_init(mdd_obj);
261         if (below == NULL)
262                 RETURN(-ENOMEM);
263
264         lu_object_add(o, below);
265
266         RETURN(0);
267 }
268
269 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
270 {
271         if (lu_object_exists(o))
272                 return mdd_get_flags(env, lu2mdd_obj(o));
273         else
274                 return 0;
275 }
276
277 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
278 {
279         struct mdd_object *mdd = lu2mdd_obj(o);
280
281         lu_object_fini(o);
282         OBD_FREE_PTR(mdd);
283 }
284
285 static int mdd_object_print(const struct lu_env *env, void *cookie,
286                             lu_printer_t p, const struct lu_object *o)
287 {
288         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
289         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
290                     "valid=%x, cltime="LPU64", flags=%lx)",
291                     mdd, mdd->mod_count, mdd->mod_valid,
292                     mdd->mod_cltime, mdd->mod_flags);
293 }
294
295 static const struct lu_object_operations mdd_lu_obj_ops = {
296         .loo_object_init    = mdd_object_init,
297         .loo_object_start   = mdd_object_start,
298         .loo_object_free    = mdd_object_free,
299         .loo_object_print   = mdd_object_print,
300 };
301
302 struct mdd_object *mdd_object_find(const struct lu_env *env,
303                                    struct mdd_device *d,
304                                    const struct lu_fid *f)
305 {
306         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
307 }
308
309 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
310                         const char *path, struct lu_fid *fid)
311 {
312         struct lu_buf *buf;
313         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
314         struct mdd_object *obj;
315         struct lu_name *lname = &mdd_env_info(env)->mti_name;
316         char *name;
317         int rc = 0;
318         ENTRY;
319
320         /* temp buffer for path element */
321         buf = mdd_buf_alloc(env, PATH_MAX);
322         if (buf->lb_buf == NULL)
323                 RETURN(-ENOMEM);
324
325         lname->ln_name = name = buf->lb_buf;
326         lname->ln_namelen = 0;
327         *f = mdd->mdd_root_fid;
328
329         while(1) {
330                 while (*path == '/')
331                         path++;
332                 if (*path == '\0')
333                         break;
334                 while (*path != '/' && *path != '\0') {
335                         *name = *path;
336                         path++;
337                         name++;
338                         lname->ln_namelen++;
339                 }
340
341                 *name = '\0';
342                 /* find obj corresponding to fid */
343                 obj = mdd_object_find(env, mdd, f);
344                 if (obj == NULL)
345                         GOTO(out, rc = -EREMOTE);
346                 if (IS_ERR(obj))
347                         GOTO(out, rc = PTR_ERR(obj));
348                 /* get child fid from parent and name */
349                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
350                 mdd_object_put(env, obj);
351                 if (rc)
352                         break;
353
354                 name = buf->lb_buf;
355                 lname->ln_namelen = 0;
356         }
357
358         if (!rc)
359                 *fid = *f;
360 out:
361         RETURN(rc);
362 }
363
364 /** The maximum depth that fid2path() will search.
365  * This is limited only because we want to store the fids for
366  * historical path lookup purposes.
367  */
368 #define MAX_PATH_DEPTH 100
369
370 /** mdd_path() lookup structure. */
371 struct path_lookup_info {
372         __u64                pli_recno;        /**< history point */
373         __u64                pli_currec;       /**< current record */
374         struct lu_fid        pli_fid;
375         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
376         struct mdd_object   *pli_mdd_obj;
377         char                *pli_path;         /**< full path */
378         int                  pli_pathlen;
379         int                  pli_linkno;       /**< which hardlink to follow */
380         int                  pli_fidcount;     /**< number of \a pli_fids */
381 };
382
383 static int mdd_path_current(const struct lu_env *env,
384                             struct path_lookup_info *pli)
385 {
386         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
387         struct mdd_object *mdd_obj;
388         struct lu_buf     *buf = NULL;
389         struct link_ea_header *leh;
390         struct link_ea_entry  *lee;
391         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
392         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
393         char *ptr;
394         int reclen;
395         int rc;
396         ENTRY;
397
398         ptr = pli->pli_path + pli->pli_pathlen - 1;
399         *ptr = 0;
400         --ptr;
401         pli->pli_fidcount = 0;
402         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
403
404         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
405                 mdd_obj = mdd_object_find(env, mdd,
406                                           &pli->pli_fids[pli->pli_fidcount]);
407                 if (mdd_obj == NULL)
408                         GOTO(out, rc = -EREMOTE);
409                 if (IS_ERR(mdd_obj))
410                         GOTO(out, rc = PTR_ERR(mdd_obj));
411                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
412                 if (rc <= 0) {
413                         mdd_object_put(env, mdd_obj);
414                         if (rc == -1)
415                                 rc = -EREMOTE;
416                         else if (rc == 0)
417                                 /* Do I need to error out here? */
418                                 rc = -ENOENT;
419                         GOTO(out, rc);
420                 }
421
422                 /* Get parent fid and object name */
423                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
424                 buf = mdd_links_get(env, mdd_obj);
425                 mdd_read_unlock(env, mdd_obj);
426                 mdd_object_put(env, mdd_obj);
427                 if (IS_ERR(buf))
428                         GOTO(out, rc = PTR_ERR(buf));
429
430                 leh = buf->lb_buf;
431                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
432                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
433
434                 /* If set, use link #linkno for path lookup, otherwise use
435                    link #0.  Only do this for the final path element. */
436                 if ((pli->pli_fidcount == 0) &&
437                     (pli->pli_linkno < leh->leh_reccount)) {
438                         int count;
439                         for (count = 0; count < pli->pli_linkno; count++) {
440                                 lee = (struct link_ea_entry *)
441                                      ((char *)lee + reclen);
442                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
443                         }
444                         if (pli->pli_linkno < leh->leh_reccount - 1)
445                                 /* indicate to user there are more links */
446                                 pli->pli_linkno++;
447                 }
448
449                 /* Pack the name in the end of the buffer */
450                 ptr -= tmpname->ln_namelen;
451                 if (ptr - 1 <= pli->pli_path)
452                         GOTO(out, rc = -EOVERFLOW);
453                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
454                 *(--ptr) = '/';
455
456                 /* Store the parent fid for historic lookup */
457                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
458                         GOTO(out, rc = -EOVERFLOW);
459                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
460         }
461
462         /* Verify that our path hasn't changed since we started the lookup.
463            Record the current index, and verify the path resolves to the
464            same fid. If it does, then the path is correct as of this index. */
465         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
466         pli->pli_currec = mdd->mdd_cl.mc_index;
467         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
468         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
469         if (rc) {
470                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
471                 GOTO (out, rc = -EAGAIN);
472         }
473         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
474                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
475                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
476                        PFID(&pli->pli_fid));
477                 GOTO(out, rc = -EAGAIN);
478         }
479         ptr++; /* skip leading / */
480         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
481
482         EXIT;
483 out:
484         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
485                 /* if we vmalloced a large buffer drop it */
486                 mdd_buf_put(buf);
487
488         return rc;
489 }
490
491 static int mdd_path_historic(const struct lu_env *env,
492                              struct path_lookup_info *pli)
493 {
494         return 0;
495 }
496
497 /* Returns the full path to this fid, as of changelog record recno. */
498 static int mdd_path(const struct lu_env *env, struct md_object *obj,
499                     char *path, int pathlen, __u64 *recno, int *linkno)
500 {
501         struct path_lookup_info *pli;
502         int tries = 3;
503         int rc = -EAGAIN;
504         ENTRY;
505
506         if (pathlen < 3)
507                 RETURN(-EOVERFLOW);
508
509         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
510                 path[0] = '\0';
511                 RETURN(0);
512         }
513
514         OBD_ALLOC_PTR(pli);
515         if (pli == NULL)
516                 RETURN(-ENOMEM);
517
518         pli->pli_mdd_obj = md2mdd_obj(obj);
519         pli->pli_recno = *recno;
520         pli->pli_path = path;
521         pli->pli_pathlen = pathlen;
522         pli->pli_linkno = *linkno;
523
524         /* Retry multiple times in case file is being moved */
525         while (tries-- && rc == -EAGAIN)
526                 rc = mdd_path_current(env, pli);
527
528         /* For historical path lookup, the current links may not have existed
529          * at "recno" time.  We must switch over to earlier links/parents
530          * by using the changelog records.  If the earlier parent doesn't
531          * exist, we must search back through the changelog to reconstruct
532          * its parents, then check if it exists, etc.
533          * We may ignore this problem for the initial implementation and
534          * state that an "original" hardlink must still exist for us to find
535          * historic path name. */
536         if (pli->pli_recno != -1) {
537                 rc = mdd_path_historic(env, pli);
538         } else {
539                 *recno = pli->pli_currec;
540                 /* Return next link index to caller */
541                 *linkno = pli->pli_linkno;
542         }
543
544         OBD_FREE_PTR(pli);
545
546         RETURN (rc);
547 }
548
549 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
550 {
551         struct lu_attr *la = &mdd_env_info(env)->mti_la;
552         int rc;
553
554         ENTRY;
555         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
556         if (rc == 0) {
557                 mdd_flags_xlate(obj, la->la_flags);
558                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
559                         obj->mod_flags |= MNLINK_OBJ;
560         }
561         RETURN(rc);
562 }
563
564 /* get only inode attributes */
565 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
566                   struct md_attr *ma)
567 {
568         int rc = 0;
569         ENTRY;
570
571         if (ma->ma_valid & MA_INODE)
572                 RETURN(0);
573
574         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
575                           mdd_object_capa(env, mdd_obj));
576         if (rc == 0)
577                 ma->ma_valid |= MA_INODE;
578         RETURN(rc);
579 }
580
581 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
582 {
583         struct lov_desc *ldesc;
584         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
585         struct lov_user_md *lum = (struct lov_user_md*)lmm;
586         ENTRY;
587
588         if (!lum)
589                 RETURN(0);
590
591         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
592         LASSERT(ldesc != NULL);
593
594         lum->lmm_magic = LOV_MAGIC_V1;
595         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
596         lum->lmm_pattern = ldesc->ld_pattern;
597         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
598         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
599         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
600
601         RETURN(sizeof(*lum));
602 }
603
604 static int is_rootdir(struct mdd_object *mdd_obj)
605 {
606         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
607         const struct lu_fid *fid = mdo2fid(mdd_obj);
608
609         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
610 }
611
612 /* get lov EA only */
613 static int __mdd_lmm_get(const struct lu_env *env,
614                          struct mdd_object *mdd_obj, struct md_attr *ma)
615 {
616         int rc;
617         ENTRY;
618
619         if (ma->ma_valid & MA_LOV)
620                 RETURN(0);
621
622         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
623                         XATTR_NAME_LOV);
624         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
625                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
626         if (rc > 0) {
627                 ma->ma_lmm_size = rc;
628                 ma->ma_valid |= MA_LOV;
629                 rc = 0;
630         }
631         RETURN(rc);
632 }
633
634 /* get the first parent fid from link EA */
635 static int mdd_pfid_get(const struct lu_env *env,
636                         struct mdd_object *mdd_obj, struct md_attr *ma)
637 {
638         struct lu_buf *buf;
639         struct link_ea_header *leh;
640         struct link_ea_entry *lee;
641         struct lu_fid *pfid = &ma->ma_pfid;
642         ENTRY;
643
644         if (ma->ma_valid & MA_PFID)
645                 RETURN(0);
646
647         buf = mdd_links_get(env, mdd_obj);
648         if (IS_ERR(buf))
649                 RETURN(PTR_ERR(buf));
650
651         leh = buf->lb_buf;
652         lee = (struct link_ea_entry *)(leh + 1);
653         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
654         fid_be_to_cpu(pfid, pfid);
655         ma->ma_valid |= MA_PFID;
656         if (buf->lb_len > OBD_ALLOC_BIG)
657                 /* if we vmalloced a large buffer drop it */
658                 mdd_buf_put(buf);
659         RETURN(0);
660 }
661
662 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
663                        struct md_attr *ma)
664 {
665         int rc;
666         ENTRY;
667
668         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
669         rc = __mdd_lmm_get(env, mdd_obj, ma);
670         mdd_read_unlock(env, mdd_obj);
671         RETURN(rc);
672 }
673
674 /* get lmv EA only*/
675 static int __mdd_lmv_get(const struct lu_env *env,
676                          struct mdd_object *mdd_obj, struct md_attr *ma)
677 {
678         int rc;
679         ENTRY;
680
681         if (ma->ma_valid & MA_LMV)
682                 RETURN(0);
683
684         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
685                         XATTR_NAME_LMV);
686         if (rc > 0) {
687                 ma->ma_valid |= MA_LMV;
688                 rc = 0;
689         }
690         RETURN(rc);
691 }
692
693 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
694                          struct md_attr *ma)
695 {
696         struct mdd_thread_info *info = mdd_env_info(env);
697         struct lustre_mdt_attrs *lma =
698                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
699         int lma_size;
700         int rc;
701         ENTRY;
702
703         /* If all needed data are already valid, nothing to do */
704         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
705             (ma->ma_need & (MA_HSM | MA_SOM)))
706                 RETURN(0);
707
708         /* Read LMA from disk EA */
709         lma_size = sizeof(info->mti_xattr_buf);
710         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
711         if (rc <= 0)
712                 RETURN(rc);
713
714         /* Useless to check LMA incompatibility because this is already done in
715          * osd_ea_fid_get(), and this will fail long before this code is
716          * called.
717          * So, if we are here, LMA is compatible.
718          */
719
720         lustre_lma_swab(lma);
721
722         /* Swab and copy LMA */
723         if (ma->ma_need & MA_HSM) {
724                 if (lma->lma_compat & LMAC_HSM)
725                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
726                 else
727                         ma->ma_hsm.mh_flags = 0;
728                 ma->ma_valid |= MA_HSM;
729         }
730
731         /* Copy SOM */
732         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
733                 LASSERT(ma->ma_som != NULL);
734                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
735                 ma->ma_som->msd_size    = lma->lma_som_size;
736                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
737                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
738                 ma->ma_valid |= MA_SOM;
739         }
740
741         RETURN(0);
742 }
743
744 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
745                                  struct md_attr *ma)
746 {
747         int rc = 0;
748         ENTRY;
749
750         if (ma->ma_need & MA_INODE)
751                 rc = mdd_iattr_get(env, mdd_obj, ma);
752
753         if (rc == 0 && ma->ma_need & MA_LOV) {
754                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
755                     S_ISDIR(mdd_object_type(mdd_obj)))
756                         rc = __mdd_lmm_get(env, mdd_obj, ma);
757         }
758         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
759                 if (S_ISREG(mdd_object_type(mdd_obj)))
760                         rc = mdd_pfid_get(env, mdd_obj, ma);
761         }
762         if (rc == 0 && ma->ma_need & MA_LMV) {
763                 if (S_ISDIR(mdd_object_type(mdd_obj)))
764                         rc = __mdd_lmv_get(env, mdd_obj, ma);
765         }
766         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
767                 if (S_ISREG(mdd_object_type(mdd_obj)))
768                         rc = __mdd_lma_get(env, mdd_obj, ma);
769         }
770 #ifdef CONFIG_FS_POSIX_ACL
771         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
772                 if (S_ISDIR(mdd_object_type(mdd_obj)))
773                         rc = mdd_def_acl_get(env, mdd_obj, ma);
774         }
775 #endif
776         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
777                rc, ma->ma_valid, ma->ma_lmm);
778         RETURN(rc);
779 }
780
781 int mdd_attr_get_internal_locked(const struct lu_env *env,
782                                  struct mdd_object *mdd_obj, struct md_attr *ma)
783 {
784         int rc;
785         int needlock = ma->ma_need &
786                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
787
788         if (needlock)
789                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
790         rc = mdd_attr_get_internal(env, mdd_obj, ma);
791         if (needlock)
792                 mdd_read_unlock(env, mdd_obj);
793         return rc;
794 }
795
796 /*
797  * No permission check is needed.
798  */
799 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
800                         struct md_attr *ma)
801 {
802         struct mdd_object *mdd_obj = md2mdd_obj(obj);
803         int                rc;
804
805         ENTRY;
806         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
807         RETURN(rc);
808 }
809
810 /*
811  * No permission check is needed.
812  */
813 static int mdd_xattr_get(const struct lu_env *env,
814                          struct md_object *obj, struct lu_buf *buf,
815                          const char *name)
816 {
817         struct mdd_object *mdd_obj = md2mdd_obj(obj);
818         int rc;
819
820         ENTRY;
821
822         LASSERT(mdd_object_exists(mdd_obj));
823
824         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
825         rc = mdo_xattr_get(env, mdd_obj, buf, name,
826                            mdd_object_capa(env, mdd_obj));
827         mdd_read_unlock(env, mdd_obj);
828
829         RETURN(rc);
830 }
831
832 /*
833  * Permission check is done when open,
834  * no need check again.
835  */
836 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
837                         struct lu_buf *buf)
838 {
839         struct mdd_object *mdd_obj = md2mdd_obj(obj);
840         struct dt_object  *next;
841         loff_t             pos = 0;
842         int                rc;
843         ENTRY;
844
845         LASSERT(mdd_object_exists(mdd_obj));
846
847         next = mdd_object_child(mdd_obj);
848         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
849         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
850                                          mdd_object_capa(env, mdd_obj));
851         mdd_read_unlock(env, mdd_obj);
852         RETURN(rc);
853 }
854
855 /*
856  * No permission check is needed.
857  */
858 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
859                           struct lu_buf *buf)
860 {
861         struct mdd_object *mdd_obj = md2mdd_obj(obj);
862         int rc;
863
864         ENTRY;
865
866         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
867         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
868         mdd_read_unlock(env, mdd_obj);
869
870         RETURN(rc);
871 }
872
873 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
874                                struct mdd_object *c, struct md_attr *ma,
875                                struct thandle *handle,
876                                const struct md_op_spec *spec)
877 {
878         struct lu_attr *attr = &ma->ma_attr;
879         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
880         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
881         const struct dt_index_features *feat = spec->sp_feat;
882         int rc;
883         ENTRY;
884
885         if (!mdd_object_exists(c)) {
886                 struct dt_object *next = mdd_object_child(c);
887                 LASSERT(next);
888
889                 if (feat != &dt_directory_features && feat != NULL)
890                         dof->dof_type = DFT_INDEX;
891                 else
892                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
893
894                 dof->u.dof_idx.di_feat = feat;
895
896                 /* @hint will be initialized by underlying device. */
897                 next->do_ops->do_ah_init(env, hint,
898                                          p ? mdd_object_child(p) : NULL,
899                                          attr->la_mode & S_IFMT);
900
901                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
902                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
903         } else
904                 rc = -EEXIST;
905
906         RETURN(rc);
907 }
908
909 /**
910  * Make sure the ctime is increased only.
911  */
912 static inline int mdd_attr_check(const struct lu_env *env,
913                                  struct mdd_object *obj,
914                                  struct lu_attr *attr)
915 {
916         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
917         int rc;
918         ENTRY;
919
920         if (attr->la_valid & LA_CTIME) {
921                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
922                 if (rc)
923                         RETURN(rc);
924
925                 if (attr->la_ctime < tmp_la->la_ctime)
926                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
927                 else if (attr->la_valid == LA_CTIME &&
928                          attr->la_ctime == tmp_la->la_ctime)
929                         attr->la_valid &= ~LA_CTIME;
930         }
931         RETURN(0);
932 }
933
934 int mdd_attr_set_internal(const struct lu_env *env,
935                           struct mdd_object *obj,
936                           struct lu_attr *attr,
937                           struct thandle *handle,
938                           int needacl)
939 {
940         int rc;
941         ENTRY;
942
943         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
944 #ifdef CONFIG_FS_POSIX_ACL
945         if (!rc && (attr->la_valid & LA_MODE) && needacl)
946                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
947 #endif
948         RETURN(rc);
949 }
950
951 int mdd_attr_check_set_internal(const struct lu_env *env,
952                                 struct mdd_object *obj,
953                                 struct lu_attr *attr,
954                                 struct thandle *handle,
955                                 int needacl)
956 {
957         int rc;
958         ENTRY;
959
960         rc = mdd_attr_check(env, obj, attr);
961         if (rc)
962                 RETURN(rc);
963
964         if (attr->la_valid)
965                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
966         RETURN(rc);
967 }
968
969 static int mdd_attr_set_internal_locked(const struct lu_env *env,
970                                         struct mdd_object *obj,
971                                         struct lu_attr *attr,
972                                         struct thandle *handle,
973                                         int needacl)
974 {
975         int rc;
976         ENTRY;
977
978         needacl = needacl && (attr->la_valid & LA_MODE);
979         if (needacl)
980                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
981         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
982         if (needacl)
983                 mdd_write_unlock(env, obj);
984         RETURN(rc);
985 }
986
987 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
988                                        struct mdd_object *obj,
989                                        struct lu_attr *attr,
990                                        struct thandle *handle,
991                                        int needacl)
992 {
993         int rc;
994         ENTRY;
995
996         needacl = needacl && (attr->la_valid & LA_MODE);
997         if (needacl)
998                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
999         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1000         if (needacl)
1001                 mdd_write_unlock(env, obj);
1002         RETURN(rc);
1003 }
1004
1005 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1006                     const struct lu_buf *buf, const char *name,
1007                     int fl, struct thandle *handle)
1008 {
1009         struct lustre_capa *capa = mdd_object_capa(env, obj);
1010         int rc = -EINVAL;
1011         ENTRY;
1012
1013         if (buf->lb_buf && buf->lb_len > 0)
1014                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1015         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1016                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1017
1018         RETURN(rc);
1019 }
1020
1021 /*
1022  * This gives the same functionality as the code between
1023  * sys_chmod and inode_setattr
1024  * chown_common and inode_setattr
1025  * utimes and inode_setattr
1026  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1027  */
1028 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1029                         struct lu_attr *la, const struct md_attr *ma)
1030 {
1031         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1032         struct md_ucred  *uc;
1033         int               rc;
1034         ENTRY;
1035
1036         if (!la->la_valid)
1037                 RETURN(0);
1038
1039         /* Do not permit change file type */
1040         if (la->la_valid & LA_TYPE)
1041                 RETURN(-EPERM);
1042
1043         /* They should not be processed by setattr */
1044         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1045                 RETURN(-EPERM);
1046
1047         /* export destroy does not have ->le_ses, but we may want
1048          * to drop LUSTRE_SOM_FL. */
1049         if (!env->le_ses)
1050                 RETURN(0);
1051
1052         uc = md_ucred(env);
1053
1054         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1055         if (rc)
1056                 RETURN(rc);
1057
1058         if (la->la_valid == LA_CTIME) {
1059                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1060                         /* This is only for set ctime when rename's source is
1061                          * on remote MDS. */
1062                         rc = mdd_may_delete(env, NULL, obj,
1063                                             (struct md_attr *)ma, 1, 0);
1064                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1065                         la->la_valid &= ~LA_CTIME;
1066                 RETURN(rc);
1067         }
1068
1069         if (la->la_valid == LA_ATIME) {
1070                 /* This is atime only set for read atime update on close. */
1071                 if (la->la_atime >= tmp_la->la_atime &&
1072                     la->la_atime < (tmp_la->la_atime +
1073                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1074                         la->la_valid &= ~LA_ATIME;
1075                 RETURN(0);
1076         }
1077
1078         /* Check if flags change. */
1079         if (la->la_valid & LA_FLAGS) {
1080                 unsigned int oldflags = 0;
1081                 unsigned int newflags = la->la_flags &
1082                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1083
1084                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1085                     !mdd_capable(uc, CFS_CAP_FOWNER))
1086                         RETURN(-EPERM);
1087
1088                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1089                  * only be changed by the relevant capability. */
1090                 if (mdd_is_immutable(obj))
1091                         oldflags |= LUSTRE_IMMUTABLE_FL;
1092                 if (mdd_is_append(obj))
1093                         oldflags |= LUSTRE_APPEND_FL;
1094                 if ((oldflags ^ newflags) &&
1095                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1096                         RETURN(-EPERM);
1097
1098                 if (!S_ISDIR(tmp_la->la_mode))
1099                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1100         }
1101
1102         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1103             (la->la_valid & ~LA_FLAGS) &&
1104             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1105                 RETURN(-EPERM);
1106
1107         /* Check for setting the obj time. */
1108         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1109             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1110                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1111                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1112                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1113                                                             MAY_WRITE,
1114                                                             MOR_TGT_CHILD);
1115                         if (rc)
1116                                 RETURN(rc);
1117                 }
1118         }
1119
1120         if (la->la_valid & LA_KILL_SUID) {
1121                 la->la_valid &= ~LA_KILL_SUID;
1122                 if ((tmp_la->la_mode & S_ISUID) &&
1123                     !(la->la_valid & LA_MODE)) {
1124                         la->la_mode = tmp_la->la_mode;
1125                         la->la_valid |= LA_MODE;
1126                 }
1127                 la->la_mode &= ~S_ISUID;
1128         }
1129
1130         if (la->la_valid & LA_KILL_SGID) {
1131                 la->la_valid &= ~LA_KILL_SGID;
1132                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1133                                         (S_ISGID | S_IXGRP)) &&
1134                     !(la->la_valid & LA_MODE)) {
1135                         la->la_mode = tmp_la->la_mode;
1136                         la->la_valid |= LA_MODE;
1137                 }
1138                 la->la_mode &= ~S_ISGID;
1139         }
1140
1141         /* Make sure a caller can chmod. */
1142         if (la->la_valid & LA_MODE) {
1143                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1144                     (uc->mu_fsuid != tmp_la->la_uid) &&
1145                     !mdd_capable(uc, CFS_CAP_FOWNER))
1146                         RETURN(-EPERM);
1147
1148                 if (la->la_mode == (cfs_umode_t) -1)
1149                         la->la_mode = tmp_la->la_mode;
1150                 else
1151                         la->la_mode = (la->la_mode & S_IALLUGO) |
1152                                       (tmp_la->la_mode & ~S_IALLUGO);
1153
1154                 /* Also check the setgid bit! */
1155                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1156                                        la->la_gid : tmp_la->la_gid) &&
1157                     !mdd_capable(uc, CFS_CAP_FSETID))
1158                         la->la_mode &= ~S_ISGID;
1159         } else {
1160                la->la_mode = tmp_la->la_mode;
1161         }
1162
1163         /* Make sure a caller can chown. */
1164         if (la->la_valid & LA_UID) {
1165                 if (la->la_uid == (uid_t) -1)
1166                         la->la_uid = tmp_la->la_uid;
1167                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1168                     (la->la_uid != tmp_la->la_uid)) &&
1169                     !mdd_capable(uc, CFS_CAP_CHOWN))
1170                         RETURN(-EPERM);
1171
1172                 /* If the user or group of a non-directory has been
1173                  * changed by a non-root user, remove the setuid bit.
1174                  * 19981026 David C Niemi <niemi@tux.org>
1175                  *
1176                  * Changed this to apply to all users, including root,
1177                  * to avoid some races. This is the behavior we had in
1178                  * 2.0. The check for non-root was definitely wrong
1179                  * for 2.2 anyway, as it should have been using
1180                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1181                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1182                     !S_ISDIR(tmp_la->la_mode)) {
1183                         la->la_mode &= ~S_ISUID;
1184                         la->la_valid |= LA_MODE;
1185                 }
1186         }
1187
1188         /* Make sure caller can chgrp. */
1189         if (la->la_valid & LA_GID) {
1190                 if (la->la_gid == (gid_t) -1)
1191                         la->la_gid = tmp_la->la_gid;
1192                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1193                     ((la->la_gid != tmp_la->la_gid) &&
1194                     !lustre_in_group_p(uc, la->la_gid))) &&
1195                     !mdd_capable(uc, CFS_CAP_CHOWN))
1196                         RETURN(-EPERM);
1197
1198                 /* Likewise, if the user or group of a non-directory
1199                  * has been changed by a non-root user, remove the
1200                  * setgid bit UNLESS there is no group execute bit
1201                  * (this would be a file marked for mandatory
1202                  * locking).  19981026 David C Niemi <niemi@tux.org>
1203                  *
1204                  * Removed the fsuid check (see the comment above) --
1205                  * 19990830 SD. */
1206                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1207                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1208                         la->la_mode &= ~S_ISGID;
1209                         la->la_valid |= LA_MODE;
1210                 }
1211         }
1212
1213         /* For both Size-on-MDS case and truncate case,
1214          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1215          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1216          * For SOM case, it is true, the MAY_WRITE perm has been checked
1217          * when open, no need check again. For truncate case, it is false,
1218          * the MAY_WRITE perm should be checked here. */
1219         if (ma->ma_attr_flags & MDS_SOM) {
1220                 /* For the "Size-on-MDS" setattr update, merge coming
1221                  * attributes with the set in the inode. BUG 10641 */
1222                 if ((la->la_valid & LA_ATIME) &&
1223                     (la->la_atime <= tmp_la->la_atime))
1224                         la->la_valid &= ~LA_ATIME;
1225
1226                 /* OST attributes do not have a priority over MDS attributes,
1227                  * so drop times if ctime is equal. */
1228                 if ((la->la_valid & LA_CTIME) &&
1229                     (la->la_ctime <= tmp_la->la_ctime))
1230                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1231         } else {
1232                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1233                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1234                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1235                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1236                                 rc = mdd_permission_internal_locked(env, obj,
1237                                                             tmp_la, MAY_WRITE,
1238                                                             MOR_TGT_CHILD);
1239                                 if (rc)
1240                                         RETURN(rc);
1241                         }
1242                 }
1243                 if (la->la_valid & LA_CTIME) {
1244                         /* The pure setattr, it has the priority over what is
1245                          * already set, do not drop it if ctime is equal. */
1246                         if (la->la_ctime < tmp_la->la_ctime)
1247                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1248                                                   LA_CTIME);
1249                 }
1250         }
1251
1252         RETURN(0);
1253 }
1254
1255 /** Store a data change changelog record
1256  * If this fails, we must fail the whole transaction; we don't
1257  * want the change to commit without the log entry.
1258  * \param mdd_obj - mdd_object of change
1259  * \param handle - transacion handle
1260  */
1261 static int mdd_changelog_data_store(const struct lu_env     *env,
1262                                     struct mdd_device       *mdd,
1263                                     enum changelog_rec_type type,
1264                                     int                     flags,
1265                                     struct mdd_object       *mdd_obj,
1266                                     struct thandle          *handle)
1267 {
1268         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1269         struct llog_changelog_rec *rec;
1270         struct lu_buf *buf;
1271         int reclen;
1272         int rc;
1273
1274         /* Not recording */
1275         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1276                 RETURN(0);
1277         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1278                 RETURN(0);
1279
1280         LASSERT(handle != NULL);
1281         LASSERT(mdd_obj != NULL);
1282
1283         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1284             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1285                 /* Don't need multiple updates in this log */
1286                 /* Don't check under lock - no big deal if we get an extra
1287                    entry */
1288                 RETURN(0);
1289         }
1290
1291         reclen = llog_data_len(sizeof(*rec));
1292         buf = mdd_buf_alloc(env, reclen);
1293         if (buf->lb_buf == NULL)
1294                 RETURN(-ENOMEM);
1295         rec = (struct llog_changelog_rec *)buf->lb_buf;
1296
1297         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1298         rec->cr.cr_type = (__u32)type;
1299         rec->cr.cr_tfid = *tfid;
1300         rec->cr.cr_namelen = 0;
1301         mdd_obj->mod_cltime = cfs_time_current_64();
1302
1303         rc = mdd_changelog_llog_write(mdd, rec, handle);
1304         if (rc < 0) {
1305                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1306                        rc, type, PFID(tfid));
1307                 return -EFAULT;
1308         }
1309
1310         return 0;
1311 }
1312
1313 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1314                   int flags, struct md_object *obj)
1315 {
1316         struct thandle *handle;
1317         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1318         struct mdd_device *mdd = mdo2mdd(obj);
1319         int rc;
1320         ENTRY;
1321
1322         handle = mdd_trans_start(env, mdd);
1323
1324         if (IS_ERR(handle))
1325                 return(PTR_ERR(handle));
1326
1327         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1328                                       handle);
1329
1330         mdd_trans_stop(env, mdd, rc, handle);
1331
1332         RETURN(rc);
1333 }
1334
1335 /**
1336  * Should be called with write lock held.
1337  *
1338  * \see mdd_lma_set_locked().
1339  */
1340 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1341                        const struct md_attr *ma, struct thandle *handle)
1342 {
1343         struct mdd_thread_info *info = mdd_env_info(env);
1344         struct lu_buf *buf;
1345         struct lustre_mdt_attrs *lma =
1346                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1347         int lmasize = sizeof(struct lustre_mdt_attrs);
1348         int rc = 0;
1349
1350         ENTRY;
1351
1352         /* Either HSM or SOM part is not valid, we need to read it before */
1353         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1354                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1355                 if (rc <= 0)
1356                         RETURN(rc);
1357
1358                 lustre_lma_swab(lma);
1359         } else {
1360                 memset(lma, 0, lmasize);
1361         }
1362
1363         /* Copy HSM data */
1364         if (ma->ma_valid & MA_HSM) {
1365                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1366                 lma->lma_compat |= LMAC_HSM;
1367         }
1368
1369         /* Copy SOM data */
1370         if (ma->ma_valid & MA_SOM) {
1371                 LASSERT(ma->ma_som != NULL);
1372                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1373                         lma->lma_compat     &= ~LMAC_SOM;
1374                 } else {
1375                         lma->lma_compat     |= LMAC_SOM;
1376                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1377                         lma->lma_som_size    = ma->ma_som->msd_size;
1378                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1379                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1380                 }
1381         }
1382
1383         /* Copy FID */
1384         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1385
1386         lustre_lma_swab(lma);
1387         buf = mdd_buf_get(env, lma, lmasize);
1388         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1389
1390         RETURN(rc);
1391 }
1392
1393 /**
1394  * Save LMA extended attributes with data from \a ma.
1395  *
1396  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1397  * not, LMA EA will be first read from disk, modified and write back.
1398  *
1399  */
1400 static int mdd_lma_set_locked(const struct lu_env *env,
1401                               struct mdd_object *mdd_obj,
1402                               const struct md_attr *ma, struct thandle *handle)
1403 {
1404         int rc;
1405
1406         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1407         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1408         mdd_write_unlock(env, mdd_obj);
1409         return rc;
1410 }
1411
1412 /* Precedence for choosing record type when multiple
1413  * attributes change: setattr > mtime > ctime > atime
1414  * (ctime changes when mtime does, plus chmod/chown.
1415  * atime and ctime are independent.) */
1416 static int mdd_attr_set_changelog(const struct lu_env *env,
1417                                   struct md_object *obj, struct thandle *handle,
1418                                   __u64 valid)
1419 {
1420         struct mdd_device *mdd = mdo2mdd(obj);
1421         int bits, type = 0;
1422
1423         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1424         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1425         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1426         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1427         bits = bits & mdd->mdd_cl.mc_mask;
1428         if (bits == 0)
1429                 return 0;
1430
1431         /* The record type is the lowest non-masked set bit */
1432         while (bits && ((bits & 1) == 0)) {
1433                 bits = bits >> 1;
1434                 type++;
1435         }
1436
1437         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1438         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1439                                         md2mdd_obj(obj), handle);
1440 }
1441
1442 /* set attr and LOV EA at once, return updated attr */
1443 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1444                         const struct md_attr *ma)
1445 {
1446         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1447         struct mdd_device *mdd = mdo2mdd(obj);
1448         struct thandle *handle;
1449         struct lov_mds_md *lmm = NULL;
1450         struct llog_cookie *logcookies = NULL;
1451         int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
1452         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1453         struct obd_device *obd = mdd->mdd_obd_dev;
1454         struct mds_obd *mds = &obd->u.mds;
1455 #ifdef HAVE_QUOTA_SUPPORT
1456         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1457         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1458         int quota_opc = 0, block_count = 0;
1459         int inode_pending[MAXQUOTAS] = { 0, 0 };
1460         int block_pending[MAXQUOTAS] = { 0, 0 };
1461 #endif
1462         ENTRY;
1463
1464         *la_copy = ma->ma_attr;
1465         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1466         if (rc != 0)
1467                 RETURN(rc);
1468
1469         /* setattr on "close" only change atime, or do nothing */
1470         if (ma->ma_valid == MA_INODE &&
1471             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1472                 RETURN(0);
1473
1474         /*TODO: add lock here*/
1475         /* start a log jounal handle if needed */
1476         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1477             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1478                 lmm_size = mdd_lov_mdsize(env, mdd);
1479                 lmm = mdd_max_lmm_get(env, mdd);
1480                 if (lmm == NULL)
1481                         GOTO(no_trans, rc = -ENOMEM);
1482
1483                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1484                                 XATTR_NAME_LOV);
1485
1486                 if (rc < 0)
1487                         GOTO(no_trans, rc);
1488         }
1489
1490         chlog_cnt = 1;
1491         if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
1492                 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
1493                          lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
1494         }
1495
1496         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1497                                     MDD_TXN_ATTR_SET_OP, chlog_cnt);
1498         handle = mdd_trans_start(env, mdd);
1499         if (IS_ERR(handle))
1500                 GOTO(no_trans, rc = PTR_ERR(handle));
1501
1502         /* permission changes may require sync operation */
1503         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1504                 handle->th_sync |= mdd->mdd_sync_permission;
1505
1506         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1507                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1508                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1509
1510 #ifdef HAVE_QUOTA_SUPPORT
1511         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1512                 struct obd_export *exp = md_quota(env)->mq_exp;
1513                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1514
1515                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1516                 if (!rc) {
1517                         quota_opc = FSFILT_OP_SETATTR;
1518                         mdd_quota_wrapper(la_copy, qnids);
1519                         mdd_quota_wrapper(la_tmp, qoids);
1520                         /* get file quota for new owner */
1521                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1522                                         qnids, inode_pending, 1, NULL, 0,
1523                                         NULL, 0);
1524                         block_count = (la_tmp->la_blocks + 7) >> 3;
1525                         if (block_count) {
1526                                 void *data = NULL;
1527                                 mdd_data_get(env, mdd_obj, &data);
1528                                 /* get block quota for new owner */
1529                                 lquota_chkquota(mds_quota_interface_ref, obd,
1530                                                 exp, qnids, block_pending,
1531                                                 block_count, NULL,
1532                                                 LQUOTA_FLAGS_BLK, data, 1);
1533                         }
1534                 }
1535         }
1536 #endif
1537
1538         if (la_copy->la_valid & LA_FLAGS) {
1539                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1540                                                   handle, 1);
1541                 if (rc == 0)
1542                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1543         } else if (la_copy->la_valid) {            /* setattr */
1544                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1545                                                   handle, 1);
1546                 /* journal chown/chgrp in llog, just like unlink */
1547                 if (rc == 0 && lmm_size){
1548                         cookie_size = mdd_lov_cookiesize(env, mdd);
1549                         logcookies = mdd_max_cookie_get(env, mdd);
1550                         if (logcookies == NULL)
1551                                 GOTO(cleanup, rc = -ENOMEM);
1552
1553                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1554                                             logcookies, cookie_size) <= 0)
1555                                 logcookies = NULL;
1556                 }
1557         }
1558
1559         if (rc == 0 && ma->ma_valid & MA_LOV) {
1560                 cfs_umode_t mode;
1561
1562                 mode = mdd_object_type(mdd_obj);
1563                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1564                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1565                         if (rc)
1566                                 GOTO(cleanup, rc);
1567
1568                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1569                                             ma->ma_lmm_size, handle, 1);
1570                 }
1571
1572         }
1573         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1574                 cfs_umode_t mode;
1575
1576                 mode = mdd_object_type(mdd_obj);
1577                 if (S_ISREG(mode))
1578                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1579
1580         }
1581 cleanup:
1582         if (rc == 0)
1583                 rc = mdd_attr_set_changelog(env, obj, handle,
1584                                             ma->ma_attr.la_valid);
1585         mdd_trans_stop(env, mdd, rc, handle);
1586 no_trans:
1587         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1588                 /*set obd attr, if needed*/
1589                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1590                                            logcookies);
1591         }
1592 #ifdef HAVE_QUOTA_SUPPORT
1593         if (quota_opc) {
1594                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1595                                       inode_pending, 0);
1596                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1597                                       block_pending, 1);
1598                 /* Trigger dqrel/dqacq for original owner and new owner.
1599                  * If failed, the next call for lquota_chkquota will
1600                  * process it. */
1601                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1602                               quota_opc);
1603         }
1604 #endif
1605         RETURN(rc);
1606 }
1607
1608 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1609                       const struct lu_buf *buf, const char *name, int fl,
1610                       struct thandle *handle)
1611 {
1612         int  rc;
1613         ENTRY;
1614
1615         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1616         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1617         mdd_write_unlock(env, obj);
1618
1619         RETURN(rc);
1620 }
1621
1622 static int mdd_xattr_sanity_check(const struct lu_env *env,
1623                                   struct mdd_object *obj)
1624 {
1625         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1626         struct md_ucred *uc     = md_ucred(env);
1627         int rc;
1628         ENTRY;
1629
1630         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1631                 RETURN(-EPERM);
1632
1633         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1634         if (rc)
1635                 RETURN(rc);
1636
1637         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1638             !mdd_capable(uc, CFS_CAP_FOWNER))
1639                 RETURN(-EPERM);
1640
1641         RETURN(rc);
1642 }
1643
1644 /**
1645  * The caller should guarantee to update the object ctime
1646  * after xattr_set if needed.
1647  */
1648 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1649                          const struct lu_buf *buf, const char *name,
1650                          int fl)
1651 {
1652         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1653         struct mdd_device *mdd = mdo2mdd(obj);
1654         struct thandle *handle;
1655         int  rc;
1656         ENTRY;
1657
1658         rc = mdd_xattr_sanity_check(env, mdd_obj);
1659         if (rc)
1660                 RETURN(rc);
1661
1662         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1663         handle = mdd_trans_start(env, mdd);
1664         if (IS_ERR(handle))
1665                 RETURN(PTR_ERR(handle));
1666
1667         /* security-replated changes may require sync */
1668         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1669                 handle->th_sync |= mdd->mdd_sync_permission;
1670
1671         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1672
1673         /* Only record system & user xattr changes */
1674         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1675                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1676                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1677                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1678                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1679                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1680                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1681                                               handle);
1682         mdd_trans_stop(env, mdd, rc, handle);
1683
1684         RETURN(rc);
1685 }
1686
1687 /**
1688  * The caller should guarantee to update the object ctime
1689  * after xattr_set if needed.
1690  */
1691 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1692                   const char *name)
1693 {
1694         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1695         struct mdd_device *mdd = mdo2mdd(obj);
1696         struct thandle *handle;
1697         int  rc;
1698         ENTRY;
1699
1700         rc = mdd_xattr_sanity_check(env, mdd_obj);
1701         if (rc)
1702                 RETURN(rc);
1703
1704         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1705         handle = mdd_trans_start(env, mdd);
1706         if (IS_ERR(handle))
1707                 RETURN(PTR_ERR(handle));
1708
1709         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1710         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1711                            mdd_object_capa(env, mdd_obj));
1712         mdd_write_unlock(env, mdd_obj);
1713
1714         /* Only record system & user xattr changes */
1715         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1716                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1717                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1718                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1719                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1720                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1721                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1722                                               handle);
1723
1724         mdd_trans_stop(env, mdd, rc, handle);
1725
1726         RETURN(rc);
1727 }
1728
1729 /* partial unlink */
1730 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1731                        struct md_attr *ma)
1732 {
1733         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1734         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1735         struct mdd_device *mdd = mdo2mdd(obj);
1736         struct thandle *handle;
1737 #ifdef HAVE_QUOTA_SUPPORT
1738         struct obd_device *obd = mdd->mdd_obd_dev;
1739         struct mds_obd *mds = &obd->u.mds;
1740         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1741         int quota_opc = 0;
1742 #endif
1743         int rc;
1744         ENTRY;
1745
1746         /*
1747          * Check -ENOENT early here because we need to get object type
1748          * to calculate credits before transaction start
1749          */
1750         if (!mdd_object_exists(mdd_obj))
1751                 RETURN(-ENOENT);
1752
1753         LASSERT(mdd_object_exists(mdd_obj) > 0);
1754
1755         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
1756         if (rc)
1757                 RETURN(rc);
1758
1759         handle = mdd_trans_start(env, mdd);
1760         if (IS_ERR(handle))
1761                 RETURN(-ENOMEM);
1762
1763         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1764
1765         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1766         if (rc)
1767                 GOTO(cleanup, rc);
1768
1769         __mdd_ref_del(env, mdd_obj, handle, 0);
1770
1771         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1772                 /* unlink dot */
1773                 __mdd_ref_del(env, mdd_obj, handle, 1);
1774         }
1775
1776         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1777         la_copy->la_ctime = ma->ma_attr.la_ctime;
1778
1779         la_copy->la_valid = LA_CTIME;
1780         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1781         if (rc)
1782                 GOTO(cleanup, rc);
1783
1784         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1785 #ifdef HAVE_QUOTA_SUPPORT
1786         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1787             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1788                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1789                 mdd_quota_wrapper(&ma->ma_attr, qids);
1790         }
1791 #endif
1792
1793
1794         EXIT;
1795 cleanup:
1796         mdd_write_unlock(env, mdd_obj);
1797         mdd_trans_stop(env, mdd, rc, handle);
1798 #ifdef HAVE_QUOTA_SUPPORT
1799         if (quota_opc)
1800                 /* Trigger dqrel on the owner of child. If failed,
1801                  * the next call for lquota_chkquota will process it */
1802                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1803                               quota_opc);
1804 #endif
1805         return rc;
1806 }
1807
1808 /* partial operation */
1809 static int mdd_oc_sanity_check(const struct lu_env *env,
1810                                struct mdd_object *obj,
1811                                struct md_attr *ma)
1812 {
1813         int rc;
1814         ENTRY;
1815
1816         switch (ma->ma_attr.la_mode & S_IFMT) {
1817         case S_IFREG:
1818         case S_IFDIR:
1819         case S_IFLNK:
1820         case S_IFCHR:
1821         case S_IFBLK:
1822         case S_IFIFO:
1823         case S_IFSOCK:
1824                 rc = 0;
1825                 break;
1826         default:
1827                 rc = -EINVAL;
1828                 break;
1829         }
1830         RETURN(rc);
1831 }
1832
1833 static int mdd_object_create(const struct lu_env *env,
1834                              struct md_object *obj,
1835                              const struct md_op_spec *spec,
1836                              struct md_attr *ma)
1837 {
1838
1839         struct mdd_device *mdd = mdo2mdd(obj);
1840         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1841         const struct lu_fid *pfid = spec->u.sp_pfid;
1842         struct thandle *handle;
1843 #ifdef HAVE_QUOTA_SUPPORT
1844         struct obd_device *obd = mdd->mdd_obd_dev;
1845         struct obd_export *exp = md_quota(env)->mq_exp;
1846         struct mds_obd *mds = &obd->u.mds;
1847         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1848         int quota_opc = 0, block_count = 0;
1849         int inode_pending[MAXQUOTAS] = { 0, 0 };
1850         int block_pending[MAXQUOTAS] = { 0, 0 };
1851 #endif
1852         int rc = 0;
1853         ENTRY;
1854
1855 #ifdef HAVE_QUOTA_SUPPORT
1856         if (mds->mds_quota) {
1857                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1858                 mdd_quota_wrapper(&ma->ma_attr, qids);
1859                 /* get file quota for child */
1860                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1861                                 qids, inode_pending, 1, NULL, 0,
1862                                 NULL, 0);
1863                 switch (ma->ma_attr.la_mode & S_IFMT) {
1864                 case S_IFLNK:
1865                 case S_IFDIR:
1866                         block_count = 2;
1867                         break;
1868                 case S_IFREG:
1869                         block_count = 1;
1870                         break;
1871                 }
1872                 /* get block quota for child */
1873                 if (block_count)
1874                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1875                                         qids, block_pending, block_count,
1876                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1877         }
1878 #endif
1879
1880         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
1881         handle = mdd_trans_start(env, mdd);
1882         if (IS_ERR(handle))
1883                 GOTO(out_pending, rc = PTR_ERR(handle));
1884
1885         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1886         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1887         if (rc)
1888                 GOTO(unlock, rc);
1889
1890         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1891         if (rc)
1892                 GOTO(unlock, rc);
1893
1894         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1895                 /* If creating the slave object, set slave EA here. */
1896                 int lmv_size = spec->u.sp_ea.eadatalen;
1897                 struct lmv_stripe_md *lmv;
1898
1899                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1900                 LASSERT(lmv != NULL && lmv_size > 0);
1901
1902                 rc = __mdd_xattr_set(env, mdd_obj,
1903                                      mdd_buf_get_const(env, lmv, lmv_size),
1904                                      XATTR_NAME_LMV, 0, handle);
1905                 if (rc)
1906                         GOTO(unlock, rc);
1907
1908                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1909                                            handle, 0);
1910         } else {
1911 #ifdef CONFIG_FS_POSIX_ACL
1912                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1913                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1914
1915                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1916                         buf->lb_len = spec->u.sp_ea.eadatalen;
1917                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1918                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1919                                                     &ma->ma_attr.la_mode,
1920                                                     handle);
1921                                 if (rc)
1922                                         GOTO(unlock, rc);
1923                                 else
1924                                         ma->ma_attr.la_valid |= LA_MODE;
1925                         }
1926
1927                         pfid = spec->u.sp_ea.fid;
1928                 }
1929 #endif
1930                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1931                                            spec);
1932         }
1933         EXIT;
1934 unlock:
1935         if (rc == 0)
1936                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1937         mdd_write_unlock(env, mdd_obj);
1938
1939         mdd_trans_stop(env, mdd, rc, handle);
1940 out_pending:
1941 #ifdef HAVE_QUOTA_SUPPORT
1942         if (quota_opc) {
1943                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1944                                       inode_pending, 0);
1945                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1946                                       block_pending, 1);
1947                 /* Trigger dqacq on the owner of child. If failed,
1948                  * the next call for lquota_chkquota will process it. */
1949                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1950                               quota_opc);
1951         }
1952 #endif
1953         return rc;
1954 }
1955
1956 /* partial link */
1957 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1958                        const struct md_attr *ma)
1959 {
1960         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1961         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1962         struct mdd_device *mdd = mdo2mdd(obj);
1963         struct thandle *handle;
1964         int rc;
1965         ENTRY;
1966
1967         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
1968         handle = mdd_trans_start(env, mdd);
1969         if (IS_ERR(handle))
1970                 RETURN(-ENOMEM);
1971
1972         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1973         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1974         if (rc == 0)
1975                 __mdd_ref_add(env, mdd_obj, handle);
1976         mdd_write_unlock(env, mdd_obj);
1977         if (rc == 0) {
1978                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1979                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1980
1981                 la_copy->la_valid = LA_CTIME;
1982                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1983                                                         handle, 0);
1984         }
1985         mdd_trans_stop(env, mdd, 0, handle);
1986
1987         RETURN(rc);
1988 }
1989
1990 /*
1991  * do NOT or the MAY_*'s, you'll get the weakest
1992  */
1993 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1994 {
1995         int res = 0;
1996
1997         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1998          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1999          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2000          * owner can write to a file even if it is marked readonly to hide
2001          * its brokenness. (bug 5781) */
2002         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2003                 struct md_ucred *uc = md_ucred(env);
2004
2005                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2006                     (la->la_uid == uc->mu_fsuid))
2007                         return 0;
2008         }
2009
2010         if (flags & FMODE_READ)
2011                 res |= MAY_READ;
2012         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2013                 res |= MAY_WRITE;
2014         if (flags & MDS_FMODE_EXEC)
2015                 res = MAY_EXEC;
2016         return res;
2017 }
2018
2019 static int mdd_open_sanity_check(const struct lu_env *env,
2020                                  struct mdd_object *obj, int flag)
2021 {
2022         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2023         int mode, rc;
2024         ENTRY;
2025
2026         /* EEXIST check */
2027         if (mdd_is_dead_obj(obj))
2028                 RETURN(-ENOENT);
2029
2030         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2031         if (rc)
2032                RETURN(rc);
2033
2034         if (S_ISLNK(tmp_la->la_mode))
2035                 RETURN(-ELOOP);
2036
2037         mode = accmode(env, tmp_la, flag);
2038
2039         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2040                 RETURN(-EISDIR);
2041
2042         if (!(flag & MDS_OPEN_CREATED)) {
2043                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2044                 if (rc)
2045                         RETURN(rc);
2046         }
2047
2048         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2049             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2050                 flag &= ~MDS_OPEN_TRUNC;
2051
2052         /* For writing append-only file must open it with append mode. */
2053         if (mdd_is_append(obj)) {
2054                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2055                         RETURN(-EPERM);
2056                 if (flag & MDS_OPEN_TRUNC)
2057                         RETURN(-EPERM);
2058         }
2059
2060 #if 0
2061         /*
2062          * Now, flag -- O_NOATIME does not be packed by client.
2063          */
2064         if (flag & O_NOATIME) {
2065                 struct md_ucred *uc = md_ucred(env);
2066
2067                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2068                     (uc->mu_valid == UCRED_NEW)) &&
2069                     (uc->mu_fsuid != tmp_la->la_uid) &&
2070                     !mdd_capable(uc, CFS_CAP_FOWNER))
2071                         RETURN(-EPERM);
2072         }
2073 #endif
2074
2075         RETURN(0);
2076 }
2077
2078 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2079                     int flags)
2080 {
2081         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2082         int rc = 0;
2083
2084         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2085
2086         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2087         if (rc == 0)
2088                 mdd_obj->mod_count++;
2089
2090         mdd_write_unlock(env, mdd_obj);
2091         return rc;
2092 }
2093
2094 /* return md_attr back,
2095  * if it is last unlink then return lov ea + llog cookie*/
2096 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2097                     struct md_attr *ma)
2098 {
2099         int rc = 0;
2100         ENTRY;
2101
2102         if (S_ISREG(mdd_object_type(obj))) {
2103                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2104                  * Caller must be ready for that. */
2105
2106                 rc = __mdd_lmm_get(env, obj, ma);
2107                 if ((ma->ma_valid & MA_LOV))
2108                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2109                                             obj, ma);
2110         }
2111         RETURN(rc);
2112 }
2113
2114 /*
2115  * No permission check is needed.
2116  */
2117 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2118                      struct md_attr *ma)
2119 {
2120         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2121         struct mdd_device *mdd = mdo2mdd(obj);
2122         struct thandle    *handle = NULL;
2123         int rc;
2124         int reset = 1;
2125
2126 #ifdef HAVE_QUOTA_SUPPORT
2127         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2128         struct mds_obd *mds = &obd->u.mds;
2129         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2130         int quota_opc = 0;
2131 #endif
2132         ENTRY;
2133
2134         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2135                 mdd_obj->mod_count--;
2136
2137                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2138                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2139                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2140                 RETURN(0);
2141         }
2142
2143         /* check without any lock */
2144         if (mdd_obj->mod_count == 1 &&
2145             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2146  again:
2147                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
2148                 if (rc)
2149                         RETURN(rc);
2150                 handle = mdd_trans_start(env, mdo2mdd(obj));
2151                 if (IS_ERR(handle))
2152                         RETURN(PTR_ERR(handle));
2153         }
2154
2155         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2156         if (handle == NULL && mdd_obj->mod_count == 1 &&
2157             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2158                 mdd_write_unlock(env, mdd_obj);
2159                 goto again;
2160         }
2161
2162         /* release open count */
2163         mdd_obj->mod_count --;
2164
2165         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2166                 /* remove link to object from orphan index */
2167                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2168                 if (rc == 0) {
2169                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2170                                "list, OSS objects to be destroyed.\n",
2171                                PFID(mdd_object_fid(mdd_obj)));
2172                 } else {
2173                         CERROR("Object "DFID" can not be deleted from orphan "
2174                                 "list, maybe cause OST objects can not be "
2175                                 "destroyed (err: %d).\n",
2176                                 PFID(mdd_object_fid(mdd_obj)), rc);
2177                         /* If object was not deleted from orphan list, do not
2178                          * destroy OSS objects, which will be done when next
2179                          * recovery. */
2180                         GOTO(out, rc);
2181                 }
2182         }
2183
2184         rc = mdd_iattr_get(env, mdd_obj, ma);
2185         /* Object maybe not in orphan list originally, it is rare case for
2186          * mdd_finish_unlink() failure. */
2187         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2188 #ifdef HAVE_QUOTA_SUPPORT
2189                 if (mds->mds_quota) {
2190                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2191                         mdd_quota_wrapper(&ma->ma_attr, qids);
2192                 }
2193 #endif
2194                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2195                 if (ma->ma_valid & MA_FLAGS &&
2196                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2197                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2198                 } else {
2199                         rc = mdd_object_kill(env, mdd_obj, ma);
2200                         if (rc == 0)
2201                                 reset = 0;
2202                 }
2203
2204                 if (rc != 0)
2205                         CERROR("Error when prepare to delete Object "DFID" , "
2206                                "which will cause OST objects can not be "
2207                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2208         }
2209         EXIT;
2210
2211 out:
2212         if (reset)
2213                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2214
2215         mdd_write_unlock(env, mdd_obj);
2216         if (handle != NULL)
2217                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2218 #ifdef HAVE_QUOTA_SUPPORT
2219         if (quota_opc)
2220                 /* Trigger dqrel on the owner of child. If failed,
2221                  * the next call for lquota_chkquota will process it */
2222                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2223                               quota_opc);
2224 #endif
2225         return rc;
2226 }
2227
2228 /*
2229  * Permission check is done when open,
2230  * no need check again.
2231  */
2232 static int mdd_readpage_sanity_check(const struct lu_env *env,
2233                                      struct mdd_object *obj)
2234 {
2235         struct dt_object *next = mdd_object_child(obj);
2236         int rc;
2237         ENTRY;
2238
2239         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2240                 rc = 0;
2241         else
2242                 rc = -ENOTDIR;
2243
2244         RETURN(rc);
2245 }
2246
2247 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2248                               struct lu_dirpage *dp, int nob,
2249                               const struct dt_it_ops *iops, struct dt_it *it,
2250                               __u32 attr)
2251 {
2252         void                   *area = dp;
2253         int                     result;
2254         __u64                   hash = 0;
2255         struct lu_dirent       *ent;
2256         struct lu_dirent       *last = NULL;
2257         int                     first = 1;
2258
2259         memset(area, 0, sizeof (*dp));
2260         area += sizeof (*dp);
2261         nob  -= sizeof (*dp);
2262
2263         ent  = area;
2264         do {
2265                 int    len;
2266                 int    recsize;
2267
2268                 len  = iops->key_size(env, it);
2269
2270                 /* IAM iterator can return record with zero len. */
2271                 if (len == 0)
2272                         goto next;
2273
2274                 hash = iops->store(env, it);
2275                 if (unlikely(first)) {
2276                         first = 0;
2277                         dp->ldp_hash_start = cpu_to_le64(hash);
2278                 }
2279
2280                 /* calculate max space required for lu_dirent */
2281                 recsize = lu_dirent_calc_size(len, attr);
2282
2283                 if (nob >= recsize) {
2284                         result = iops->rec(env, it, ent, attr);
2285                         if (result == -ESTALE)
2286                                 goto next;
2287                         if (result != 0)
2288                                 goto out;
2289
2290                         /* osd might not able to pack all attributes,
2291                          * so recheck rec length */
2292                         recsize = le16_to_cpu(ent->lde_reclen);
2293                 } else {
2294                         result = (last != NULL) ? 0 :-EINVAL;
2295                         goto out;
2296                 }
2297                 last = ent;
2298                 ent = (void *)ent + recsize;
2299                 nob -= recsize;
2300
2301 next:
2302                 result = iops->next(env, it);
2303                 if (result == -ESTALE)
2304                         goto next;
2305         } while (result == 0);
2306
2307 out:
2308         dp->ldp_hash_end = cpu_to_le64(hash);
2309         if (last != NULL) {
2310                 if (last->lde_hash == dp->ldp_hash_end)
2311                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2312                 last->lde_reclen = 0; /* end mark */
2313         }
2314         return result;
2315 }
2316
2317 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2318                           const struct lu_rdpg *rdpg)
2319 {
2320         struct dt_it      *it;
2321         struct dt_object  *next = mdd_object_child(obj);
2322         const struct dt_it_ops  *iops;
2323         struct page       *pg;
2324         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2325         int i;
2326         int nlupgs = 0;
2327         int rc;
2328         int nob;
2329
2330         LASSERT(rdpg->rp_pages != NULL);
2331         LASSERT(next->do_index_ops != NULL);
2332
2333         if (rdpg->rp_count <= 0)
2334                 return -EFAULT;
2335
2336         /*
2337          * iterate through directory and fill pages from @rdpg
2338          */
2339         iops = &next->do_index_ops->dio_it;
2340         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2341         if (IS_ERR(it))
2342                 return PTR_ERR(it);
2343
2344         rc = iops->load(env, it, rdpg->rp_hash);
2345
2346         if (rc == 0) {
2347                 /*
2348                  * Iterator didn't find record with exactly the key requested.
2349                  *
2350                  * It is currently either
2351                  *
2352                  *     - positioned above record with key less than
2353                  *     requested---skip it.
2354                  *
2355                  *     - or not positioned at all (is in IAM_IT_SKEWED
2356                  *     state)---position it on the next item.
2357                  */
2358                 rc = iops->next(env, it);
2359         } else if (rc > 0)
2360                 rc = 0;
2361
2362         /*
2363          * At this point and across for-loop:
2364          *
2365          *  rc == 0 -> ok, proceed.
2366          *  rc >  0 -> end of directory.
2367          *  rc <  0 -> error.
2368          */
2369         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2370              i++, nob -= CFS_PAGE_SIZE) {
2371                 struct lu_dirpage *dp;
2372
2373                 LASSERT(i < rdpg->rp_npages);
2374                 pg = rdpg->rp_pages[i];
2375                 dp = cfs_kmap(pg);
2376 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2377 repeat:
2378 #endif
2379                 rc = mdd_dir_page_build(env, mdd, dp,
2380                                         min_t(int, nob, LU_PAGE_SIZE),
2381                                         iops, it, rdpg->rp_attrs);
2382                 if (rc > 0) {
2383                         /*
2384                          * end of directory.
2385                          */
2386                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2387                         nlupgs++;
2388                 } else if (rc < 0) {
2389                         CWARN("build page failed: %d!\n", rc);
2390                 } else {
2391                         nlupgs++;
2392 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2393                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2394                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2395                                 goto repeat;
2396 #endif
2397                 }
2398                 cfs_kunmap(pg);
2399         }
2400         if (rc >= 0) {
2401                 struct lu_dirpage *dp;
2402
2403                 dp = cfs_kmap(rdpg->rp_pages[0]);
2404                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2405                 if (nlupgs == 0) {
2406                         /*
2407                          * No pages were processed, mark this for first page
2408                          * and send back.
2409                          */
2410                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2411                         nlupgs = 1;
2412                 }
2413                 cfs_kunmap(rdpg->rp_pages[0]);
2414
2415                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2416         }
2417         iops->put(env, it);
2418         iops->fini(env, it);
2419
2420         return rc;
2421 }
2422
2423 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2424                  const struct lu_rdpg *rdpg)
2425 {
2426         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2427         int rc;
2428         ENTRY;
2429
2430         LASSERT(mdd_object_exists(mdd_obj));
2431
2432         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2433         rc = mdd_readpage_sanity_check(env, mdd_obj);
2434         if (rc)
2435                 GOTO(out_unlock, rc);
2436
2437         if (mdd_is_dead_obj(mdd_obj)) {
2438                 struct page *pg;
2439                 struct lu_dirpage *dp;
2440
2441                 /*
2442                  * According to POSIX, please do not return any entry to client:
2443                  * even dot and dotdot should not be returned.
2444                  */
2445                 CWARN("readdir from dead object: "DFID"\n",
2446                         PFID(mdd_object_fid(mdd_obj)));
2447
2448                 if (rdpg->rp_count <= 0)
2449                         GOTO(out_unlock, rc = -EFAULT);
2450                 LASSERT(rdpg->rp_pages != NULL);
2451
2452                 pg = rdpg->rp_pages[0];
2453                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2454                 memset(dp, 0 , sizeof(struct lu_dirpage));
2455                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2456                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2457                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2458                 cfs_kunmap(pg);
2459                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2460         }
2461
2462         rc = __mdd_readpage(env, mdd_obj, rdpg);
2463
2464         EXIT;
2465 out_unlock:
2466         mdd_read_unlock(env, mdd_obj);
2467         return rc;
2468 }
2469
2470 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2471 {
2472         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2473         struct dt_object *next;
2474
2475         LASSERT(mdd_object_exists(mdd_obj));
2476         next = mdd_object_child(mdd_obj);
2477         return next->do_ops->do_object_sync(env, next);
2478 }
2479
2480 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2481                                         struct md_object *obj)
2482 {
2483         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2484
2485         LASSERT(mdd_object_exists(mdd_obj));
2486         return do_version_get(env, mdd_object_child(mdd_obj));
2487 }
2488
2489 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2490                             dt_obj_version_t version)
2491 {
2492         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2493
2494         LASSERT(mdd_object_exists(mdd_obj));
2495         do_version_set(env, mdd_object_child(mdd_obj), version);
2496 }
2497
2498 const struct md_object_operations mdd_obj_ops = {
2499         .moo_permission    = mdd_permission,
2500         .moo_attr_get      = mdd_attr_get,
2501         .moo_attr_set      = mdd_attr_set,
2502         .moo_xattr_get     = mdd_xattr_get,
2503         .moo_xattr_set     = mdd_xattr_set,
2504         .moo_xattr_list    = mdd_xattr_list,
2505         .moo_xattr_del     = mdd_xattr_del,
2506         .moo_object_create = mdd_object_create,
2507         .moo_ref_add       = mdd_ref_add,
2508         .moo_ref_del       = mdd_ref_del,
2509         .moo_open          = mdd_open,
2510         .moo_close         = mdd_close,
2511         .moo_readpage      = mdd_readpage,
2512         .moo_readlink      = mdd_readlink,
2513         .moo_changelog     = mdd_changelog,
2514         .moo_capa_get      = mdd_capa_get,
2515         .moo_object_sync   = mdd_object_sync,
2516         .moo_version_get   = mdd_version_get,
2517         .moo_version_set   = mdd_version_set,
2518         .moo_path          = mdd_path,
2519         .moo_file_lock     = mdd_file_lock,
2520         .moo_file_unlock   = mdd_file_unlock,
2521 };