Whamcloud - gitweb
LU-5 readdir read multiple pages per rpc
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #ifdef HAVE_EXT4_LDISKFS
53 #include <ldiskfs/ldiskfs_jbd2.h>
54 #else
55 #include <linux/jbd.h>
56 #endif
57 #include <obd.h>
58 #include <obd_class.h>
59 #include <obd_support.h>
60 #include <lprocfs_status.h>
61 /* fid_be_cpu(), fid_cpu_to_be(). */
62 #include <lustre_fid.h>
63
64 #include <lustre_param.h>
65 #ifdef HAVE_EXT4_LDISKFS
66 #include <ldiskfs/ldiskfs.h>
67 #else
68 #include <linux/ldiskfs_fs.h>
69 #endif
70 #include <lustre_mds.h>
71 #include <lustre/lustre_idl.h>
72
73 #include "mdd_internal.h"
74
75 static const struct lu_object_operations mdd_lu_obj_ops;
76
77 static int mdd_xattr_get(const struct lu_env *env,
78                          struct md_object *obj, struct lu_buf *buf,
79                          const char *name);
80
81 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
82                  void **data)
83 {
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                  PFID(mdd_object_fid(obj)));
86         mdo_data_get(env, obj, data);
87         return 0;
88 }
89
90 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
91                struct lu_attr *la, struct lustre_capa *capa)
92 {
93         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
94                  PFID(mdd_object_fid(obj)));
95         return mdo_attr_get(env, obj, la, capa);
96 }
97
98 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
99 {
100         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
101
102         if (flags & LUSTRE_APPEND_FL)
103                 obj->mod_flags |= APPEND_OBJ;
104
105         if (flags & LUSTRE_IMMUTABLE_FL)
106                 obj->mod_flags |= IMMUTE_OBJ;
107 }
108
109 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
110 {
111         struct mdd_thread_info *info;
112
113         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
114         LASSERT(info != NULL);
115         return info;
116 }
117
118 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 {
120         struct lu_buf *buf;
121
122         buf = &mdd_env_info(env)->mti_buf;
123         buf->lb_buf = area;
124         buf->lb_len = len;
125         return buf;
126 }
127
128 void mdd_buf_put(struct lu_buf *buf)
129 {
130         if (buf == NULL || buf->lb_buf == NULL)
131                 return;
132         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
154                 buf->lb_buf = NULL;
155         }
156         if (buf->lb_buf == NULL) {
157                 buf->lb_len = len;
158                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
159                 if (buf->lb_buf == NULL)
160                         buf->lb_len = 0;
161         }
162         return buf;
163 }
164
165 /** Increase the size of the \a mti_big_buf.
166  * preserves old data in buffer
167  * old buffer remains unchanged on error
168  * \retval 0 or -ENOMEM
169  */
170 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
171 {
172         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
173         struct lu_buf buf;
174
175         LASSERT(len >= oldbuf->lb_len);
176         OBD_ALLOC_LARGE(buf.lb_buf, len);
177
178         if (buf.lb_buf == NULL)
179                 return -ENOMEM;
180
181         buf.lb_len = len;
182         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
183
184         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
185
186         memcpy(oldbuf, &buf, sizeof(buf));
187
188         return 0;
189 }
190
191 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
192                                        struct mdd_device *mdd)
193 {
194         struct mdd_thread_info *mti = mdd_env_info(env);
195         int                     max_cookie_size;
196
197         max_cookie_size = mdd_lov_cookiesize(env, mdd);
198         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
199                 if (mti->mti_max_cookie)
200                         OBD_FREE_LARGE(mti->mti_max_cookie,
201                                        mti->mti_max_cookie_size);
202                 mti->mti_max_cookie = NULL;
203                 mti->mti_max_cookie_size = 0;
204         }
205         if (unlikely(mti->mti_max_cookie == NULL)) {
206                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
207                 if (likely(mti->mti_max_cookie != NULL))
208                         mti->mti_max_cookie_size = max_cookie_size;
209         }
210         if (likely(mti->mti_max_cookie != NULL))
211                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212         return mti->mti_max_cookie;
213 }
214
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216                                    struct mdd_device *mdd)
217 {
218         struct mdd_thread_info *mti = mdd_env_info(env);
219         int                     max_lmm_size;
220
221         max_lmm_size = mdd_lov_mdsize(env, mdd);
222         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223                 if (mti->mti_max_lmm)
224                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225                 mti->mti_max_lmm = NULL;
226                 mti->mti_max_lmm_size = 0;
227         }
228         if (unlikely(mti->mti_max_lmm == NULL)) {
229                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
230                 if (likely(mti->mti_max_lmm != NULL))
231                         mti->mti_max_lmm_size = max_lmm_size;
232         }
233         return mti->mti_max_lmm;
234 }
235
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237                                    const struct lu_object_header *hdr,
238                                    struct lu_device *d)
239 {
240         struct mdd_object *mdd_obj;
241
242         OBD_ALLOC_PTR(mdd_obj);
243         if (mdd_obj != NULL) {
244                 struct lu_object *o;
245
246                 o = mdd2lu_obj(mdd_obj);
247                 lu_object_init(o, NULL, d);
248                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250                 mdd_obj->mod_count = 0;
251                 o->lo_ops = &mdd_lu_obj_ops;
252                 return o;
253         } else {
254                 return NULL;
255         }
256 }
257
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259                            const struct lu_object_conf *unused)
260 {
261         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262         struct mdd_object *mdd_obj = lu2mdd_obj(o);
263         struct lu_object  *below;
264         struct lu_device  *under;
265         ENTRY;
266
267         mdd_obj->mod_cltime = 0;
268         under = &d->mdd_child->dd_lu_dev;
269         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270         mdd_pdlock_init(mdd_obj);
271         if (below == NULL)
272                 RETURN(-ENOMEM);
273
274         lu_object_add(o, below);
275
276         RETURN(0);
277 }
278
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
280 {
281         if (lu_object_exists(o))
282                 return mdd_get_flags(env, lu2mdd_obj(o));
283         else
284                 return 0;
285 }
286
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
288 {
289         struct mdd_object *mdd = lu2mdd_obj(o);
290
291         lu_object_fini(o);
292         OBD_FREE_PTR(mdd);
293 }
294
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296                             lu_printer_t p, const struct lu_object *o)
297 {
298         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
299         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
300                     "valid=%x, cltime="LPU64", flags=%lx)",
301                     mdd, mdd->mod_count, mdd->mod_valid,
302                     mdd->mod_cltime, mdd->mod_flags);
303 }
304
305 static const struct lu_object_operations mdd_lu_obj_ops = {
306         .loo_object_init    = mdd_object_init,
307         .loo_object_start   = mdd_object_start,
308         .loo_object_free    = mdd_object_free,
309         .loo_object_print   = mdd_object_print,
310 };
311
312 struct mdd_object *mdd_object_find(const struct lu_env *env,
313                                    struct mdd_device *d,
314                                    const struct lu_fid *f)
315 {
316         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
317 }
318
319 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
320                         const char *path, struct lu_fid *fid)
321 {
322         struct lu_buf *buf;
323         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
324         struct mdd_object *obj;
325         struct lu_name *lname = &mdd_env_info(env)->mti_name;
326         char *name;
327         int rc = 0;
328         ENTRY;
329
330         /* temp buffer for path element */
331         buf = mdd_buf_alloc(env, PATH_MAX);
332         if (buf->lb_buf == NULL)
333                 RETURN(-ENOMEM);
334
335         lname->ln_name = name = buf->lb_buf;
336         lname->ln_namelen = 0;
337         *f = mdd->mdd_root_fid;
338
339         while(1) {
340                 while (*path == '/')
341                         path++;
342                 if (*path == '\0')
343                         break;
344                 while (*path != '/' && *path != '\0') {
345                         *name = *path;
346                         path++;
347                         name++;
348                         lname->ln_namelen++;
349                 }
350
351                 *name = '\0';
352                 /* find obj corresponding to fid */
353                 obj = mdd_object_find(env, mdd, f);
354                 if (obj == NULL)
355                         GOTO(out, rc = -EREMOTE);
356                 if (IS_ERR(obj))
357                         GOTO(out, rc = PTR_ERR(obj));
358                 /* get child fid from parent and name */
359                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
360                 mdd_object_put(env, obj);
361                 if (rc)
362                         break;
363
364                 name = buf->lb_buf;
365                 lname->ln_namelen = 0;
366         }
367
368         if (!rc)
369                 *fid = *f;
370 out:
371         RETURN(rc);
372 }
373
374 /** The maximum depth that fid2path() will search.
375  * This is limited only because we want to store the fids for
376  * historical path lookup purposes.
377  */
378 #define MAX_PATH_DEPTH 100
379
380 /** mdd_path() lookup structure. */
381 struct path_lookup_info {
382         __u64                pli_recno;        /**< history point */
383         __u64                pli_currec;       /**< current record */
384         struct lu_fid        pli_fid;
385         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
386         struct mdd_object   *pli_mdd_obj;
387         char                *pli_path;         /**< full path */
388         int                  pli_pathlen;
389         int                  pli_linkno;       /**< which hardlink to follow */
390         int                  pli_fidcount;     /**< number of \a pli_fids */
391 };
392
393 static int mdd_path_current(const struct lu_env *env,
394                             struct path_lookup_info *pli)
395 {
396         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
397         struct mdd_object *mdd_obj;
398         struct lu_buf     *buf = NULL;
399         struct link_ea_header *leh;
400         struct link_ea_entry  *lee;
401         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
402         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
403         char *ptr;
404         int reclen;
405         int rc;
406         ENTRY;
407
408         ptr = pli->pli_path + pli->pli_pathlen - 1;
409         *ptr = 0;
410         --ptr;
411         pli->pli_fidcount = 0;
412         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
413
414         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
415                 mdd_obj = mdd_object_find(env, mdd,
416                                           &pli->pli_fids[pli->pli_fidcount]);
417                 if (mdd_obj == NULL)
418                         GOTO(out, rc = -EREMOTE);
419                 if (IS_ERR(mdd_obj))
420                         GOTO(out, rc = PTR_ERR(mdd_obj));
421                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
422                 if (rc <= 0) {
423                         mdd_object_put(env, mdd_obj);
424                         if (rc == -1)
425                                 rc = -EREMOTE;
426                         else if (rc == 0)
427                                 /* Do I need to error out here? */
428                                 rc = -ENOENT;
429                         GOTO(out, rc);
430                 }
431
432                 /* Get parent fid and object name */
433                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
434                 buf = mdd_links_get(env, mdd_obj);
435                 mdd_read_unlock(env, mdd_obj);
436                 mdd_object_put(env, mdd_obj);
437                 if (IS_ERR(buf))
438                         GOTO(out, rc = PTR_ERR(buf));
439
440                 leh = buf->lb_buf;
441                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
442                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
443
444                 /* If set, use link #linkno for path lookup, otherwise use
445                    link #0.  Only do this for the final path element. */
446                 if ((pli->pli_fidcount == 0) &&
447                     (pli->pli_linkno < leh->leh_reccount)) {
448                         int count;
449                         for (count = 0; count < pli->pli_linkno; count++) {
450                                 lee = (struct link_ea_entry *)
451                                      ((char *)lee + reclen);
452                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
453                         }
454                         if (pli->pli_linkno < leh->leh_reccount - 1)
455                                 /* indicate to user there are more links */
456                                 pli->pli_linkno++;
457                 }
458
459                 /* Pack the name in the end of the buffer */
460                 ptr -= tmpname->ln_namelen;
461                 if (ptr - 1 <= pli->pli_path)
462                         GOTO(out, rc = -EOVERFLOW);
463                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
464                 *(--ptr) = '/';
465
466                 /* Store the parent fid for historic lookup */
467                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
468                         GOTO(out, rc = -EOVERFLOW);
469                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
470         }
471
472         /* Verify that our path hasn't changed since we started the lookup.
473            Record the current index, and verify the path resolves to the
474            same fid. If it does, then the path is correct as of this index. */
475         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
476         pli->pli_currec = mdd->mdd_cl.mc_index;
477         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
478         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
479         if (rc) {
480                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
481                 GOTO (out, rc = -EAGAIN);
482         }
483         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
484                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
485                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
486                        PFID(&pli->pli_fid));
487                 GOTO(out, rc = -EAGAIN);
488         }
489         ptr++; /* skip leading / */
490         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
491
492         EXIT;
493 out:
494         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
495                 /* if we vmalloced a large buffer drop it */
496                 mdd_buf_put(buf);
497
498         return rc;
499 }
500
501 static int mdd_path_historic(const struct lu_env *env,
502                              struct path_lookup_info *pli)
503 {
504         return 0;
505 }
506
507 /* Returns the full path to this fid, as of changelog record recno. */
508 static int mdd_path(const struct lu_env *env, struct md_object *obj,
509                     char *path, int pathlen, __u64 *recno, int *linkno)
510 {
511         struct path_lookup_info *pli;
512         int tries = 3;
513         int rc = -EAGAIN;
514         ENTRY;
515
516         if (pathlen < 3)
517                 RETURN(-EOVERFLOW);
518
519         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
520                 path[0] = '\0';
521                 RETURN(0);
522         }
523
524         OBD_ALLOC_PTR(pli);
525         if (pli == NULL)
526                 RETURN(-ENOMEM);
527
528         pli->pli_mdd_obj = md2mdd_obj(obj);
529         pli->pli_recno = *recno;
530         pli->pli_path = path;
531         pli->pli_pathlen = pathlen;
532         pli->pli_linkno = *linkno;
533
534         /* Retry multiple times in case file is being moved */
535         while (tries-- && rc == -EAGAIN)
536                 rc = mdd_path_current(env, pli);
537
538         /* For historical path lookup, the current links may not have existed
539          * at "recno" time.  We must switch over to earlier links/parents
540          * by using the changelog records.  If the earlier parent doesn't
541          * exist, we must search back through the changelog to reconstruct
542          * its parents, then check if it exists, etc.
543          * We may ignore this problem for the initial implementation and
544          * state that an "original" hardlink must still exist for us to find
545          * historic path name. */
546         if (pli->pli_recno != -1) {
547                 rc = mdd_path_historic(env, pli);
548         } else {
549                 *recno = pli->pli_currec;
550                 /* Return next link index to caller */
551                 *linkno = pli->pli_linkno;
552         }
553
554         OBD_FREE_PTR(pli);
555
556         RETURN (rc);
557 }
558
559 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
560 {
561         struct lu_attr *la = &mdd_env_info(env)->mti_la;
562         int rc;
563
564         ENTRY;
565         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
566         if (rc == 0) {
567                 mdd_flags_xlate(obj, la->la_flags);
568                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
569                         obj->mod_flags |= MNLINK_OBJ;
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 /* get lov EA only */
623 static int __mdd_lmm_get(const struct lu_env *env,
624                          struct mdd_object *mdd_obj, struct md_attr *ma)
625 {
626         int rc;
627         ENTRY;
628
629         if (ma->ma_valid & MA_LOV)
630                 RETURN(0);
631
632         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
633                         XATTR_NAME_LOV);
634         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
635                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
636         if (rc > 0) {
637                 ma->ma_lmm_size = rc;
638                 ma->ma_valid |= MA_LOV;
639                 rc = 0;
640         }
641         RETURN(rc);
642 }
643
644 /* get the first parent fid from link EA */
645 static int mdd_pfid_get(const struct lu_env *env,
646                         struct mdd_object *mdd_obj, struct md_attr *ma)
647 {
648         struct lu_buf *buf;
649         struct link_ea_header *leh;
650         struct link_ea_entry *lee;
651         struct lu_fid *pfid = &ma->ma_pfid;
652         ENTRY;
653
654         if (ma->ma_valid & MA_PFID)
655                 RETURN(0);
656
657         buf = mdd_links_get(env, mdd_obj);
658         if (IS_ERR(buf))
659                 RETURN(PTR_ERR(buf));
660
661         leh = buf->lb_buf;
662         lee = (struct link_ea_entry *)(leh + 1);
663         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
664         fid_be_to_cpu(pfid, pfid);
665         ma->ma_valid |= MA_PFID;
666         if (buf->lb_len > OBD_ALLOC_BIG)
667                 /* if we vmalloced a large buffer drop it */
668                 mdd_buf_put(buf);
669         RETURN(0);
670 }
671
672 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
673                        struct md_attr *ma)
674 {
675         int rc;
676         ENTRY;
677
678         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
679         rc = __mdd_lmm_get(env, mdd_obj, ma);
680         mdd_read_unlock(env, mdd_obj);
681         RETURN(rc);
682 }
683
684 /* get lmv EA only*/
685 static int __mdd_lmv_get(const struct lu_env *env,
686                          struct mdd_object *mdd_obj, struct md_attr *ma)
687 {
688         int rc;
689         ENTRY;
690
691         if (ma->ma_valid & MA_LMV)
692                 RETURN(0);
693
694         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
695                         XATTR_NAME_LMV);
696         if (rc > 0) {
697                 ma->ma_valid |= MA_LMV;
698                 rc = 0;
699         }
700         RETURN(rc);
701 }
702
703 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
704                          struct md_attr *ma)
705 {
706         struct mdd_thread_info *info = mdd_env_info(env);
707         struct lustre_mdt_attrs *lma =
708                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
709         int lma_size;
710         int rc;
711         ENTRY;
712
713         /* If all needed data are already valid, nothing to do */
714         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
715             (ma->ma_need & (MA_HSM | MA_SOM)))
716                 RETURN(0);
717
718         /* Read LMA from disk EA */
719         lma_size = sizeof(info->mti_xattr_buf);
720         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
721         if (rc <= 0)
722                 RETURN(rc);
723
724         /* Useless to check LMA incompatibility because this is already done in
725          * osd_ea_fid_get(), and this will fail long before this code is
726          * called.
727          * So, if we are here, LMA is compatible.
728          */
729
730         lustre_lma_swab(lma);
731
732         /* Swab and copy LMA */
733         if (ma->ma_need & MA_HSM) {
734                 if (lma->lma_compat & LMAC_HSM)
735                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
736                 else
737                         ma->ma_hsm.mh_flags = 0;
738                 ma->ma_valid |= MA_HSM;
739         }
740
741         /* Copy SOM */
742         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
743                 LASSERT(ma->ma_som != NULL);
744                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
745                 ma->ma_som->msd_size    = lma->lma_som_size;
746                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
747                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
748                 ma->ma_valid |= MA_SOM;
749         }
750
751         RETURN(0);
752 }
753
754 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
755                                  struct md_attr *ma)
756 {
757         int rc = 0;
758         ENTRY;
759
760         if (ma->ma_need & MA_INODE)
761                 rc = mdd_iattr_get(env, mdd_obj, ma);
762
763         if (rc == 0 && ma->ma_need & MA_LOV) {
764                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
765                     S_ISDIR(mdd_object_type(mdd_obj)))
766                         rc = __mdd_lmm_get(env, mdd_obj, ma);
767         }
768         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
769                 if (S_ISREG(mdd_object_type(mdd_obj)))
770                         rc = mdd_pfid_get(env, mdd_obj, ma);
771         }
772         if (rc == 0 && ma->ma_need & MA_LMV) {
773                 if (S_ISDIR(mdd_object_type(mdd_obj)))
774                         rc = __mdd_lmv_get(env, mdd_obj, ma);
775         }
776         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
777                 if (S_ISREG(mdd_object_type(mdd_obj)))
778                         rc = __mdd_lma_get(env, mdd_obj, ma);
779         }
780 #ifdef CONFIG_FS_POSIX_ACL
781         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
782                 if (S_ISDIR(mdd_object_type(mdd_obj)))
783                         rc = mdd_def_acl_get(env, mdd_obj, ma);
784         }
785 #endif
786         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
787                rc, ma->ma_valid, ma->ma_lmm);
788         RETURN(rc);
789 }
790
791 int mdd_attr_get_internal_locked(const struct lu_env *env,
792                                  struct mdd_object *mdd_obj, struct md_attr *ma)
793 {
794         int rc;
795         int needlock = ma->ma_need &
796                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
797
798         if (needlock)
799                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800         rc = mdd_attr_get_internal(env, mdd_obj, ma);
801         if (needlock)
802                 mdd_read_unlock(env, mdd_obj);
803         return rc;
804 }
805
806 /*
807  * No permission check is needed.
808  */
809 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
810                         struct md_attr *ma)
811 {
812         struct mdd_object *mdd_obj = md2mdd_obj(obj);
813         int                rc;
814
815         ENTRY;
816         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
817         RETURN(rc);
818 }
819
820 /*
821  * No permission check is needed.
822  */
823 static int mdd_xattr_get(const struct lu_env *env,
824                          struct md_object *obj, struct lu_buf *buf,
825                          const char *name)
826 {
827         struct mdd_object *mdd_obj = md2mdd_obj(obj);
828         int rc;
829
830         ENTRY;
831
832         LASSERT(mdd_object_exists(mdd_obj));
833
834         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835         rc = mdo_xattr_get(env, mdd_obj, buf, name,
836                            mdd_object_capa(env, mdd_obj));
837         mdd_read_unlock(env, mdd_obj);
838
839         RETURN(rc);
840 }
841
842 /*
843  * Permission check is done when open,
844  * no need check again.
845  */
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
847                         struct lu_buf *buf)
848 {
849         struct mdd_object *mdd_obj = md2mdd_obj(obj);
850         struct dt_object  *next;
851         loff_t             pos = 0;
852         int                rc;
853         ENTRY;
854
855         LASSERT(mdd_object_exists(mdd_obj));
856
857         next = mdd_object_child(mdd_obj);
858         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
860                                          mdd_object_capa(env, mdd_obj));
861         mdd_read_unlock(env, mdd_obj);
862         RETURN(rc);
863 }
864
865 /*
866  * No permission check is needed.
867  */
868 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
869                           struct lu_buf *buf)
870 {
871         struct mdd_object *mdd_obj = md2mdd_obj(obj);
872         int rc;
873
874         ENTRY;
875
876         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
877         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
878         mdd_read_unlock(env, mdd_obj);
879
880         RETURN(rc);
881 }
882
883 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
884                                struct mdd_object *c, struct md_attr *ma,
885                                struct thandle *handle,
886                                const struct md_op_spec *spec)
887 {
888         struct lu_attr *attr = &ma->ma_attr;
889         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
890         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
891         const struct dt_index_features *feat = spec->sp_feat;
892         int rc;
893         ENTRY;
894
895         if (!mdd_object_exists(c)) {
896                 struct dt_object *next = mdd_object_child(c);
897                 LASSERT(next);
898
899                 if (feat != &dt_directory_features && feat != NULL)
900                         dof->dof_type = DFT_INDEX;
901                 else
902                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
903
904                 dof->u.dof_idx.di_feat = feat;
905
906                 /* @hint will be initialized by underlying device. */
907                 next->do_ops->do_ah_init(env, hint,
908                                          p ? mdd_object_child(p) : NULL,
909                                          attr->la_mode & S_IFMT);
910
911                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
912                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
913         } else
914                 rc = -EEXIST;
915
916         RETURN(rc);
917 }
918
919 /**
920  * Make sure the ctime is increased only.
921  */
922 static inline int mdd_attr_check(const struct lu_env *env,
923                                  struct mdd_object *obj,
924                                  struct lu_attr *attr)
925 {
926         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
927         int rc;
928         ENTRY;
929
930         if (attr->la_valid & LA_CTIME) {
931                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
932                 if (rc)
933                         RETURN(rc);
934
935                 if (attr->la_ctime < tmp_la->la_ctime)
936                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
937                 else if (attr->la_valid == LA_CTIME &&
938                          attr->la_ctime == tmp_la->la_ctime)
939                         attr->la_valid &= ~LA_CTIME;
940         }
941         RETURN(0);
942 }
943
944 int mdd_attr_set_internal(const struct lu_env *env,
945                           struct mdd_object *obj,
946                           struct lu_attr *attr,
947                           struct thandle *handle,
948                           int needacl)
949 {
950         int rc;
951         ENTRY;
952
953         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
954 #ifdef CONFIG_FS_POSIX_ACL
955         if (!rc && (attr->la_valid & LA_MODE) && needacl)
956                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
957 #endif
958         RETURN(rc);
959 }
960
961 int mdd_attr_check_set_internal(const struct lu_env *env,
962                                 struct mdd_object *obj,
963                                 struct lu_attr *attr,
964                                 struct thandle *handle,
965                                 int needacl)
966 {
967         int rc;
968         ENTRY;
969
970         rc = mdd_attr_check(env, obj, attr);
971         if (rc)
972                 RETURN(rc);
973
974         if (attr->la_valid)
975                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
976         RETURN(rc);
977 }
978
979 static int mdd_attr_set_internal_locked(const struct lu_env *env,
980                                         struct mdd_object *obj,
981                                         struct lu_attr *attr,
982                                         struct thandle *handle,
983                                         int needacl)
984 {
985         int rc;
986         ENTRY;
987
988         needacl = needacl && (attr->la_valid & LA_MODE);
989         if (needacl)
990                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
992         if (needacl)
993                 mdd_write_unlock(env, obj);
994         RETURN(rc);
995 }
996
997 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
998                                        struct mdd_object *obj,
999                                        struct lu_attr *attr,
1000                                        struct thandle *handle,
1001                                        int needacl)
1002 {
1003         int rc;
1004         ENTRY;
1005
1006         needacl = needacl && (attr->la_valid & LA_MODE);
1007         if (needacl)
1008                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1009         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1010         if (needacl)
1011                 mdd_write_unlock(env, obj);
1012         RETURN(rc);
1013 }
1014
1015 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1016                     const struct lu_buf *buf, const char *name,
1017                     int fl, struct thandle *handle)
1018 {
1019         struct lustre_capa *capa = mdd_object_capa(env, obj);
1020         int rc = -EINVAL;
1021         ENTRY;
1022
1023         if (buf->lb_buf && buf->lb_len > 0)
1024                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1025         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1026                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1027
1028         RETURN(rc);
1029 }
1030
1031 /*
1032  * This gives the same functionality as the code between
1033  * sys_chmod and inode_setattr
1034  * chown_common and inode_setattr
1035  * utimes and inode_setattr
1036  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1037  */
1038 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1039                         struct lu_attr *la, const struct md_attr *ma)
1040 {
1041         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1042         struct md_ucred  *uc;
1043         int               rc;
1044         ENTRY;
1045
1046         if (!la->la_valid)
1047                 RETURN(0);
1048
1049         /* Do not permit change file type */
1050         if (la->la_valid & LA_TYPE)
1051                 RETURN(-EPERM);
1052
1053         /* They should not be processed by setattr */
1054         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1055                 RETURN(-EPERM);
1056
1057         /* export destroy does not have ->le_ses, but we may want
1058          * to drop LUSTRE_SOM_FL. */
1059         if (!env->le_ses)
1060                 RETURN(0);
1061
1062         uc = md_ucred(env);
1063
1064         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1065         if (rc)
1066                 RETURN(rc);
1067
1068         if (la->la_valid == LA_CTIME) {
1069                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1070                         /* This is only for set ctime when rename's source is
1071                          * on remote MDS. */
1072                         rc = mdd_may_delete(env, NULL, obj,
1073                                             (struct md_attr *)ma, 1, 0);
1074                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1075                         la->la_valid &= ~LA_CTIME;
1076                 RETURN(rc);
1077         }
1078
1079         if (la->la_valid == LA_ATIME) {
1080                 /* This is atime only set for read atime update on close. */
1081                 if (la->la_atime >= tmp_la->la_atime &&
1082                     la->la_atime < (tmp_la->la_atime +
1083                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1084                         la->la_valid &= ~LA_ATIME;
1085                 RETURN(0);
1086         }
1087
1088         /* Check if flags change. */
1089         if (la->la_valid & LA_FLAGS) {
1090                 unsigned int oldflags = 0;
1091                 unsigned int newflags = la->la_flags &
1092                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1093
1094                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1095                     !mdd_capable(uc, CFS_CAP_FOWNER))
1096                         RETURN(-EPERM);
1097
1098                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1099                  * only be changed by the relevant capability. */
1100                 if (mdd_is_immutable(obj))
1101                         oldflags |= LUSTRE_IMMUTABLE_FL;
1102                 if (mdd_is_append(obj))
1103                         oldflags |= LUSTRE_APPEND_FL;
1104                 if ((oldflags ^ newflags) &&
1105                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1106                         RETURN(-EPERM);
1107
1108                 if (!S_ISDIR(tmp_la->la_mode))
1109                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1110         }
1111
1112         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1113             (la->la_valid & ~LA_FLAGS) &&
1114             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1115                 RETURN(-EPERM);
1116
1117         /* Check for setting the obj time. */
1118         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1119             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1120                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1121                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1122                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1123                                                             MAY_WRITE,
1124                                                             MOR_TGT_CHILD);
1125                         if (rc)
1126                                 RETURN(rc);
1127                 }
1128         }
1129
1130         if (la->la_valid & LA_KILL_SUID) {
1131                 la->la_valid &= ~LA_KILL_SUID;
1132                 if ((tmp_la->la_mode & S_ISUID) &&
1133                     !(la->la_valid & LA_MODE)) {
1134                         la->la_mode = tmp_la->la_mode;
1135                         la->la_valid |= LA_MODE;
1136                 }
1137                 la->la_mode &= ~S_ISUID;
1138         }
1139
1140         if (la->la_valid & LA_KILL_SGID) {
1141                 la->la_valid &= ~LA_KILL_SGID;
1142                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1143                                         (S_ISGID | S_IXGRP)) &&
1144                     !(la->la_valid & LA_MODE)) {
1145                         la->la_mode = tmp_la->la_mode;
1146                         la->la_valid |= LA_MODE;
1147                 }
1148                 la->la_mode &= ~S_ISGID;
1149         }
1150
1151         /* Make sure a caller can chmod. */
1152         if (la->la_valid & LA_MODE) {
1153                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1154                     (uc->mu_fsuid != tmp_la->la_uid) &&
1155                     !mdd_capable(uc, CFS_CAP_FOWNER))
1156                         RETURN(-EPERM);
1157
1158                 if (la->la_mode == (cfs_umode_t) -1)
1159                         la->la_mode = tmp_la->la_mode;
1160                 else
1161                         la->la_mode = (la->la_mode & S_IALLUGO) |
1162                                       (tmp_la->la_mode & ~S_IALLUGO);
1163
1164                 /* Also check the setgid bit! */
1165                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1166                                        la->la_gid : tmp_la->la_gid) &&
1167                     !mdd_capable(uc, CFS_CAP_FSETID))
1168                         la->la_mode &= ~S_ISGID;
1169         } else {
1170                la->la_mode = tmp_la->la_mode;
1171         }
1172
1173         /* Make sure a caller can chown. */
1174         if (la->la_valid & LA_UID) {
1175                 if (la->la_uid == (uid_t) -1)
1176                         la->la_uid = tmp_la->la_uid;
1177                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1178                     (la->la_uid != tmp_la->la_uid)) &&
1179                     !mdd_capable(uc, CFS_CAP_CHOWN))
1180                         RETURN(-EPERM);
1181
1182                 /* If the user or group of a non-directory has been
1183                  * changed by a non-root user, remove the setuid bit.
1184                  * 19981026 David C Niemi <niemi@tux.org>
1185                  *
1186                  * Changed this to apply to all users, including root,
1187                  * to avoid some races. This is the behavior we had in
1188                  * 2.0. The check for non-root was definitely wrong
1189                  * for 2.2 anyway, as it should have been using
1190                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1191                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192                     !S_ISDIR(tmp_la->la_mode)) {
1193                         la->la_mode &= ~S_ISUID;
1194                         la->la_valid |= LA_MODE;
1195                 }
1196         }
1197
1198         /* Make sure caller can chgrp. */
1199         if (la->la_valid & LA_GID) {
1200                 if (la->la_gid == (gid_t) -1)
1201                         la->la_gid = tmp_la->la_gid;
1202                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203                     ((la->la_gid != tmp_la->la_gid) &&
1204                     !lustre_in_group_p(uc, la->la_gid))) &&
1205                     !mdd_capable(uc, CFS_CAP_CHOWN))
1206                         RETURN(-EPERM);
1207
1208                 /* Likewise, if the user or group of a non-directory
1209                  * has been changed by a non-root user, remove the
1210                  * setgid bit UNLESS there is no group execute bit
1211                  * (this would be a file marked for mandatory
1212                  * locking).  19981026 David C Niemi <niemi@tux.org>
1213                  *
1214                  * Removed the fsuid check (see the comment above) --
1215                  * 19990830 SD. */
1216                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1217                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1218                         la->la_mode &= ~S_ISGID;
1219                         la->la_valid |= LA_MODE;
1220                 }
1221         }
1222
1223         /* For both Size-on-MDS case and truncate case,
1224          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1225          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1226          * For SOM case, it is true, the MAY_WRITE perm has been checked
1227          * when open, no need check again. For truncate case, it is false,
1228          * the MAY_WRITE perm should be checked here. */
1229         if (ma->ma_attr_flags & MDS_SOM) {
1230                 /* For the "Size-on-MDS" setattr update, merge coming
1231                  * attributes with the set in the inode. BUG 10641 */
1232                 if ((la->la_valid & LA_ATIME) &&
1233                     (la->la_atime <= tmp_la->la_atime))
1234                         la->la_valid &= ~LA_ATIME;
1235
1236                 /* OST attributes do not have a priority over MDS attributes,
1237                  * so drop times if ctime is equal. */
1238                 if ((la->la_valid & LA_CTIME) &&
1239                     (la->la_ctime <= tmp_la->la_ctime))
1240                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1241         } else {
1242                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1243                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1244                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1245                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1246                                 rc = mdd_permission_internal_locked(env, obj,
1247                                                             tmp_la, MAY_WRITE,
1248                                                             MOR_TGT_CHILD);
1249                                 if (rc)
1250                                         RETURN(rc);
1251                         }
1252                 }
1253                 if (la->la_valid & LA_CTIME) {
1254                         /* The pure setattr, it has the priority over what is
1255                          * already set, do not drop it if ctime is equal. */
1256                         if (la->la_ctime < tmp_la->la_ctime)
1257                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1258                                                   LA_CTIME);
1259                 }
1260         }
1261
1262         RETURN(0);
1263 }
1264
1265 /** Store a data change changelog record
1266  * If this fails, we must fail the whole transaction; we don't
1267  * want the change to commit without the log entry.
1268  * \param mdd_obj - mdd_object of change
1269  * \param handle - transacion handle
1270  */
1271 static int mdd_changelog_data_store(const struct lu_env     *env,
1272                                     struct mdd_device       *mdd,
1273                                     enum changelog_rec_type type,
1274                                     int                     flags,
1275                                     struct mdd_object       *mdd_obj,
1276                                     struct thandle          *handle)
1277 {
1278         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1279         struct llog_changelog_rec *rec;
1280         struct lu_buf *buf;
1281         int reclen;
1282         int rc;
1283
1284         /* Not recording */
1285         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1286                 RETURN(0);
1287         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1288                 RETURN(0);
1289
1290         LASSERT(handle != NULL);
1291         LASSERT(mdd_obj != NULL);
1292
1293         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1294             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1295                 /* Don't need multiple updates in this log */
1296                 /* Don't check under lock - no big deal if we get an extra
1297                    entry */
1298                 RETURN(0);
1299         }
1300
1301         reclen = llog_data_len(sizeof(*rec));
1302         buf = mdd_buf_alloc(env, reclen);
1303         if (buf->lb_buf == NULL)
1304                 RETURN(-ENOMEM);
1305         rec = (struct llog_changelog_rec *)buf->lb_buf;
1306
1307         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1308         rec->cr.cr_type = (__u32)type;
1309         rec->cr.cr_tfid = *tfid;
1310         rec->cr.cr_namelen = 0;
1311         mdd_obj->mod_cltime = cfs_time_current_64();
1312
1313         rc = mdd_changelog_llog_write(mdd, rec, handle);
1314         if (rc < 0) {
1315                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1316                        rc, type, PFID(tfid));
1317                 return -EFAULT;
1318         }
1319
1320         return 0;
1321 }
1322
1323 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1324                   int flags, struct md_object *obj)
1325 {
1326         struct thandle *handle;
1327         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1328         struct mdd_device *mdd = mdo2mdd(obj);
1329         int rc;
1330         ENTRY;
1331
1332         handle = mdd_trans_start(env, mdd);
1333
1334         if (IS_ERR(handle))
1335                 return(PTR_ERR(handle));
1336
1337         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1338                                       handle);
1339
1340         mdd_trans_stop(env, mdd, rc, handle);
1341
1342         RETURN(rc);
1343 }
1344
1345 /**
1346  * Should be called with write lock held.
1347  *
1348  * \see mdd_lma_set_locked().
1349  */
1350 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1351                        const struct md_attr *ma, struct thandle *handle)
1352 {
1353         struct mdd_thread_info *info = mdd_env_info(env);
1354         struct lu_buf *buf;
1355         struct lustre_mdt_attrs *lma =
1356                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1357         int lmasize = sizeof(struct lustre_mdt_attrs);
1358         int rc = 0;
1359
1360         ENTRY;
1361
1362         /* Either HSM or SOM part is not valid, we need to read it before */
1363         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1364                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1365                 if (rc <= 0)
1366                         RETURN(rc);
1367
1368                 lustre_lma_swab(lma);
1369         } else {
1370                 memset(lma, 0, lmasize);
1371         }
1372
1373         /* Copy HSM data */
1374         if (ma->ma_valid & MA_HSM) {
1375                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1376                 lma->lma_compat |= LMAC_HSM;
1377         }
1378
1379         /* Copy SOM data */
1380         if (ma->ma_valid & MA_SOM) {
1381                 LASSERT(ma->ma_som != NULL);
1382                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1383                         lma->lma_compat     &= ~LMAC_SOM;
1384                 } else {
1385                         lma->lma_compat     |= LMAC_SOM;
1386                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1387                         lma->lma_som_size    = ma->ma_som->msd_size;
1388                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1389                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1390                 }
1391         }
1392
1393         /* Copy FID */
1394         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1395
1396         lustre_lma_swab(lma);
1397         buf = mdd_buf_get(env, lma, lmasize);
1398         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1399
1400         RETURN(rc);
1401 }
1402
1403 /**
1404  * Save LMA extended attributes with data from \a ma.
1405  *
1406  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1407  * not, LMA EA will be first read from disk, modified and write back.
1408  *
1409  */
1410 static int mdd_lma_set_locked(const struct lu_env *env,
1411                               struct mdd_object *mdd_obj,
1412                               const struct md_attr *ma, struct thandle *handle)
1413 {
1414         int rc;
1415
1416         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1417         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1418         mdd_write_unlock(env, mdd_obj);
1419         return rc;
1420 }
1421
1422 /* Precedence for choosing record type when multiple
1423  * attributes change: setattr > mtime > ctime > atime
1424  * (ctime changes when mtime does, plus chmod/chown.
1425  * atime and ctime are independent.) */
1426 static int mdd_attr_set_changelog(const struct lu_env *env,
1427                                   struct md_object *obj, struct thandle *handle,
1428                                   __u64 valid)
1429 {
1430         struct mdd_device *mdd = mdo2mdd(obj);
1431         int bits, type = 0;
1432
1433         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1434         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1435         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1436         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1437         bits = bits & mdd->mdd_cl.mc_mask;
1438         if (bits == 0)
1439                 return 0;
1440
1441         /* The record type is the lowest non-masked set bit */
1442         while (bits && ((bits & 1) == 0)) {
1443                 bits = bits >> 1;
1444                 type++;
1445         }
1446
1447         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1448         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1449                                         md2mdd_obj(obj), handle);
1450 }
1451
1452 /* set attr and LOV EA at once, return updated attr */
1453 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1454                         const struct md_attr *ma)
1455 {
1456         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1457         struct mdd_device *mdd = mdo2mdd(obj);
1458         struct thandle *handle;
1459         struct lov_mds_md *lmm = NULL;
1460         struct llog_cookie *logcookies = NULL;
1461         int  rc, lmm_size = 0, cookie_size = 0;
1462         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1463 #ifdef HAVE_QUOTA_SUPPORT
1464         struct obd_device *obd = mdd->mdd_obd_dev;
1465         struct mds_obd *mds = &obd->u.mds;
1466         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1467         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1468         int quota_opc = 0, block_count = 0;
1469         int inode_pending[MAXQUOTAS] = { 0, 0 };
1470         int block_pending[MAXQUOTAS] = { 0, 0 };
1471 #endif
1472         ENTRY;
1473
1474         *la_copy = ma->ma_attr;
1475         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1476         if (rc != 0)
1477                 RETURN(rc);
1478
1479         /* setattr on "close" only change atime, or do nothing */
1480         if (ma->ma_valid == MA_INODE &&
1481             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1482                 RETURN(0);
1483
1484         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1485                                     MDD_TXN_ATTR_SET_OP);
1486         handle = mdd_trans_start(env, mdd);
1487         if (IS_ERR(handle))
1488                 RETURN(PTR_ERR(handle));
1489         /*TODO: add lock here*/
1490         /* start a log jounal handle if needed */
1491         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1492             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1493                 lmm_size = mdd_lov_mdsize(env, mdd);
1494                 lmm = mdd_max_lmm_get(env, mdd);
1495                 if (lmm == NULL)
1496                         GOTO(cleanup, rc = -ENOMEM);
1497
1498                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1499                                 XATTR_NAME_LOV);
1500
1501                 if (rc < 0)
1502                         GOTO(cleanup, rc);
1503         }
1504
1505         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1506                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1507                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1508
1509 #ifdef HAVE_QUOTA_SUPPORT
1510         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1511                 struct obd_export *exp = md_quota(env)->mq_exp;
1512                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1513
1514                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1515                 if (!rc) {
1516                         quota_opc = FSFILT_OP_SETATTR;
1517                         mdd_quota_wrapper(la_copy, qnids);
1518                         mdd_quota_wrapper(la_tmp, qoids);
1519                         /* get file quota for new owner */
1520                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1521                                         qnids, inode_pending, 1, NULL, 0,
1522                                         NULL, 0);
1523                         block_count = (la_tmp->la_blocks + 7) >> 3;
1524                         if (block_count) {
1525                                 void *data = NULL;
1526                                 mdd_data_get(env, mdd_obj, &data);
1527                                 /* get block quota for new owner */
1528                                 lquota_chkquota(mds_quota_interface_ref, obd,
1529                                                 exp, qnids, block_pending,
1530                                                 block_count, NULL,
1531                                                 LQUOTA_FLAGS_BLK, data, 1);
1532                         }
1533                 }
1534         }
1535 #endif
1536
1537         if (la_copy->la_valid & LA_FLAGS) {
1538                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1539                                                   handle, 1);
1540                 if (rc == 0)
1541                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1542         } else if (la_copy->la_valid) {            /* setattr */
1543                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1544                                                   handle, 1);
1545                 /* journal chown/chgrp in llog, just like unlink */
1546                 if (rc == 0 && lmm_size){
1547                         cookie_size = mdd_lov_cookiesize(env, mdd);
1548                         logcookies = mdd_max_cookie_get(env, mdd);
1549                         if (logcookies == NULL)
1550                                 GOTO(cleanup, rc = -ENOMEM);
1551
1552                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1553                                             logcookies, cookie_size) <= 0)
1554                                 logcookies = NULL;
1555                 }
1556         }
1557
1558         if (rc == 0 && ma->ma_valid & MA_LOV) {
1559                 cfs_umode_t mode;
1560
1561                 mode = mdd_object_type(mdd_obj);
1562                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1563                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1564                         if (rc)
1565                                 GOTO(cleanup, rc);
1566
1567                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1568                                             ma->ma_lmm_size, handle, 1);
1569                 }
1570
1571         }
1572         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1573                 cfs_umode_t mode;
1574
1575                 mode = mdd_object_type(mdd_obj);
1576                 if (S_ISREG(mode))
1577                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1578
1579         }
1580 cleanup:
1581         if (rc == 0)
1582                 rc = mdd_attr_set_changelog(env, obj, handle,
1583                                             ma->ma_attr.la_valid);
1584         mdd_trans_stop(env, mdd, rc, handle);
1585         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1586                 /*set obd attr, if needed*/
1587                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1588                                            logcookies);
1589         }
1590 #ifdef HAVE_QUOTA_SUPPORT
1591         if (quota_opc) {
1592                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1593                                       inode_pending, 0);
1594                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1595                                       block_pending, 1);
1596                 /* Trigger dqrel/dqacq for original owner and new owner.
1597                  * If failed, the next call for lquota_chkquota will
1598                  * process it. */
1599                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1600                               quota_opc);
1601         }
1602 #endif
1603         RETURN(rc);
1604 }
1605
1606 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1607                       const struct lu_buf *buf, const char *name, int fl,
1608                       struct thandle *handle)
1609 {
1610         int  rc;
1611         ENTRY;
1612
1613         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1614         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1615         mdd_write_unlock(env, obj);
1616
1617         RETURN(rc);
1618 }
1619
1620 static int mdd_xattr_sanity_check(const struct lu_env *env,
1621                                   struct mdd_object *obj)
1622 {
1623         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1624         struct md_ucred *uc     = md_ucred(env);
1625         int rc;
1626         ENTRY;
1627
1628         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1629                 RETURN(-EPERM);
1630
1631         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1632         if (rc)
1633                 RETURN(rc);
1634
1635         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1636             !mdd_capable(uc, CFS_CAP_FOWNER))
1637                 RETURN(-EPERM);
1638
1639         RETURN(rc);
1640 }
1641
1642 /**
1643  * The caller should guarantee to update the object ctime
1644  * after xattr_set if needed.
1645  */
1646 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1647                          const struct lu_buf *buf, const char *name,
1648                          int fl)
1649 {
1650         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1651         struct mdd_device *mdd = mdo2mdd(obj);
1652         struct thandle *handle;
1653         int  rc;
1654         ENTRY;
1655
1656         rc = mdd_xattr_sanity_check(env, mdd_obj);
1657         if (rc)
1658                 RETURN(rc);
1659
1660         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1661         /* security-replated changes may require sync */
1662         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1663             mdd->mdd_sync_permission == 1)
1664                 txn_param_sync(&mdd_env_info(env)->mti_param);
1665
1666         handle = mdd_trans_start(env, mdd);
1667         if (IS_ERR(handle))
1668                 RETURN(PTR_ERR(handle));
1669
1670         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1671
1672         /* Only record user xattr changes */
1673         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1674                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1675                                               handle);
1676         mdd_trans_stop(env, mdd, rc, handle);
1677
1678         RETURN(rc);
1679 }
1680
1681 /**
1682  * The caller should guarantee to update the object ctime
1683  * after xattr_set if needed.
1684  */
1685 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1686                   const char *name)
1687 {
1688         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1689         struct mdd_device *mdd = mdo2mdd(obj);
1690         struct thandle *handle;
1691         int  rc;
1692         ENTRY;
1693
1694         rc = mdd_xattr_sanity_check(env, mdd_obj);
1695         if (rc)
1696                 RETURN(rc);
1697
1698         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1699         handle = mdd_trans_start(env, mdd);
1700         if (IS_ERR(handle))
1701                 RETURN(PTR_ERR(handle));
1702
1703         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1704         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1705                            mdd_object_capa(env, mdd_obj));
1706         mdd_write_unlock(env, mdd_obj);
1707
1708         /* Only record user xattr changes */
1709         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1710                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1711                                               handle);
1712
1713         mdd_trans_stop(env, mdd, rc, handle);
1714
1715         RETURN(rc);
1716 }
1717
1718 /* partial unlink */
1719 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1720                        struct md_attr *ma)
1721 {
1722         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1723         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1724         struct mdd_device *mdd = mdo2mdd(obj);
1725         struct thandle *handle;
1726 #ifdef HAVE_QUOTA_SUPPORT
1727         struct obd_device *obd = mdd->mdd_obd_dev;
1728         struct mds_obd *mds = &obd->u.mds;
1729         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1730         int quota_opc = 0;
1731 #endif
1732         int rc;
1733         ENTRY;
1734
1735         /*
1736          * Check -ENOENT early here because we need to get object type
1737          * to calculate credits before transaction start
1738          */
1739         if (!mdd_object_exists(mdd_obj))
1740                 RETURN(-ENOENT);
1741
1742         LASSERT(mdd_object_exists(mdd_obj) > 0);
1743
1744         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1745         if (rc)
1746                 RETURN(rc);
1747
1748         handle = mdd_trans_start(env, mdd);
1749         if (IS_ERR(handle))
1750                 RETURN(-ENOMEM);
1751
1752         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1753
1754         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1755         if (rc)
1756                 GOTO(cleanup, rc);
1757
1758         __mdd_ref_del(env, mdd_obj, handle, 0);
1759
1760         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1761                 /* unlink dot */
1762                 __mdd_ref_del(env, mdd_obj, handle, 1);
1763         }
1764
1765         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1766         la_copy->la_ctime = ma->ma_attr.la_ctime;
1767
1768         la_copy->la_valid = LA_CTIME;
1769         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1770         if (rc)
1771                 GOTO(cleanup, rc);
1772
1773         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1774 #ifdef HAVE_QUOTA_SUPPORT
1775         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1776             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1777                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1778                 mdd_quota_wrapper(&ma->ma_attr, qids);
1779         }
1780 #endif
1781
1782
1783         EXIT;
1784 cleanup:
1785         mdd_write_unlock(env, mdd_obj);
1786         mdd_trans_stop(env, mdd, rc, handle);
1787 #ifdef HAVE_QUOTA_SUPPORT
1788         if (quota_opc)
1789                 /* Trigger dqrel on the owner of child. If failed,
1790                  * the next call for lquota_chkquota will process it */
1791                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1792                               quota_opc);
1793 #endif
1794         return rc;
1795 }
1796
1797 /* partial operation */
1798 static int mdd_oc_sanity_check(const struct lu_env *env,
1799                                struct mdd_object *obj,
1800                                struct md_attr *ma)
1801 {
1802         int rc;
1803         ENTRY;
1804
1805         switch (ma->ma_attr.la_mode & S_IFMT) {
1806         case S_IFREG:
1807         case S_IFDIR:
1808         case S_IFLNK:
1809         case S_IFCHR:
1810         case S_IFBLK:
1811         case S_IFIFO:
1812         case S_IFSOCK:
1813                 rc = 0;
1814                 break;
1815         default:
1816                 rc = -EINVAL;
1817                 break;
1818         }
1819         RETURN(rc);
1820 }
1821
1822 static int mdd_object_create(const struct lu_env *env,
1823                              struct md_object *obj,
1824                              const struct md_op_spec *spec,
1825                              struct md_attr *ma)
1826 {
1827
1828         struct mdd_device *mdd = mdo2mdd(obj);
1829         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1830         const struct lu_fid *pfid = spec->u.sp_pfid;
1831         struct thandle *handle;
1832 #ifdef HAVE_QUOTA_SUPPORT
1833         struct obd_device *obd = mdd->mdd_obd_dev;
1834         struct obd_export *exp = md_quota(env)->mq_exp;
1835         struct mds_obd *mds = &obd->u.mds;
1836         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1837         int quota_opc = 0, block_count = 0;
1838         int inode_pending[MAXQUOTAS] = { 0, 0 };
1839         int block_pending[MAXQUOTAS] = { 0, 0 };
1840 #endif
1841         int rc = 0;
1842         ENTRY;
1843
1844 #ifdef HAVE_QUOTA_SUPPORT
1845         if (mds->mds_quota) {
1846                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1847                 mdd_quota_wrapper(&ma->ma_attr, qids);
1848                 /* get file quota for child */
1849                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1850                                 qids, inode_pending, 1, NULL, 0,
1851                                 NULL, 0);
1852                 switch (ma->ma_attr.la_mode & S_IFMT) {
1853                 case S_IFLNK:
1854                 case S_IFDIR:
1855                         block_count = 2;
1856                         break;
1857                 case S_IFREG:
1858                         block_count = 1;
1859                         break;
1860                 }
1861                 /* get block quota for child */
1862                 if (block_count)
1863                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1864                                         qids, block_pending, block_count,
1865                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1866         }
1867 #endif
1868
1869         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1870         handle = mdd_trans_start(env, mdd);
1871         if (IS_ERR(handle))
1872                 GOTO(out_pending, rc = PTR_ERR(handle));
1873
1874         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1875         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1876         if (rc)
1877                 GOTO(unlock, rc);
1878
1879         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1880         if (rc)
1881                 GOTO(unlock, rc);
1882
1883         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1884                 /* If creating the slave object, set slave EA here. */
1885                 int lmv_size = spec->u.sp_ea.eadatalen;
1886                 struct lmv_stripe_md *lmv;
1887
1888                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1889                 LASSERT(lmv != NULL && lmv_size > 0);
1890
1891                 rc = __mdd_xattr_set(env, mdd_obj,
1892                                      mdd_buf_get_const(env, lmv, lmv_size),
1893                                      XATTR_NAME_LMV, 0, handle);
1894                 if (rc)
1895                         GOTO(unlock, rc);
1896
1897                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1898                                            handle, 0);
1899         } else {
1900 #ifdef CONFIG_FS_POSIX_ACL
1901                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1902                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1903
1904                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1905                         buf->lb_len = spec->u.sp_ea.eadatalen;
1906                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1907                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1908                                                     &ma->ma_attr.la_mode,
1909                                                     handle);
1910                                 if (rc)
1911                                         GOTO(unlock, rc);
1912                                 else
1913                                         ma->ma_attr.la_valid |= LA_MODE;
1914                         }
1915
1916                         pfid = spec->u.sp_ea.fid;
1917                 }
1918 #endif
1919                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1920                                            spec);
1921         }
1922         EXIT;
1923 unlock:
1924         if (rc == 0)
1925                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1926         mdd_write_unlock(env, mdd_obj);
1927
1928         mdd_trans_stop(env, mdd, rc, handle);
1929 out_pending:
1930 #ifdef HAVE_QUOTA_SUPPORT
1931         if (quota_opc) {
1932                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1933                                       inode_pending, 0);
1934                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1935                                       block_pending, 1);
1936                 /* Trigger dqacq on the owner of child. If failed,
1937                  * the next call for lquota_chkquota will process it. */
1938                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1939                               quota_opc);
1940         }
1941 #endif
1942         return rc;
1943 }
1944
1945 /* partial link */
1946 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1947                        const struct md_attr *ma)
1948 {
1949         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1950         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1951         struct mdd_device *mdd = mdo2mdd(obj);
1952         struct thandle *handle;
1953         int rc;
1954         ENTRY;
1955
1956         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1957         handle = mdd_trans_start(env, mdd);
1958         if (IS_ERR(handle))
1959                 RETURN(-ENOMEM);
1960
1961         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1962         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1963         if (rc == 0)
1964                 __mdd_ref_add(env, mdd_obj, handle);
1965         mdd_write_unlock(env, mdd_obj);
1966         if (rc == 0) {
1967                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1968                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1969
1970                 la_copy->la_valid = LA_CTIME;
1971                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1972                                                         handle, 0);
1973         }
1974         mdd_trans_stop(env, mdd, 0, handle);
1975
1976         RETURN(rc);
1977 }
1978
1979 /*
1980  * do NOT or the MAY_*'s, you'll get the weakest
1981  */
1982 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1983 {
1984         int res = 0;
1985
1986         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1987          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1988          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1989          * owner can write to a file even if it is marked readonly to hide
1990          * its brokenness. (bug 5781) */
1991         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1992                 struct md_ucred *uc = md_ucred(env);
1993
1994                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1995                     (la->la_uid == uc->mu_fsuid))
1996                         return 0;
1997         }
1998
1999         if (flags & FMODE_READ)
2000                 res |= MAY_READ;
2001         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2002                 res |= MAY_WRITE;
2003         if (flags & MDS_FMODE_EXEC)
2004                 res = MAY_EXEC;
2005         return res;
2006 }
2007
2008 static int mdd_open_sanity_check(const struct lu_env *env,
2009                                  struct mdd_object *obj, int flag)
2010 {
2011         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2012         int mode, rc;
2013         ENTRY;
2014
2015         /* EEXIST check */
2016         if (mdd_is_dead_obj(obj))
2017                 RETURN(-ENOENT);
2018
2019         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2020         if (rc)
2021                RETURN(rc);
2022
2023         if (S_ISLNK(tmp_la->la_mode))
2024                 RETURN(-ELOOP);
2025
2026         mode = accmode(env, tmp_la, flag);
2027
2028         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2029                 RETURN(-EISDIR);
2030
2031         if (!(flag & MDS_OPEN_CREATED)) {
2032                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2033                 if (rc)
2034                         RETURN(rc);
2035         }
2036
2037         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2038             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2039                 flag &= ~MDS_OPEN_TRUNC;
2040
2041         /* For writing append-only file must open it with append mode. */
2042         if (mdd_is_append(obj)) {
2043                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2044                         RETURN(-EPERM);
2045                 if (flag & MDS_OPEN_TRUNC)
2046                         RETURN(-EPERM);
2047         }
2048
2049 #if 0
2050         /*
2051          * Now, flag -- O_NOATIME does not be packed by client.
2052          */
2053         if (flag & O_NOATIME) {
2054                 struct md_ucred *uc = md_ucred(env);
2055
2056                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2057                     (uc->mu_valid == UCRED_NEW)) &&
2058                     (uc->mu_fsuid != tmp_la->la_uid) &&
2059                     !mdd_capable(uc, CFS_CAP_FOWNER))
2060                         RETURN(-EPERM);
2061         }
2062 #endif
2063
2064         RETURN(0);
2065 }
2066
2067 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2068                     int flags)
2069 {
2070         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2071         int rc = 0;
2072
2073         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2074
2075         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2076         if (rc == 0)
2077                 mdd_obj->mod_count++;
2078
2079         mdd_write_unlock(env, mdd_obj);
2080         return rc;
2081 }
2082
2083 /* return md_attr back,
2084  * if it is last unlink then return lov ea + llog cookie*/
2085 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2086                     struct md_attr *ma)
2087 {
2088         int rc = 0;
2089         ENTRY;
2090
2091         if (S_ISREG(mdd_object_type(obj))) {
2092                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2093                  * Caller must be ready for that. */
2094
2095                 rc = __mdd_lmm_get(env, obj, ma);
2096                 if ((ma->ma_valid & MA_LOV))
2097                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2098                                             obj, ma);
2099         }
2100         RETURN(rc);
2101 }
2102
2103 /*
2104  * No permission check is needed.
2105  */
2106 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2107                      struct md_attr *ma)
2108 {
2109         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2110         struct mdd_device *mdd = mdo2mdd(obj);
2111         struct thandle    *handle = NULL;
2112         int rc;
2113         int reset = 1;
2114
2115 #ifdef HAVE_QUOTA_SUPPORT
2116         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2117         struct mds_obd *mds = &obd->u.mds;
2118         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2119         int quota_opc = 0;
2120 #endif
2121         ENTRY;
2122
2123         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2124                 mdd_obj->mod_count--;
2125
2126                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2127                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2128                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2129                 RETURN(0);
2130         }
2131
2132         /* check without any lock */
2133         if (mdd_obj->mod_count == 1 &&
2134             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2135  again:
2136                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2137                 if (rc)
2138                         RETURN(rc);
2139                 handle = mdd_trans_start(env, mdo2mdd(obj));
2140                 if (IS_ERR(handle))
2141                         RETURN(PTR_ERR(handle));
2142         }
2143
2144         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2145         if (handle == NULL && mdd_obj->mod_count == 1 &&
2146             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2147                 mdd_write_unlock(env, mdd_obj);
2148                 goto again;
2149         }
2150
2151         /* release open count */
2152         mdd_obj->mod_count --;
2153
2154         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2155                 /* remove link to object from orphan index */
2156                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2157                 if (rc == 0) {
2158                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2159                                "list, OSS objects to be destroyed.\n",
2160                                PFID(mdd_object_fid(mdd_obj)));
2161                 } else {
2162                         CERROR("Object "DFID" can not be deleted from orphan "
2163                                 "list, maybe cause OST objects can not be "
2164                                 "destroyed (err: %d).\n",
2165                                 PFID(mdd_object_fid(mdd_obj)), rc);
2166                         /* If object was not deleted from orphan list, do not
2167                          * destroy OSS objects, which will be done when next
2168                          * recovery. */
2169                         GOTO(out, rc);
2170                 }
2171         }
2172
2173         rc = mdd_iattr_get(env, mdd_obj, ma);
2174         /* Object maybe not in orphan list originally, it is rare case for
2175          * mdd_finish_unlink() failure. */
2176         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2177 #ifdef HAVE_QUOTA_SUPPORT
2178                 if (mds->mds_quota) {
2179                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2180                         mdd_quota_wrapper(&ma->ma_attr, qids);
2181                 }
2182 #endif
2183                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2184                 if (ma->ma_valid & MA_FLAGS &&
2185                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2186                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2187                 } else {
2188                         rc = mdd_object_kill(env, mdd_obj, ma);
2189                         if (rc == 0)
2190                                 reset = 0;
2191                 }
2192
2193                 if (rc != 0)
2194                         CERROR("Error when prepare to delete Object "DFID" , "
2195                                "which will cause OST objects can not be "
2196                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2197         }
2198         EXIT;
2199
2200 out:
2201         if (reset)
2202                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2203
2204         mdd_write_unlock(env, mdd_obj);
2205         if (handle != NULL)
2206                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2207 #ifdef HAVE_QUOTA_SUPPORT
2208         if (quota_opc)
2209                 /* Trigger dqrel on the owner of child. If failed,
2210                  * the next call for lquota_chkquota will process it */
2211                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2212                               quota_opc);
2213 #endif
2214         return rc;
2215 }
2216
2217 /*
2218  * Permission check is done when open,
2219  * no need check again.
2220  */
2221 static int mdd_readpage_sanity_check(const struct lu_env *env,
2222                                      struct mdd_object *obj)
2223 {
2224         struct dt_object *next = mdd_object_child(obj);
2225         int rc;
2226         ENTRY;
2227
2228         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2229                 rc = 0;
2230         else
2231                 rc = -ENOTDIR;
2232
2233         RETURN(rc);
2234 }
2235
2236 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2237                               struct lu_dirpage *dp, int nob,
2238                               const struct dt_it_ops *iops, struct dt_it *it,
2239                               __u32 attr)
2240 {
2241         void                   *area = dp;
2242         int                     result;
2243         __u64                   hash = 0;
2244         struct lu_dirent       *ent;
2245         struct lu_dirent       *last = NULL;
2246         int                     first = 1;
2247
2248         memset(area, 0, sizeof (*dp));
2249         area += sizeof (*dp);
2250         nob  -= sizeof (*dp);
2251
2252         ent  = area;
2253         do {
2254                 int    len;
2255                 int    recsize;
2256
2257                 len  = iops->key_size(env, it);
2258
2259                 /* IAM iterator can return record with zero len. */
2260                 if (len == 0)
2261                         goto next;
2262
2263                 hash = iops->store(env, it);
2264                 if (unlikely(first)) {
2265                         first = 0;
2266                         dp->ldp_hash_start = cpu_to_le64(hash);
2267                 }
2268
2269                 /* calculate max space required for lu_dirent */
2270                 recsize = lu_dirent_calc_size(len, attr);
2271
2272                 if (nob >= recsize) {
2273                         result = iops->rec(env, it, ent, attr);
2274                         if (result == -ESTALE)
2275                                 goto next;
2276                         if (result != 0)
2277                                 goto out;
2278
2279                         /* osd might not able to pack all attributes,
2280                          * so recheck rec length */
2281                         recsize = le16_to_cpu(ent->lde_reclen);
2282                 } else {
2283                         result = (last != NULL) ? 0 :-EINVAL;
2284                         goto out;
2285                 }
2286                 last = ent;
2287                 ent = (void *)ent + recsize;
2288                 nob -= recsize;
2289
2290 next:
2291                 result = iops->next(env, it);
2292                 if (result == -ESTALE)
2293                         goto next;
2294         } while (result == 0);
2295
2296 out:
2297         dp->ldp_hash_end = cpu_to_le64(hash);
2298         if (last != NULL) {
2299                 if (last->lde_hash == dp->ldp_hash_end)
2300                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2301                 last->lde_reclen = 0; /* end mark */
2302         }
2303         return result;
2304 }
2305
2306 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2307                           const struct lu_rdpg *rdpg)
2308 {
2309         struct dt_it      *it;
2310         struct dt_object  *next = mdd_object_child(obj);
2311         const struct dt_it_ops  *iops;
2312         struct page       *pg;
2313         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2314         int i;
2315         int nlupgs = 0;
2316         int rc;
2317         int nob;
2318
2319         LASSERT(rdpg->rp_pages != NULL);
2320         LASSERT(next->do_index_ops != NULL);
2321
2322         if (rdpg->rp_count <= 0)
2323                 return -EFAULT;
2324
2325         /*
2326          * iterate through directory and fill pages from @rdpg
2327          */
2328         iops = &next->do_index_ops->dio_it;
2329         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2330         if (IS_ERR(it))
2331                 return PTR_ERR(it);
2332
2333         rc = iops->load(env, it, rdpg->rp_hash);
2334
2335         if (rc == 0) {
2336                 /*
2337                  * Iterator didn't find record with exactly the key requested.
2338                  *
2339                  * It is currently either
2340                  *
2341                  *     - positioned above record with key less than
2342                  *     requested---skip it.
2343                  *
2344                  *     - or not positioned at all (is in IAM_IT_SKEWED
2345                  *     state)---position it on the next item.
2346                  */
2347                 rc = iops->next(env, it);
2348         } else if (rc > 0)
2349                 rc = 0;
2350
2351         /*
2352          * At this point and across for-loop:
2353          *
2354          *  rc == 0 -> ok, proceed.
2355          *  rc >  0 -> end of directory.
2356          *  rc <  0 -> error.
2357          */
2358         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2359              i++, nob -= CFS_PAGE_SIZE) {
2360                 struct lu_dirpage *dp;
2361
2362                 LASSERT(i < rdpg->rp_npages);
2363                 pg = rdpg->rp_pages[i];
2364                 dp = cfs_kmap(pg);
2365 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2366 repeat:
2367 #endif
2368                 rc = mdd_dir_page_build(env, mdd, dp,
2369                                         min_t(int, nob, LU_PAGE_SIZE),
2370                                         iops, it, rdpg->rp_attrs);
2371                 if (rc > 0) {
2372                         /*
2373                          * end of directory.
2374                          */
2375                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2376                         nlupgs++;
2377                 } else if (rc < 0) {
2378                         CWARN("build page failed: %d!\n", rc);
2379                 } else {
2380                         nlupgs++;
2381 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2382                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2383                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2384                                 goto repeat;
2385 #endif
2386                 }
2387                 cfs_kunmap(pg);
2388         }
2389         if (rc >= 0) {
2390                 struct lu_dirpage *dp;
2391
2392                 dp = cfs_kmap(rdpg->rp_pages[0]);
2393                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2394                 if (nlupgs == 0) {
2395                         /*
2396                          * No pages were processed, mark this for first page
2397                          * and send back.
2398                          */
2399                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2400                         nlupgs = 1;
2401                 }
2402                 cfs_kunmap(rdpg->rp_pages[0]);
2403
2404                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2405         }
2406         iops->put(env, it);
2407         iops->fini(env, it);
2408
2409         return rc;
2410 }
2411
2412 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2413                  const struct lu_rdpg *rdpg)
2414 {
2415         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2416         int rc;
2417         ENTRY;
2418
2419         LASSERT(mdd_object_exists(mdd_obj));
2420
2421         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2422         rc = mdd_readpage_sanity_check(env, mdd_obj);
2423         if (rc)
2424                 GOTO(out_unlock, rc);
2425
2426         if (mdd_is_dead_obj(mdd_obj)) {
2427                 struct page *pg;
2428                 struct lu_dirpage *dp;
2429
2430                 /*
2431                  * According to POSIX, please do not return any entry to client:
2432                  * even dot and dotdot should not be returned.
2433                  */
2434                 CWARN("readdir from dead object: "DFID"\n",
2435                         PFID(mdd_object_fid(mdd_obj)));
2436
2437                 if (rdpg->rp_count <= 0)
2438                         GOTO(out_unlock, rc = -EFAULT);
2439                 LASSERT(rdpg->rp_pages != NULL);
2440
2441                 pg = rdpg->rp_pages[0];
2442                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2443                 memset(dp, 0 , sizeof(struct lu_dirpage));
2444                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2445                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2446                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2447                 cfs_kunmap(pg);
2448                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2449         }
2450
2451         rc = __mdd_readpage(env, mdd_obj, rdpg);
2452
2453         EXIT;
2454 out_unlock:
2455         mdd_read_unlock(env, mdd_obj);
2456         return rc;
2457 }
2458
2459 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2460 {
2461         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2462         struct dt_object *next;
2463
2464         LASSERT(mdd_object_exists(mdd_obj));
2465         next = mdd_object_child(mdd_obj);
2466         return next->do_ops->do_object_sync(env, next);
2467 }
2468
2469 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2470                                         struct md_object *obj)
2471 {
2472         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2473
2474         LASSERT(mdd_object_exists(mdd_obj));
2475         return do_version_get(env, mdd_object_child(mdd_obj));
2476 }
2477
2478 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2479                             dt_obj_version_t version)
2480 {
2481         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2482
2483         LASSERT(mdd_object_exists(mdd_obj));
2484         do_version_set(env, mdd_object_child(mdd_obj), version);
2485 }
2486
2487 const struct md_object_operations mdd_obj_ops = {
2488         .moo_permission    = mdd_permission,
2489         .moo_attr_get      = mdd_attr_get,
2490         .moo_attr_set      = mdd_attr_set,
2491         .moo_xattr_get     = mdd_xattr_get,
2492         .moo_xattr_set     = mdd_xattr_set,
2493         .moo_xattr_list    = mdd_xattr_list,
2494         .moo_xattr_del     = mdd_xattr_del,
2495         .moo_object_create = mdd_object_create,
2496         .moo_ref_add       = mdd_ref_add,
2497         .moo_ref_del       = mdd_ref_del,
2498         .moo_open          = mdd_open,
2499         .moo_close         = mdd_close,
2500         .moo_readpage      = mdd_readpage,
2501         .moo_readlink      = mdd_readlink,
2502         .moo_changelog     = mdd_changelog,
2503         .moo_capa_get      = mdd_capa_get,
2504         .moo_object_sync   = mdd_object_sync,
2505         .moo_version_get   = mdd_version_get,
2506         .moo_version_set   = mdd_version_set,
2507         .moo_path          = mdd_path,
2508         .moo_file_lock     = mdd_file_lock,
2509         .moo_file_unlock   = mdd_file_unlock,
2510 };