Whamcloud - gitweb
553e04a3627456704a3f53e58665fc71e532a65a
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
130         buf->lb_buf = NULL;
131         buf->lb_len = 0;
132 }
133
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135                                        const void *area, ssize_t len)
136 {
137         struct lu_buf *buf;
138
139         buf = &mdd_env_info(env)->mti_buf;
140         buf->lb_buf = (void *)area;
141         buf->lb_len = len;
142         return buf;
143 }
144
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
146 {
147         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
148
149         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
151                 buf->lb_buf = NULL;
152         }
153         if (buf->lb_buf == NULL) {
154                 buf->lb_len = len;
155                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156                 if (buf->lb_buf == NULL)
157                         buf->lb_len = 0;
158         }
159         return buf;
160 }
161
162 /** Increase the size of the \a mti_big_buf.
163  * preserves old data in buffer
164  * old buffer remains unchanged on error
165  * \retval 0 or -ENOMEM
166  */
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
168 {
169         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
170         struct lu_buf buf;
171
172         LASSERT(len >= oldbuf->lb_len);
173         OBD_ALLOC_LARGE(buf.lb_buf, len);
174
175         if (buf.lb_buf == NULL)
176                 return -ENOMEM;
177
178         buf.lb_len = len;
179         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
180
181         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
182
183         memcpy(oldbuf, &buf, sizeof(buf));
184
185         return 0;
186 }
187
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189                                        struct mdd_device *mdd)
190 {
191         struct mdd_thread_info *mti = mdd_env_info(env);
192         int                     max_cookie_size;
193
194         max_cookie_size = mdd_lov_cookiesize(env, mdd);
195         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196                 if (mti->mti_max_cookie)
197                         OBD_FREE_LARGE(mti->mti_max_cookie,
198                                        mti->mti_max_cookie_size);
199                 mti->mti_max_cookie = NULL;
200                 mti->mti_max_cookie_size = 0;
201         }
202         if (unlikely(mti->mti_max_cookie == NULL)) {
203                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204                 if (likely(mti->mti_max_cookie != NULL))
205                         mti->mti_max_cookie_size = max_cookie_size;
206         }
207         if (likely(mti->mti_max_cookie != NULL))
208                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209         return mti->mti_max_cookie;
210 }
211
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213                                    struct mdd_device *mdd)
214 {
215         struct mdd_thread_info *mti = mdd_env_info(env);
216         int                     max_lmm_size;
217
218         max_lmm_size = mdd_lov_mdsize(env, mdd);
219         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220                 if (mti->mti_max_lmm)
221                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222                 mti->mti_max_lmm = NULL;
223                 mti->mti_max_lmm_size = 0;
224         }
225         if (unlikely(mti->mti_max_lmm == NULL)) {
226                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227                 if (likely(mti->mti_max_lmm != NULL))
228                         mti->mti_max_lmm_size = max_lmm_size;
229         }
230         return mti->mti_max_lmm;
231 }
232
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234                                    const struct lu_object_header *hdr,
235                                    struct lu_device *d)
236 {
237         struct mdd_object *mdd_obj;
238
239         OBD_ALLOC_PTR(mdd_obj);
240         if (mdd_obj != NULL) {
241                 struct lu_object *o;
242
243                 o = mdd2lu_obj(mdd_obj);
244                 lu_object_init(o, NULL, d);
245                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247                 mdd_obj->mod_count = 0;
248                 o->lo_ops = &mdd_lu_obj_ops;
249                 return o;
250         } else {
251                 return NULL;
252         }
253 }
254
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256                            const struct lu_object_conf *unused)
257 {
258         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259         struct mdd_object *mdd_obj = lu2mdd_obj(o);
260         struct lu_object  *below;
261         struct lu_device  *under;
262         ENTRY;
263
264         mdd_obj->mod_cltime = 0;
265         under = &d->mdd_child->dd_lu_dev;
266         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267         mdd_pdlock_init(mdd_obj);
268         if (below == NULL)
269                 RETURN(-ENOMEM);
270
271         lu_object_add(o, below);
272
273         RETURN(0);
274 }
275
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
277 {
278         if (lu_object_exists(o))
279                 return mdd_get_flags(env, lu2mdd_obj(o));
280         else
281                 return 0;
282 }
283
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
285 {
286         struct mdd_object *mdd = lu2mdd_obj(o);
287
288         lu_object_fini(o);
289         OBD_FREE_PTR(mdd);
290 }
291
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293                             lu_printer_t p, const struct lu_object *o)
294 {
295         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297                     "valid=%x, cltime="LPU64", flags=%lx)",
298                     mdd, mdd->mod_count, mdd->mod_valid,
299                     mdd->mod_cltime, mdd->mod_flags);
300 }
301
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303         .loo_object_init    = mdd_object_init,
304         .loo_object_start   = mdd_object_start,
305         .loo_object_free    = mdd_object_free,
306         .loo_object_print   = mdd_object_print,
307 };
308
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310                                    struct mdd_device *d,
311                                    const struct lu_fid *f)
312 {
313         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
314 }
315
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317                         const char *path, struct lu_fid *fid)
318 {
319         struct lu_buf *buf;
320         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321         struct mdd_object *obj;
322         struct lu_name *lname = &mdd_env_info(env)->mti_name;
323         char *name;
324         int rc = 0;
325         ENTRY;
326
327         /* temp buffer for path element */
328         buf = mdd_buf_alloc(env, PATH_MAX);
329         if (buf->lb_buf == NULL)
330                 RETURN(-ENOMEM);
331
332         lname->ln_name = name = buf->lb_buf;
333         lname->ln_namelen = 0;
334         *f = mdd->mdd_root_fid;
335
336         while(1) {
337                 while (*path == '/')
338                         path++;
339                 if (*path == '\0')
340                         break;
341                 while (*path != '/' && *path != '\0') {
342                         *name = *path;
343                         path++;
344                         name++;
345                         lname->ln_namelen++;
346                 }
347
348                 *name = '\0';
349                 /* find obj corresponding to fid */
350                 obj = mdd_object_find(env, mdd, f);
351                 if (obj == NULL)
352                         GOTO(out, rc = -EREMOTE);
353                 if (IS_ERR(obj))
354                         GOTO(out, rc = PTR_ERR(obj));
355                 /* get child fid from parent and name */
356                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357                 mdd_object_put(env, obj);
358                 if (rc)
359                         break;
360
361                 name = buf->lb_buf;
362                 lname->ln_namelen = 0;
363         }
364
365         if (!rc)
366                 *fid = *f;
367 out:
368         RETURN(rc);
369 }
370
371 /** The maximum depth that fid2path() will search.
372  * This is limited only because we want to store the fids for
373  * historical path lookup purposes.
374  */
375 #define MAX_PATH_DEPTH 100
376
377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379         __u64                pli_recno;        /**< history point */
380         __u64                pli_currec;       /**< current record */
381         struct lu_fid        pli_fid;
382         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383         struct mdd_object   *pli_mdd_obj;
384         char                *pli_path;         /**< full path */
385         int                  pli_pathlen;
386         int                  pli_linkno;       /**< which hardlink to follow */
387         int                  pli_fidcount;     /**< number of \a pli_fids */
388 };
389
390 static int mdd_path_current(const struct lu_env *env,
391                             struct path_lookup_info *pli)
392 {
393         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394         struct mdd_object *mdd_obj;
395         struct lu_buf     *buf = NULL;
396         struct link_ea_header *leh;
397         struct link_ea_entry  *lee;
398         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
400         char *ptr;
401         int reclen;
402         int rc;
403         ENTRY;
404
405         ptr = pli->pli_path + pli->pli_pathlen - 1;
406         *ptr = 0;
407         --ptr;
408         pli->pli_fidcount = 0;
409         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
410
411         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412                 mdd_obj = mdd_object_find(env, mdd,
413                                           &pli->pli_fids[pli->pli_fidcount]);
414                 if (mdd_obj == NULL)
415                         GOTO(out, rc = -EREMOTE);
416                 if (IS_ERR(mdd_obj))
417                         GOTO(out, rc = PTR_ERR(mdd_obj));
418                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
419                 if (rc <= 0) {
420                         mdd_object_put(env, mdd_obj);
421                         if (rc == -1)
422                                 rc = -EREMOTE;
423                         else if (rc == 0)
424                                 /* Do I need to error out here? */
425                                 rc = -ENOENT;
426                         GOTO(out, rc);
427                 }
428
429                 /* Get parent fid and object name */
430                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431                 buf = mdd_links_get(env, mdd_obj);
432                 mdd_read_unlock(env, mdd_obj);
433                 mdd_object_put(env, mdd_obj);
434                 if (IS_ERR(buf))
435                         GOTO(out, rc = PTR_ERR(buf));
436
437                 leh = buf->lb_buf;
438                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
440
441                 /* If set, use link #linkno for path lookup, otherwise use
442                    link #0.  Only do this for the final path element. */
443                 if ((pli->pli_fidcount == 0) &&
444                     (pli->pli_linkno < leh->leh_reccount)) {
445                         int count;
446                         for (count = 0; count < pli->pli_linkno; count++) {
447                                 lee = (struct link_ea_entry *)
448                                      ((char *)lee + reclen);
449                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
450                         }
451                         if (pli->pli_linkno < leh->leh_reccount - 1)
452                                 /* indicate to user there are more links */
453                                 pli->pli_linkno++;
454                 }
455
456                 /* Pack the name in the end of the buffer */
457                 ptr -= tmpname->ln_namelen;
458                 if (ptr - 1 <= pli->pli_path)
459                         GOTO(out, rc = -EOVERFLOW);
460                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
461                 *(--ptr) = '/';
462
463                 /* Store the parent fid for historic lookup */
464                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465                         GOTO(out, rc = -EOVERFLOW);
466                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
467         }
468
469         /* Verify that our path hasn't changed since we started the lookup.
470            Record the current index, and verify the path resolves to the
471            same fid. If it does, then the path is correct as of this index. */
472         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473         pli->pli_currec = mdd->mdd_cl.mc_index;
474         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
476         if (rc) {
477                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478                 GOTO (out, rc = -EAGAIN);
479         }
480         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483                        PFID(&pli->pli_fid));
484                 GOTO(out, rc = -EAGAIN);
485         }
486         ptr++; /* skip leading / */
487         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
488
489         EXIT;
490 out:
491         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492                 /* if we vmalloced a large buffer drop it */
493                 mdd_buf_put(buf);
494
495         return rc;
496 }
497
498 static int mdd_path_historic(const struct lu_env *env,
499                              struct path_lookup_info *pli)
500 {
501         return 0;
502 }
503
504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506                     char *path, int pathlen, __u64 *recno, int *linkno)
507 {
508         struct path_lookup_info *pli;
509         int tries = 3;
510         int rc = -EAGAIN;
511         ENTRY;
512
513         if (pathlen < 3)
514                 RETURN(-EOVERFLOW);
515
516         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
517                 path[0] = '\0';
518                 RETURN(0);
519         }
520
521         OBD_ALLOC_PTR(pli);
522         if (pli == NULL)
523                 RETURN(-ENOMEM);
524
525         pli->pli_mdd_obj = md2mdd_obj(obj);
526         pli->pli_recno = *recno;
527         pli->pli_path = path;
528         pli->pli_pathlen = pathlen;
529         pli->pli_linkno = *linkno;
530
531         /* Retry multiple times in case file is being moved */
532         while (tries-- && rc == -EAGAIN)
533                 rc = mdd_path_current(env, pli);
534
535         /* For historical path lookup, the current links may not have existed
536          * at "recno" time.  We must switch over to earlier links/parents
537          * by using the changelog records.  If the earlier parent doesn't
538          * exist, we must search back through the changelog to reconstruct
539          * its parents, then check if it exists, etc.
540          * We may ignore this problem for the initial implementation and
541          * state that an "original" hardlink must still exist for us to find
542          * historic path name. */
543         if (pli->pli_recno != -1) {
544                 rc = mdd_path_historic(env, pli);
545         } else {
546                 *recno = pli->pli_currec;
547                 /* Return next link index to caller */
548                 *linkno = pli->pli_linkno;
549         }
550
551         OBD_FREE_PTR(pli);
552
553         RETURN (rc);
554 }
555
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
557 {
558         struct lu_attr *la = &mdd_env_info(env)->mti_la;
559         int rc;
560
561         ENTRY;
562         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
563         if (rc == 0) {
564                 mdd_flags_xlate(obj, la->la_flags);
565                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
566                         obj->mod_flags |= MNLINK_OBJ;
567         }
568         RETURN(rc);
569 }
570
571 /* get only inode attributes */
572 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
573                   struct md_attr *ma)
574 {
575         int rc = 0;
576         ENTRY;
577
578         if (ma->ma_valid & MA_INODE)
579                 RETURN(0);
580
581         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
582                           mdd_object_capa(env, mdd_obj));
583         if (rc == 0)
584                 ma->ma_valid |= MA_INODE;
585         RETURN(rc);
586 }
587
588 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
589 {
590         struct lov_desc *ldesc;
591         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
592         struct lov_user_md *lum = (struct lov_user_md*)lmm;
593         ENTRY;
594
595         if (!lum)
596                 RETURN(0);
597
598         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
599         LASSERT(ldesc != NULL);
600
601         lum->lmm_magic = LOV_MAGIC_V1;
602         lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
603         lum->lmm_pattern = ldesc->ld_pattern;
604         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
605         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
606         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
607
608         RETURN(sizeof(*lum));
609 }
610
611 static int is_rootdir(struct mdd_object *mdd_obj)
612 {
613         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
614         const struct lu_fid *fid = mdo2fid(mdd_obj);
615
616         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
617 }
618
619 /* get lov EA only */
620 static int __mdd_lmm_get(const struct lu_env *env,
621                          struct mdd_object *mdd_obj, struct md_attr *ma)
622 {
623         int rc;
624         ENTRY;
625
626         if (ma->ma_valid & MA_LOV)
627                 RETURN(0);
628
629         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
630                         XATTR_NAME_LOV);
631         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
632                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
633         if (rc > 0) {
634                 ma->ma_lmm_size = rc;
635                 ma->ma_valid |= MA_LOV;
636                 rc = 0;
637         }
638         RETURN(rc);
639 }
640
641 /* get the first parent fid from link EA */
642 static int mdd_pfid_get(const struct lu_env *env,
643                         struct mdd_object *mdd_obj, struct md_attr *ma)
644 {
645         struct lu_buf *buf;
646         struct link_ea_header *leh;
647         struct link_ea_entry *lee;
648         struct lu_fid *pfid = &ma->ma_pfid;
649         ENTRY;
650
651         if (ma->ma_valid & MA_PFID)
652                 RETURN(0);
653
654         buf = mdd_links_get(env, mdd_obj);
655         if (IS_ERR(buf))
656                 RETURN(PTR_ERR(buf));
657
658         leh = buf->lb_buf;
659         lee = (struct link_ea_entry *)(leh + 1);
660         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
661         fid_be_to_cpu(pfid, pfid);
662         ma->ma_valid |= MA_PFID;
663         if (buf->lb_len > OBD_ALLOC_BIG)
664                 /* if we vmalloced a large buffer drop it */
665                 mdd_buf_put(buf);
666         RETURN(0);
667 }
668
669 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
670                        struct md_attr *ma)
671 {
672         int rc;
673         ENTRY;
674
675         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
676         rc = __mdd_lmm_get(env, mdd_obj, ma);
677         mdd_read_unlock(env, mdd_obj);
678         RETURN(rc);
679 }
680
681 /* get lmv EA only*/
682 static int __mdd_lmv_get(const struct lu_env *env,
683                          struct mdd_object *mdd_obj, struct md_attr *ma)
684 {
685         int rc;
686         ENTRY;
687
688         if (ma->ma_valid & MA_LMV)
689                 RETURN(0);
690
691         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
692                         XATTR_NAME_LMV);
693         if (rc > 0) {
694                 ma->ma_valid |= MA_LMV;
695                 rc = 0;
696         }
697         RETURN(rc);
698 }
699
700 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
701                          struct md_attr *ma)
702 {
703         struct mdd_thread_info *info = mdd_env_info(env);
704         struct lustre_mdt_attrs *lma =
705                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
706         int lma_size;
707         int rc;
708         ENTRY;
709
710         /* If all needed data are already valid, nothing to do */
711         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
712             (ma->ma_need & (MA_HSM | MA_SOM)))
713                 RETURN(0);
714
715         /* Read LMA from disk EA */
716         lma_size = sizeof(info->mti_xattr_buf);
717         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
718         if (rc <= 0)
719                 RETURN(rc);
720
721         /* Useless to check LMA incompatibility because this is already done in
722          * osd_ea_fid_get(), and this will fail long before this code is
723          * called.
724          * So, if we are here, LMA is compatible.
725          */
726
727         lustre_lma_swab(lma);
728
729         /* Swab and copy LMA */
730         if (ma->ma_need & MA_HSM) {
731                 if (lma->lma_compat & LMAC_HSM)
732                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
733                 else
734                         ma->ma_hsm.mh_flags = 0;
735                 ma->ma_valid |= MA_HSM;
736         }
737
738         /* Copy SOM */
739         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
740                 LASSERT(ma->ma_som != NULL);
741                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
742                 ma->ma_som->msd_size    = lma->lma_som_size;
743                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
744                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
745                 ma->ma_valid |= MA_SOM;
746         }
747
748         RETURN(0);
749 }
750
751 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
752                                  struct md_attr *ma)
753 {
754         int rc = 0;
755         ENTRY;
756
757         if (ma->ma_need & MA_INODE)
758                 rc = mdd_iattr_get(env, mdd_obj, ma);
759
760         if (rc == 0 && ma->ma_need & MA_LOV) {
761                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
762                     S_ISDIR(mdd_object_type(mdd_obj)))
763                         rc = __mdd_lmm_get(env, mdd_obj, ma);
764         }
765         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
766                 if (S_ISREG(mdd_object_type(mdd_obj)))
767                         rc = mdd_pfid_get(env, mdd_obj, ma);
768         }
769         if (rc == 0 && ma->ma_need & MA_LMV) {
770                 if (S_ISDIR(mdd_object_type(mdd_obj)))
771                         rc = __mdd_lmv_get(env, mdd_obj, ma);
772         }
773         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
774                 if (S_ISREG(mdd_object_type(mdd_obj)))
775                         rc = __mdd_lma_get(env, mdd_obj, ma);
776         }
777 #ifdef CONFIG_FS_POSIX_ACL
778         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
779                 if (S_ISDIR(mdd_object_type(mdd_obj)))
780                         rc = mdd_def_acl_get(env, mdd_obj, ma);
781         }
782 #endif
783         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
784                rc, ma->ma_valid, ma->ma_lmm);
785         RETURN(rc);
786 }
787
788 int mdd_attr_get_internal_locked(const struct lu_env *env,
789                                  struct mdd_object *mdd_obj, struct md_attr *ma)
790 {
791         int rc;
792         int needlock = ma->ma_need &
793                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
794
795         if (needlock)
796                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
797         rc = mdd_attr_get_internal(env, mdd_obj, ma);
798         if (needlock)
799                 mdd_read_unlock(env, mdd_obj);
800         return rc;
801 }
802
803 /*
804  * No permission check is needed.
805  */
806 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
807                         struct md_attr *ma)
808 {
809         struct mdd_object *mdd_obj = md2mdd_obj(obj);
810         int                rc;
811
812         ENTRY;
813         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
814         RETURN(rc);
815 }
816
817 /*
818  * No permission check is needed.
819  */
820 static int mdd_xattr_get(const struct lu_env *env,
821                          struct md_object *obj, struct lu_buf *buf,
822                          const char *name)
823 {
824         struct mdd_object *mdd_obj = md2mdd_obj(obj);
825         int rc;
826
827         ENTRY;
828
829         LASSERT(mdd_object_exists(mdd_obj));
830
831         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
832         rc = mdo_xattr_get(env, mdd_obj, buf, name,
833                            mdd_object_capa(env, mdd_obj));
834         mdd_read_unlock(env, mdd_obj);
835
836         RETURN(rc);
837 }
838
839 /*
840  * Permission check is done when open,
841  * no need check again.
842  */
843 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
844                         struct lu_buf *buf)
845 {
846         struct mdd_object *mdd_obj = md2mdd_obj(obj);
847         struct dt_object  *next;
848         loff_t             pos = 0;
849         int                rc;
850         ENTRY;
851
852         LASSERT(mdd_object_exists(mdd_obj));
853
854         next = mdd_object_child(mdd_obj);
855         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
856         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
857                                          mdd_object_capa(env, mdd_obj));
858         mdd_read_unlock(env, mdd_obj);
859         RETURN(rc);
860 }
861
862 /*
863  * No permission check is needed.
864  */
865 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
866                           struct lu_buf *buf)
867 {
868         struct mdd_object *mdd_obj = md2mdd_obj(obj);
869         int rc;
870
871         ENTRY;
872
873         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
874         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
875         mdd_read_unlock(env, mdd_obj);
876
877         RETURN(rc);
878 }
879
880 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
881                                struct mdd_object *c, struct md_attr *ma,
882                                struct thandle *handle,
883                                const struct md_op_spec *spec)
884 {
885         struct lu_attr *attr = &ma->ma_attr;
886         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
887         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
888         const struct dt_index_features *feat = spec->sp_feat;
889         int rc;
890         ENTRY;
891
892         if (!mdd_object_exists(c)) {
893                 struct dt_object *next = mdd_object_child(c);
894                 LASSERT(next);
895
896                 if (feat != &dt_directory_features && feat != NULL)
897                         dof->dof_type = DFT_INDEX;
898                 else
899                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
900
901                 dof->u.dof_idx.di_feat = feat;
902
903                 /* @hint will be initialized by underlying device. */
904                 next->do_ops->do_ah_init(env, hint,
905                                          p ? mdd_object_child(p) : NULL,
906                                          attr->la_mode & S_IFMT);
907
908                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
909                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
910         } else
911                 rc = -EEXIST;
912
913         RETURN(rc);
914 }
915
916 /**
917  * Make sure the ctime is increased only.
918  */
919 static inline int mdd_attr_check(const struct lu_env *env,
920                                  struct mdd_object *obj,
921                                  struct lu_attr *attr)
922 {
923         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
924         int rc;
925         ENTRY;
926
927         if (attr->la_valid & LA_CTIME) {
928                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
929                 if (rc)
930                         RETURN(rc);
931
932                 if (attr->la_ctime < tmp_la->la_ctime)
933                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
934                 else if (attr->la_valid == LA_CTIME &&
935                          attr->la_ctime == tmp_la->la_ctime)
936                         attr->la_valid &= ~LA_CTIME;
937         }
938         RETURN(0);
939 }
940
941 int mdd_attr_set_internal(const struct lu_env *env,
942                           struct mdd_object *obj,
943                           struct lu_attr *attr,
944                           struct thandle *handle,
945                           int needacl)
946 {
947         int rc;
948         ENTRY;
949
950         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
951 #ifdef CONFIG_FS_POSIX_ACL
952         if (!rc && (attr->la_valid & LA_MODE) && needacl)
953                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
954 #endif
955         RETURN(rc);
956 }
957
958 int mdd_attr_check_set_internal(const struct lu_env *env,
959                                 struct mdd_object *obj,
960                                 struct lu_attr *attr,
961                                 struct thandle *handle,
962                                 int needacl)
963 {
964         int rc;
965         ENTRY;
966
967         rc = mdd_attr_check(env, obj, attr);
968         if (rc)
969                 RETURN(rc);
970
971         if (attr->la_valid)
972                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
973         RETURN(rc);
974 }
975
976 static int mdd_attr_set_internal_locked(const struct lu_env *env,
977                                         struct mdd_object *obj,
978                                         struct lu_attr *attr,
979                                         struct thandle *handle,
980                                         int needacl)
981 {
982         int rc;
983         ENTRY;
984
985         needacl = needacl && (attr->la_valid & LA_MODE);
986         if (needacl)
987                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
988         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
989         if (needacl)
990                 mdd_write_unlock(env, obj);
991         RETURN(rc);
992 }
993
994 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
995                                        struct mdd_object *obj,
996                                        struct lu_attr *attr,
997                                        struct thandle *handle,
998                                        int needacl)
999 {
1000         int rc;
1001         ENTRY;
1002
1003         needacl = needacl && (attr->la_valid & LA_MODE);
1004         if (needacl)
1005                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1006         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1007         if (needacl)
1008                 mdd_write_unlock(env, obj);
1009         RETURN(rc);
1010 }
1011
1012 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1013                     const struct lu_buf *buf, const char *name,
1014                     int fl, struct thandle *handle)
1015 {
1016         struct lustre_capa *capa = mdd_object_capa(env, obj);
1017         int rc = -EINVAL;
1018         ENTRY;
1019
1020         if (buf->lb_buf && buf->lb_len > 0)
1021                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1022         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1023                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1024
1025         RETURN(rc);
1026 }
1027
1028 /*
1029  * This gives the same functionality as the code between
1030  * sys_chmod and inode_setattr
1031  * chown_common and inode_setattr
1032  * utimes and inode_setattr
1033  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1034  */
1035 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1036                         struct lu_attr *la, const struct md_attr *ma)
1037 {
1038         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1039         struct md_ucred  *uc;
1040         int               rc;
1041         ENTRY;
1042
1043         if (!la->la_valid)
1044                 RETURN(0);
1045
1046         /* Do not permit change file type */
1047         if (la->la_valid & LA_TYPE)
1048                 RETURN(-EPERM);
1049
1050         /* They should not be processed by setattr */
1051         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1052                 RETURN(-EPERM);
1053
1054         /* export destroy does not have ->le_ses, but we may want
1055          * to drop LUSTRE_SOM_FL. */
1056         if (!env->le_ses)
1057                 RETURN(0);
1058
1059         uc = md_ucred(env);
1060
1061         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1062         if (rc)
1063                 RETURN(rc);
1064
1065         if (la->la_valid == LA_CTIME) {
1066                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1067                         /* This is only for set ctime when rename's source is
1068                          * on remote MDS. */
1069                         rc = mdd_may_delete(env, NULL, obj,
1070                                             (struct md_attr *)ma, 1, 0);
1071                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1072                         la->la_valid &= ~LA_CTIME;
1073                 RETURN(rc);
1074         }
1075
1076         if (la->la_valid == LA_ATIME) {
1077                 /* This is atime only set for read atime update on close. */
1078                 if (la->la_atime >= tmp_la->la_atime &&
1079                     la->la_atime < (tmp_la->la_atime +
1080                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1081                         la->la_valid &= ~LA_ATIME;
1082                 RETURN(0);
1083         }
1084
1085         /* Check if flags change. */
1086         if (la->la_valid & LA_FLAGS) {
1087                 unsigned int oldflags = 0;
1088                 unsigned int newflags = la->la_flags &
1089                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1090
1091                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1092                     !mdd_capable(uc, CFS_CAP_FOWNER))
1093                         RETURN(-EPERM);
1094
1095                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1096                  * only be changed by the relevant capability. */
1097                 if (mdd_is_immutable(obj))
1098                         oldflags |= LUSTRE_IMMUTABLE_FL;
1099                 if (mdd_is_append(obj))
1100                         oldflags |= LUSTRE_APPEND_FL;
1101                 if ((oldflags ^ newflags) &&
1102                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1103                         RETURN(-EPERM);
1104
1105                 if (!S_ISDIR(tmp_la->la_mode))
1106                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1107         }
1108
1109         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1110             (la->la_valid & ~LA_FLAGS) &&
1111             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1112                 RETURN(-EPERM);
1113
1114         /* Check for setting the obj time. */
1115         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1116             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1117                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1118                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1119                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1120                                                             MAY_WRITE,
1121                                                             MOR_TGT_CHILD);
1122                         if (rc)
1123                                 RETURN(rc);
1124                 }
1125         }
1126
1127         if (la->la_valid & LA_KILL_SUID) {
1128                 la->la_valid &= ~LA_KILL_SUID;
1129                 if ((tmp_la->la_mode & S_ISUID) &&
1130                     !(la->la_valid & LA_MODE)) {
1131                         la->la_mode = tmp_la->la_mode;
1132                         la->la_valid |= LA_MODE;
1133                 }
1134                 la->la_mode &= ~S_ISUID;
1135         }
1136
1137         if (la->la_valid & LA_KILL_SGID) {
1138                 la->la_valid &= ~LA_KILL_SGID;
1139                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1140                                         (S_ISGID | S_IXGRP)) &&
1141                     !(la->la_valid & LA_MODE)) {
1142                         la->la_mode = tmp_la->la_mode;
1143                         la->la_valid |= LA_MODE;
1144                 }
1145                 la->la_mode &= ~S_ISGID;
1146         }
1147
1148         /* Make sure a caller can chmod. */
1149         if (la->la_valid & LA_MODE) {
1150                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1151                     (uc->mu_fsuid != tmp_la->la_uid) &&
1152                     !mdd_capable(uc, CFS_CAP_FOWNER))
1153                         RETURN(-EPERM);
1154
1155                 if (la->la_mode == (cfs_umode_t) -1)
1156                         la->la_mode = tmp_la->la_mode;
1157                 else
1158                         la->la_mode = (la->la_mode & S_IALLUGO) |
1159                                       (tmp_la->la_mode & ~S_IALLUGO);
1160
1161                 /* Also check the setgid bit! */
1162                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1163                                        la->la_gid : tmp_la->la_gid) &&
1164                     !mdd_capable(uc, CFS_CAP_FSETID))
1165                         la->la_mode &= ~S_ISGID;
1166         } else {
1167                la->la_mode = tmp_la->la_mode;
1168         }
1169
1170         /* Make sure a caller can chown. */
1171         if (la->la_valid & LA_UID) {
1172                 if (la->la_uid == (uid_t) -1)
1173                         la->la_uid = tmp_la->la_uid;
1174                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1175                     (la->la_uid != tmp_la->la_uid)) &&
1176                     !mdd_capable(uc, CFS_CAP_CHOWN))
1177                         RETURN(-EPERM);
1178
1179                 /* If the user or group of a non-directory has been
1180                  * changed by a non-root user, remove the setuid bit.
1181                  * 19981026 David C Niemi <niemi@tux.org>
1182                  *
1183                  * Changed this to apply to all users, including root,
1184                  * to avoid some races. This is the behavior we had in
1185                  * 2.0. The check for non-root was definitely wrong
1186                  * for 2.2 anyway, as it should have been using
1187                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1188                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1189                     !S_ISDIR(tmp_la->la_mode)) {
1190                         la->la_mode &= ~S_ISUID;
1191                         la->la_valid |= LA_MODE;
1192                 }
1193         }
1194
1195         /* Make sure caller can chgrp. */
1196         if (la->la_valid & LA_GID) {
1197                 if (la->la_gid == (gid_t) -1)
1198                         la->la_gid = tmp_la->la_gid;
1199                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1200                     ((la->la_gid != tmp_la->la_gid) &&
1201                     !lustre_in_group_p(uc, la->la_gid))) &&
1202                     !mdd_capable(uc, CFS_CAP_CHOWN))
1203                         RETURN(-EPERM);
1204
1205                 /* Likewise, if the user or group of a non-directory
1206                  * has been changed by a non-root user, remove the
1207                  * setgid bit UNLESS there is no group execute bit
1208                  * (this would be a file marked for mandatory
1209                  * locking).  19981026 David C Niemi <niemi@tux.org>
1210                  *
1211                  * Removed the fsuid check (see the comment above) --
1212                  * 19990830 SD. */
1213                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1214                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1215                         la->la_mode &= ~S_ISGID;
1216                         la->la_valid |= LA_MODE;
1217                 }
1218         }
1219
1220         /* For both Size-on-MDS case and truncate case,
1221          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1222          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1223          * For SOM case, it is true, the MAY_WRITE perm has been checked
1224          * when open, no need check again. For truncate case, it is false,
1225          * the MAY_WRITE perm should be checked here. */
1226         if (ma->ma_attr_flags & MDS_SOM) {
1227                 /* For the "Size-on-MDS" setattr update, merge coming
1228                  * attributes with the set in the inode. BUG 10641 */
1229                 if ((la->la_valid & LA_ATIME) &&
1230                     (la->la_atime <= tmp_la->la_atime))
1231                         la->la_valid &= ~LA_ATIME;
1232
1233                 /* OST attributes do not have a priority over MDS attributes,
1234                  * so drop times if ctime is equal. */
1235                 if ((la->la_valid & LA_CTIME) &&
1236                     (la->la_ctime <= tmp_la->la_ctime))
1237                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1238         } else {
1239                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1240                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1241                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1242                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1243                                 rc = mdd_permission_internal_locked(env, obj,
1244                                                             tmp_la, MAY_WRITE,
1245                                                             MOR_TGT_CHILD);
1246                                 if (rc)
1247                                         RETURN(rc);
1248                         }
1249                 }
1250                 if (la->la_valid & LA_CTIME) {
1251                         /* The pure setattr, it has the priority over what is
1252                          * already set, do not drop it if ctime is equal. */
1253                         if (la->la_ctime < tmp_la->la_ctime)
1254                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1255                                                   LA_CTIME);
1256                 }
1257         }
1258
1259         RETURN(0);
1260 }
1261
1262 /** Store a data change changelog record
1263  * If this fails, we must fail the whole transaction; we don't
1264  * want the change to commit without the log entry.
1265  * \param mdd_obj - mdd_object of change
1266  * \param handle - transacion handle
1267  */
1268 static int mdd_changelog_data_store(const struct lu_env     *env,
1269                                     struct mdd_device       *mdd,
1270                                     enum changelog_rec_type type,
1271                                     int                     flags,
1272                                     struct mdd_object       *mdd_obj,
1273                                     struct thandle          *handle)
1274 {
1275         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1276         struct llog_changelog_rec *rec;
1277         struct lu_buf *buf;
1278         int reclen;
1279         int rc;
1280
1281         /* Not recording */
1282         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1283                 RETURN(0);
1284         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1285                 RETURN(0);
1286
1287         LASSERT(handle != NULL);
1288         LASSERT(mdd_obj != NULL);
1289
1290         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1291             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1292                 /* Don't need multiple updates in this log */
1293                 /* Don't check under lock - no big deal if we get an extra
1294                    entry */
1295                 RETURN(0);
1296         }
1297
1298         reclen = llog_data_len(sizeof(*rec));
1299         buf = mdd_buf_alloc(env, reclen);
1300         if (buf->lb_buf == NULL)
1301                 RETURN(-ENOMEM);
1302         rec = (struct llog_changelog_rec *)buf->lb_buf;
1303
1304         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1305         rec->cr.cr_type = (__u32)type;
1306         rec->cr.cr_tfid = *tfid;
1307         rec->cr.cr_namelen = 0;
1308         mdd_obj->mod_cltime = cfs_time_current_64();
1309
1310         rc = mdd_changelog_llog_write(mdd, rec, handle);
1311         if (rc < 0) {
1312                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1313                        rc, type, PFID(tfid));
1314                 return -EFAULT;
1315         }
1316
1317         return 0;
1318 }
1319
1320 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1321                   int flags, struct md_object *obj)
1322 {
1323         struct thandle *handle;
1324         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1325         struct mdd_device *mdd = mdo2mdd(obj);
1326         int rc;
1327         ENTRY;
1328
1329         handle = mdd_trans_start(env, mdd);
1330
1331         if (IS_ERR(handle))
1332                 return(PTR_ERR(handle));
1333
1334         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1335                                       handle);
1336
1337         mdd_trans_stop(env, mdd, rc, handle);
1338
1339         RETURN(rc);
1340 }
1341
1342 /**
1343  * Should be called with write lock held.
1344  *
1345  * \see mdd_lma_set_locked().
1346  */
1347 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1348                        const struct md_attr *ma, struct thandle *handle)
1349 {
1350         struct mdd_thread_info *info = mdd_env_info(env);
1351         struct lu_buf *buf;
1352         struct lustre_mdt_attrs *lma =
1353                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1354         int lmasize = sizeof(struct lustre_mdt_attrs);
1355         int rc = 0;
1356
1357         ENTRY;
1358
1359         /* Either HSM or SOM part is not valid, we need to read it before */
1360         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1361                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1362                 if (rc <= 0)
1363                         RETURN(rc);
1364
1365                 lustre_lma_swab(lma);
1366         } else {
1367                 memset(lma, 0, lmasize);
1368         }
1369
1370         /* Copy HSM data */
1371         if (ma->ma_valid & MA_HSM) {
1372                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1373                 lma->lma_compat |= LMAC_HSM;
1374         }
1375
1376         /* Copy SOM data */
1377         if (ma->ma_valid & MA_SOM) {
1378                 LASSERT(ma->ma_som != NULL);
1379                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1380                         lma->lma_compat     &= ~LMAC_SOM;
1381                 } else {
1382                         lma->lma_compat     |= LMAC_SOM;
1383                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1384                         lma->lma_som_size    = ma->ma_som->msd_size;
1385                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1386                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1387                 }
1388         }
1389
1390         /* Copy FID */
1391         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1392
1393         lustre_lma_swab(lma);
1394         buf = mdd_buf_get(env, lma, lmasize);
1395         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1396
1397         RETURN(rc);
1398 }
1399
1400 /**
1401  * Save LMA extended attributes with data from \a ma.
1402  *
1403  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1404  * not, LMA EA will be first read from disk, modified and write back.
1405  *
1406  */
1407 static int mdd_lma_set_locked(const struct lu_env *env,
1408                               struct mdd_object *mdd_obj,
1409                               const struct md_attr *ma, struct thandle *handle)
1410 {
1411         int rc;
1412
1413         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1414         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1415         mdd_write_unlock(env, mdd_obj);
1416         return rc;
1417 }
1418
1419 /* Precedence for choosing record type when multiple
1420  * attributes change: setattr > mtime > ctime > atime
1421  * (ctime changes when mtime does, plus chmod/chown.
1422  * atime and ctime are independent.) */
1423 static int mdd_attr_set_changelog(const struct lu_env *env,
1424                                   struct md_object *obj, struct thandle *handle,
1425                                   __u64 valid)
1426 {
1427         struct mdd_device *mdd = mdo2mdd(obj);
1428         int bits, type = 0;
1429
1430         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1431         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1432         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1433         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1434         bits = bits & mdd->mdd_cl.mc_mask;
1435         if (bits == 0)
1436                 return 0;
1437
1438         /* The record type is the lowest non-masked set bit */
1439         while (bits && ((bits & 1) == 0)) {
1440                 bits = bits >> 1;
1441                 type++;
1442         }
1443
1444         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1445         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1446                                         md2mdd_obj(obj), handle);
1447 }
1448
1449 /* set attr and LOV EA at once, return updated attr */
1450 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1451                         const struct md_attr *ma)
1452 {
1453         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1454         struct mdd_device *mdd = mdo2mdd(obj);
1455         struct thandle *handle;
1456         struct lov_mds_md *lmm = NULL;
1457         struct llog_cookie *logcookies = NULL;
1458         int  rc, lmm_size = 0, cookie_size = 0;
1459         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1460 #ifdef HAVE_QUOTA_SUPPORT
1461         struct obd_device *obd = mdd->mdd_obd_dev;
1462         struct mds_obd *mds = &obd->u.mds;
1463         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1464         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1465         int quota_opc = 0, block_count = 0;
1466         int inode_pending[MAXQUOTAS] = { 0, 0 };
1467         int block_pending[MAXQUOTAS] = { 0, 0 };
1468 #endif
1469         ENTRY;
1470
1471         *la_copy = ma->ma_attr;
1472         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1473         if (rc != 0)
1474                 RETURN(rc);
1475
1476         /* setattr on "close" only change atime, or do nothing */
1477         if (ma->ma_valid == MA_INODE &&
1478             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1479                 RETURN(0);
1480
1481         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1482                                     MDD_TXN_ATTR_SET_OP);
1483         handle = mdd_trans_start(env, mdd);
1484         if (IS_ERR(handle))
1485                 RETURN(PTR_ERR(handle));
1486         /*TODO: add lock here*/
1487         /* start a log jounal handle if needed */
1488         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1489             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1490                 lmm_size = mdd_lov_mdsize(env, mdd);
1491                 lmm = mdd_max_lmm_get(env, mdd);
1492                 if (lmm == NULL)
1493                         GOTO(cleanup, rc = -ENOMEM);
1494
1495                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1496                                 XATTR_NAME_LOV);
1497
1498                 if (rc < 0)
1499                         GOTO(cleanup, rc);
1500         }
1501
1502         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1503                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1504                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1505
1506 #ifdef HAVE_QUOTA_SUPPORT
1507         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1508                 struct obd_export *exp = md_quota(env)->mq_exp;
1509                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1510
1511                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1512                 if (!rc) {
1513                         quota_opc = FSFILT_OP_SETATTR;
1514                         mdd_quota_wrapper(la_copy, qnids);
1515                         mdd_quota_wrapper(la_tmp, qoids);
1516                         /* get file quota for new owner */
1517                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1518                                         qnids, inode_pending, 1, NULL, 0,
1519                                         NULL, 0);
1520                         block_count = (la_tmp->la_blocks + 7) >> 3;
1521                         if (block_count) {
1522                                 void *data = NULL;
1523                                 mdd_data_get(env, mdd_obj, &data);
1524                                 /* get block quota for new owner */
1525                                 lquota_chkquota(mds_quota_interface_ref, obd,
1526                                                 exp, qnids, block_pending,
1527                                                 block_count, NULL,
1528                                                 LQUOTA_FLAGS_BLK, data, 1);
1529                         }
1530                 }
1531         }
1532 #endif
1533
1534         if (la_copy->la_valid & LA_FLAGS) {
1535                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1536                                                   handle, 1);
1537                 if (rc == 0)
1538                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1539         } else if (la_copy->la_valid) {            /* setattr */
1540                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1541                                                   handle, 1);
1542                 /* journal chown/chgrp in llog, just like unlink */
1543                 if (rc == 0 && lmm_size){
1544                         cookie_size = mdd_lov_cookiesize(env, mdd);
1545                         logcookies = mdd_max_cookie_get(env, mdd);
1546                         if (logcookies == NULL)
1547                                 GOTO(cleanup, rc = -ENOMEM);
1548
1549                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1550                                             logcookies, cookie_size) <= 0)
1551                                 logcookies = NULL;
1552                 }
1553         }
1554
1555         if (rc == 0 && ma->ma_valid & MA_LOV) {
1556                 cfs_umode_t mode;
1557
1558                 mode = mdd_object_type(mdd_obj);
1559                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1560                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1561                         if (rc)
1562                                 GOTO(cleanup, rc);
1563
1564                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1565                                             ma->ma_lmm_size, handle, 1);
1566                 }
1567
1568         }
1569         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1570                 cfs_umode_t mode;
1571
1572                 mode = mdd_object_type(mdd_obj);
1573                 if (S_ISREG(mode))
1574                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1575
1576         }
1577 cleanup:
1578         if (rc == 0)
1579                 rc = mdd_attr_set_changelog(env, obj, handle,
1580                                             ma->ma_attr.la_valid);
1581         mdd_trans_stop(env, mdd, rc, handle);
1582         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1583                 /*set obd attr, if needed*/
1584                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1585                                            logcookies);
1586         }
1587 #ifdef HAVE_QUOTA_SUPPORT
1588         if (quota_opc) {
1589                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1590                                       inode_pending, 0);
1591                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1592                                       block_pending, 1);
1593                 /* Trigger dqrel/dqacq for original owner and new owner.
1594                  * If failed, the next call for lquota_chkquota will
1595                  * process it. */
1596                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1597                               quota_opc);
1598         }
1599 #endif
1600         RETURN(rc);
1601 }
1602
1603 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1604                       const struct lu_buf *buf, const char *name, int fl,
1605                       struct thandle *handle)
1606 {
1607         int  rc;
1608         ENTRY;
1609
1610         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1611         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1612         mdd_write_unlock(env, obj);
1613
1614         RETURN(rc);
1615 }
1616
1617 static int mdd_xattr_sanity_check(const struct lu_env *env,
1618                                   struct mdd_object *obj)
1619 {
1620         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1621         struct md_ucred *uc     = md_ucred(env);
1622         int rc;
1623         ENTRY;
1624
1625         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1626                 RETURN(-EPERM);
1627
1628         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1629         if (rc)
1630                 RETURN(rc);
1631
1632         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1633             !mdd_capable(uc, CFS_CAP_FOWNER))
1634                 RETURN(-EPERM);
1635
1636         RETURN(rc);
1637 }
1638
1639 /**
1640  * The caller should guarantee to update the object ctime
1641  * after xattr_set if needed.
1642  */
1643 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1644                          const struct lu_buf *buf, const char *name,
1645                          int fl)
1646 {
1647         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1648         struct mdd_device *mdd = mdo2mdd(obj);
1649         struct thandle *handle;
1650         int  rc;
1651         ENTRY;
1652
1653         rc = mdd_xattr_sanity_check(env, mdd_obj);
1654         if (rc)
1655                 RETURN(rc);
1656
1657         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1658         /* security-replated changes may require sync */
1659         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1660             mdd->mdd_sync_permission == 1)
1661                 txn_param_sync(&mdd_env_info(env)->mti_param);
1662
1663         handle = mdd_trans_start(env, mdd);
1664         if (IS_ERR(handle))
1665                 RETURN(PTR_ERR(handle));
1666
1667         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1668
1669         /* Only record user xattr changes */
1670         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1671                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1672                                               handle);
1673         mdd_trans_stop(env, mdd, rc, handle);
1674
1675         RETURN(rc);
1676 }
1677
1678 /**
1679  * The caller should guarantee to update the object ctime
1680  * after xattr_set if needed.
1681  */
1682 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1683                   const char *name)
1684 {
1685         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1686         struct mdd_device *mdd = mdo2mdd(obj);
1687         struct thandle *handle;
1688         int  rc;
1689         ENTRY;
1690
1691         rc = mdd_xattr_sanity_check(env, mdd_obj);
1692         if (rc)
1693                 RETURN(rc);
1694
1695         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1696         handle = mdd_trans_start(env, mdd);
1697         if (IS_ERR(handle))
1698                 RETURN(PTR_ERR(handle));
1699
1700         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1701         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1702                            mdd_object_capa(env, mdd_obj));
1703         mdd_write_unlock(env, mdd_obj);
1704
1705         /* Only record user xattr changes */
1706         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1707                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1708                                               handle);
1709
1710         mdd_trans_stop(env, mdd, rc, handle);
1711
1712         RETURN(rc);
1713 }
1714
1715 /* partial unlink */
1716 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1717                        struct md_attr *ma)
1718 {
1719         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1720         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1721         struct mdd_device *mdd = mdo2mdd(obj);
1722         struct thandle *handle;
1723 #ifdef HAVE_QUOTA_SUPPORT
1724         struct obd_device *obd = mdd->mdd_obd_dev;
1725         struct mds_obd *mds = &obd->u.mds;
1726         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1727         int quota_opc = 0;
1728 #endif
1729         int rc;
1730         ENTRY;
1731
1732         /*
1733          * Check -ENOENT early here because we need to get object type
1734          * to calculate credits before transaction start
1735          */
1736         if (!mdd_object_exists(mdd_obj))
1737                 RETURN(-ENOENT);
1738
1739         LASSERT(mdd_object_exists(mdd_obj) > 0);
1740
1741         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1742         if (rc)
1743                 RETURN(rc);
1744
1745         handle = mdd_trans_start(env, mdd);
1746         if (IS_ERR(handle))
1747                 RETURN(-ENOMEM);
1748
1749         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1750
1751         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1752         if (rc)
1753                 GOTO(cleanup, rc);
1754
1755         __mdd_ref_del(env, mdd_obj, handle, 0);
1756
1757         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1758                 /* unlink dot */
1759                 __mdd_ref_del(env, mdd_obj, handle, 1);
1760         }
1761
1762         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1763         la_copy->la_ctime = ma->ma_attr.la_ctime;
1764
1765         la_copy->la_valid = LA_CTIME;
1766         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1767         if (rc)
1768                 GOTO(cleanup, rc);
1769
1770         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1771 #ifdef HAVE_QUOTA_SUPPORT
1772         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1773             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1774                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1775                 mdd_quota_wrapper(&ma->ma_attr, qids);
1776         }
1777 #endif
1778
1779
1780         EXIT;
1781 cleanup:
1782         mdd_write_unlock(env, mdd_obj);
1783         mdd_trans_stop(env, mdd, rc, handle);
1784 #ifdef HAVE_QUOTA_SUPPORT
1785         if (quota_opc)
1786                 /* Trigger dqrel on the owner of child. If failed,
1787                  * the next call for lquota_chkquota will process it */
1788                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1789                               quota_opc);
1790 #endif
1791         return rc;
1792 }
1793
1794 /* partial operation */
1795 static int mdd_oc_sanity_check(const struct lu_env *env,
1796                                struct mdd_object *obj,
1797                                struct md_attr *ma)
1798 {
1799         int rc;
1800         ENTRY;
1801
1802         switch (ma->ma_attr.la_mode & S_IFMT) {
1803         case S_IFREG:
1804         case S_IFDIR:
1805         case S_IFLNK:
1806         case S_IFCHR:
1807         case S_IFBLK:
1808         case S_IFIFO:
1809         case S_IFSOCK:
1810                 rc = 0;
1811                 break;
1812         default:
1813                 rc = -EINVAL;
1814                 break;
1815         }
1816         RETURN(rc);
1817 }
1818
1819 static int mdd_object_create(const struct lu_env *env,
1820                              struct md_object *obj,
1821                              const struct md_op_spec *spec,
1822                              struct md_attr *ma)
1823 {
1824
1825         struct mdd_device *mdd = mdo2mdd(obj);
1826         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1827         const struct lu_fid *pfid = spec->u.sp_pfid;
1828         struct thandle *handle;
1829 #ifdef HAVE_QUOTA_SUPPORT
1830         struct obd_device *obd = mdd->mdd_obd_dev;
1831         struct obd_export *exp = md_quota(env)->mq_exp;
1832         struct mds_obd *mds = &obd->u.mds;
1833         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1834         int quota_opc = 0, block_count = 0;
1835         int inode_pending[MAXQUOTAS] = { 0, 0 };
1836         int block_pending[MAXQUOTAS] = { 0, 0 };
1837 #endif
1838         int rc = 0;
1839         ENTRY;
1840
1841 #ifdef HAVE_QUOTA_SUPPORT
1842         if (mds->mds_quota) {
1843                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1844                 mdd_quota_wrapper(&ma->ma_attr, qids);
1845                 /* get file quota for child */
1846                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1847                                 qids, inode_pending, 1, NULL, 0,
1848                                 NULL, 0);
1849                 switch (ma->ma_attr.la_mode & S_IFMT) {
1850                 case S_IFLNK:
1851                 case S_IFDIR:
1852                         block_count = 2;
1853                         break;
1854                 case S_IFREG:
1855                         block_count = 1;
1856                         break;
1857                 }
1858                 /* get block quota for child */
1859                 if (block_count)
1860                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1861                                         qids, block_pending, block_count,
1862                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1863         }
1864 #endif
1865
1866         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1867         handle = mdd_trans_start(env, mdd);
1868         if (IS_ERR(handle))
1869                 GOTO(out_pending, rc = PTR_ERR(handle));
1870
1871         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1872         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1873         if (rc)
1874                 GOTO(unlock, rc);
1875
1876         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1877         if (rc)
1878                 GOTO(unlock, rc);
1879
1880         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1881                 /* If creating the slave object, set slave EA here. */
1882                 int lmv_size = spec->u.sp_ea.eadatalen;
1883                 struct lmv_stripe_md *lmv;
1884
1885                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1886                 LASSERT(lmv != NULL && lmv_size > 0);
1887
1888                 rc = __mdd_xattr_set(env, mdd_obj,
1889                                      mdd_buf_get_const(env, lmv, lmv_size),
1890                                      XATTR_NAME_LMV, 0, handle);
1891                 if (rc)
1892                         GOTO(unlock, rc);
1893
1894                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1895                                            handle, 0);
1896         } else {
1897 #ifdef CONFIG_FS_POSIX_ACL
1898                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1899                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1900
1901                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1902                         buf->lb_len = spec->u.sp_ea.eadatalen;
1903                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1904                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1905                                                     &ma->ma_attr.la_mode,
1906                                                     handle);
1907                                 if (rc)
1908                                         GOTO(unlock, rc);
1909                                 else
1910                                         ma->ma_attr.la_valid |= LA_MODE;
1911                         }
1912
1913                         pfid = spec->u.sp_ea.fid;
1914                 }
1915 #endif
1916                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1917                                            spec);
1918         }
1919         EXIT;
1920 unlock:
1921         if (rc == 0)
1922                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1923         mdd_write_unlock(env, mdd_obj);
1924
1925         mdd_trans_stop(env, mdd, rc, handle);
1926 out_pending:
1927 #ifdef HAVE_QUOTA_SUPPORT
1928         if (quota_opc) {
1929                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1930                                       inode_pending, 0);
1931                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1932                                       block_pending, 1);
1933                 /* Trigger dqacq on the owner of child. If failed,
1934                  * the next call for lquota_chkquota will process it. */
1935                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1936                               quota_opc);
1937         }
1938 #endif
1939         return rc;
1940 }
1941
1942 /* partial link */
1943 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1944                        const struct md_attr *ma)
1945 {
1946         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1947         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1948         struct mdd_device *mdd = mdo2mdd(obj);
1949         struct thandle *handle;
1950         int rc;
1951         ENTRY;
1952
1953         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1954         handle = mdd_trans_start(env, mdd);
1955         if (IS_ERR(handle))
1956                 RETURN(-ENOMEM);
1957
1958         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1959         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1960         if (rc == 0)
1961                 __mdd_ref_add(env, mdd_obj, handle);
1962         mdd_write_unlock(env, mdd_obj);
1963         if (rc == 0) {
1964                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1965                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1966
1967                 la_copy->la_valid = LA_CTIME;
1968                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1969                                                         handle, 0);
1970         }
1971         mdd_trans_stop(env, mdd, 0, handle);
1972
1973         RETURN(rc);
1974 }
1975
1976 /*
1977  * do NOT or the MAY_*'s, you'll get the weakest
1978  */
1979 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1980 {
1981         int res = 0;
1982
1983         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1984          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1985          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1986          * owner can write to a file even if it is marked readonly to hide
1987          * its brokenness. (bug 5781) */
1988         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1989                 struct md_ucred *uc = md_ucred(env);
1990
1991                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1992                     (la->la_uid == uc->mu_fsuid))
1993                         return 0;
1994         }
1995
1996         if (flags & FMODE_READ)
1997                 res |= MAY_READ;
1998         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1999                 res |= MAY_WRITE;
2000         if (flags & MDS_FMODE_EXEC)
2001                 res = MAY_EXEC;
2002         return res;
2003 }
2004
2005 static int mdd_open_sanity_check(const struct lu_env *env,
2006                                  struct mdd_object *obj, int flag)
2007 {
2008         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2009         int mode, rc;
2010         ENTRY;
2011
2012         /* EEXIST check */
2013         if (mdd_is_dead_obj(obj))
2014                 RETURN(-ENOENT);
2015
2016         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2017         if (rc)
2018                RETURN(rc);
2019
2020         if (S_ISLNK(tmp_la->la_mode))
2021                 RETURN(-ELOOP);
2022
2023         mode = accmode(env, tmp_la, flag);
2024
2025         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2026                 RETURN(-EISDIR);
2027
2028         if (!(flag & MDS_OPEN_CREATED)) {
2029                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2030                 if (rc)
2031                         RETURN(rc);
2032         }
2033
2034         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2035             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2036                 flag &= ~MDS_OPEN_TRUNC;
2037
2038         /* For writing append-only file must open it with append mode. */
2039         if (mdd_is_append(obj)) {
2040                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2041                         RETURN(-EPERM);
2042                 if (flag & MDS_OPEN_TRUNC)
2043                         RETURN(-EPERM);
2044         }
2045
2046 #if 0
2047         /*
2048          * Now, flag -- O_NOATIME does not be packed by client.
2049          */
2050         if (flag & O_NOATIME) {
2051                 struct md_ucred *uc = md_ucred(env);
2052
2053                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2054                     (uc->mu_valid == UCRED_NEW)) &&
2055                     (uc->mu_fsuid != tmp_la->la_uid) &&
2056                     !mdd_capable(uc, CFS_CAP_FOWNER))
2057                         RETURN(-EPERM);
2058         }
2059 #endif
2060
2061         RETURN(0);
2062 }
2063
2064 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2065                     int flags)
2066 {
2067         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2068         int rc = 0;
2069
2070         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2071
2072         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2073         if (rc == 0)
2074                 mdd_obj->mod_count++;
2075
2076         mdd_write_unlock(env, mdd_obj);
2077         return rc;
2078 }
2079
2080 /* return md_attr back,
2081  * if it is last unlink then return lov ea + llog cookie*/
2082 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2083                     struct md_attr *ma)
2084 {
2085         int rc = 0;
2086         ENTRY;
2087
2088         if (S_ISREG(mdd_object_type(obj))) {
2089                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2090                  * Caller must be ready for that. */
2091
2092                 rc = __mdd_lmm_get(env, obj, ma);
2093                 if ((ma->ma_valid & MA_LOV))
2094                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2095                                             obj, ma);
2096         }
2097         RETURN(rc);
2098 }
2099
2100 /*
2101  * No permission check is needed.
2102  */
2103 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2104                      struct md_attr *ma)
2105 {
2106         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2107         struct mdd_device *mdd = mdo2mdd(obj);
2108         struct thandle    *handle = NULL;
2109         int rc;
2110         int reset = 1;
2111
2112 #ifdef HAVE_QUOTA_SUPPORT
2113         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2114         struct mds_obd *mds = &obd->u.mds;
2115         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2116         int quota_opc = 0;
2117 #endif
2118         ENTRY;
2119
2120         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2121                 mdd_obj->mod_count--;
2122
2123                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2124                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2125                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2126                 RETURN(0);
2127         }
2128
2129         /* check without any lock */
2130         if (mdd_obj->mod_count == 1 &&
2131             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2132  again:
2133                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2134                 if (rc)
2135                         RETURN(rc);
2136                 handle = mdd_trans_start(env, mdo2mdd(obj));
2137                 if (IS_ERR(handle))
2138                         RETURN(PTR_ERR(handle));
2139         }
2140
2141         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2142         if (handle == NULL && mdd_obj->mod_count == 1 &&
2143             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2144                 mdd_write_unlock(env, mdd_obj);
2145                 goto again;
2146         }
2147
2148         /* release open count */
2149         mdd_obj->mod_count --;
2150
2151         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2152                 /* remove link to object from orphan index */
2153                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2154                 if (rc == 0) {
2155                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2156                                "list, OSS objects to be destroyed.\n",
2157                                PFID(mdd_object_fid(mdd_obj)));
2158                 } else {
2159                         CERROR("Object "DFID" can not be deleted from orphan "
2160                                 "list, maybe cause OST objects can not be "
2161                                 "destroyed (err: %d).\n",
2162                                 PFID(mdd_object_fid(mdd_obj)), rc);
2163                         /* If object was not deleted from orphan list, do not
2164                          * destroy OSS objects, which will be done when next
2165                          * recovery. */
2166                         GOTO(out, rc);
2167                 }
2168         }
2169
2170         rc = mdd_iattr_get(env, mdd_obj, ma);
2171         /* Object maybe not in orphan list originally, it is rare case for
2172          * mdd_finish_unlink() failure. */
2173         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2174 #ifdef HAVE_QUOTA_SUPPORT
2175                 if (mds->mds_quota) {
2176                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2177                         mdd_quota_wrapper(&ma->ma_attr, qids);
2178                 }
2179 #endif
2180                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2181                 if (ma->ma_valid & MA_FLAGS &&
2182                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2183                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2184                 } else {
2185                         rc = mdd_object_kill(env, mdd_obj, ma);
2186                         if (rc == 0)
2187                                 reset = 0;
2188                 }
2189
2190                 if (rc != 0)
2191                         CERROR("Error when prepare to delete Object "DFID" , "
2192                                "which will cause OST objects can not be "
2193                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2194         }
2195         EXIT;
2196
2197 out:
2198         if (reset)
2199                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2200
2201         mdd_write_unlock(env, mdd_obj);
2202         if (handle != NULL)
2203                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2204 #ifdef HAVE_QUOTA_SUPPORT
2205         if (quota_opc)
2206                 /* Trigger dqrel on the owner of child. If failed,
2207                  * the next call for lquota_chkquota will process it */
2208                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2209                               quota_opc);
2210 #endif
2211         return rc;
2212 }
2213
2214 /*
2215  * Permission check is done when open,
2216  * no need check again.
2217  */
2218 static int mdd_readpage_sanity_check(const struct lu_env *env,
2219                                      struct mdd_object *obj)
2220 {
2221         struct dt_object *next = mdd_object_child(obj);
2222         int rc;
2223         ENTRY;
2224
2225         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2226                 rc = 0;
2227         else
2228                 rc = -ENOTDIR;
2229
2230         RETURN(rc);
2231 }
2232
2233 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2234                               int first, void *area, int nob,
2235                               const struct dt_it_ops *iops, struct dt_it *it,
2236                               __u64 *start, __u64 *end,
2237                               struct lu_dirent **last, __u32 attr)
2238 {
2239         int                     result;
2240         __u64                   hash = 0;
2241         struct lu_dirent       *ent;
2242
2243         if (first) {
2244                 memset(area, 0, sizeof (struct lu_dirpage));
2245                 area += sizeof (struct lu_dirpage);
2246                 nob  -= sizeof (struct lu_dirpage);
2247         }
2248
2249         ent  = area;
2250         do {
2251                 int    len;
2252                 int    recsize;
2253
2254                 len  = iops->key_size(env, it);
2255
2256                 /* IAM iterator can return record with zero len. */
2257                 if (len == 0)
2258                         goto next;
2259
2260                 hash = iops->store(env, it);
2261                 if (unlikely(first)) {
2262                         first = 0;
2263                         *start = hash;
2264                 }
2265
2266                 /* calculate max space required for lu_dirent */
2267                 recsize = lu_dirent_calc_size(len, attr);
2268
2269                 if (nob >= recsize) {
2270                         result = iops->rec(env, it, ent, attr);
2271                         if (result == -ESTALE)
2272                                 goto next;
2273                         if (result != 0)
2274                                 goto out;
2275
2276                         /* osd might not able to pack all attributes,
2277                          * so recheck rec length */
2278                         recsize = le16_to_cpu(ent->lde_reclen);
2279                 } else {
2280                         /*
2281                          * record doesn't fit into page, enlarge previous one.
2282                          */
2283                         if (*last) {
2284                                 (*last)->lde_reclen =
2285                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2286                                                         nob);
2287                                 result = 0;
2288                         } else
2289                                 result = -EINVAL;
2290
2291                         goto out;
2292                 }
2293                 *last = ent;
2294                 ent = (void *)ent + recsize;
2295                 nob -= recsize;
2296
2297 next:
2298                 result = iops->next(env, it);
2299                 if (result == -ESTALE)
2300                         goto next;
2301         } while (result == 0);
2302
2303 out:
2304         *end = hash;
2305         return result;
2306 }
2307
2308 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2309                           const struct lu_rdpg *rdpg)
2310 {
2311         struct dt_it      *it;
2312         struct dt_object  *next = mdd_object_child(obj);
2313         const struct dt_it_ops  *iops;
2314         struct page       *pg;
2315         struct lu_dirent  *last = NULL;
2316         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2317         int i;
2318         int rc;
2319         int nob;
2320         __u64 hash_start;
2321         __u64 hash_end = 0;
2322
2323         LASSERT(rdpg->rp_pages != NULL);
2324         LASSERT(next->do_index_ops != NULL);
2325
2326         if (rdpg->rp_count <= 0)
2327                 return -EFAULT;
2328
2329         /*
2330          * iterate through directory and fill pages from @rdpg
2331          */
2332         iops = &next->do_index_ops->dio_it;
2333         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2334         if (IS_ERR(it))
2335                 return PTR_ERR(it);
2336
2337         rc = iops->load(env, it, rdpg->rp_hash);
2338
2339         if (rc == 0){
2340                 /*
2341                  * Iterator didn't find record with exactly the key requested.
2342                  *
2343                  * It is currently either
2344                  *
2345                  *     - positioned above record with key less than
2346                  *     requested---skip it.
2347                  *
2348                  *     - or not positioned at all (is in IAM_IT_SKEWED
2349                  *     state)---position it on the next item.
2350                  */
2351                 rc = iops->next(env, it);
2352         } else if (rc > 0)
2353                 rc = 0;
2354
2355         /*
2356          * At this point and across for-loop:
2357          *
2358          *  rc == 0 -> ok, proceed.
2359          *  rc >  0 -> end of directory.
2360          *  rc <  0 -> error.
2361          */
2362         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2363              i++, nob -= CFS_PAGE_SIZE) {
2364                 LASSERT(i < rdpg->rp_npages);
2365                 pg = rdpg->rp_pages[i];
2366                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2367                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2368                                         it, &hash_start, &hash_end, &last,
2369                                         rdpg->rp_attrs);
2370                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2371                         if (last)
2372                                 last->lde_reclen = 0;
2373                 }
2374                 cfs_kunmap(pg);
2375         }
2376         if (rc > 0) {
2377                 /*
2378                  * end of directory.
2379                  */
2380                 hash_end = MDS_DIR_END_OFF;
2381                 rc = 0;
2382         }
2383         if (rc == 0) {
2384                 struct lu_dirpage *dp;
2385
2386                 dp = cfs_kmap(rdpg->rp_pages[0]);
2387                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2388                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2389                 if (i == 0)
2390                         /*
2391                          * No pages were processed, mark this.
2392                          */
2393                         dp->ldp_flags |= LDF_EMPTY;
2394
2395                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2396                 cfs_kunmap(rdpg->rp_pages[0]);
2397         }
2398         iops->put(env, it);
2399         iops->fini(env, it);
2400
2401         return rc;
2402 }
2403
2404 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2405                  const struct lu_rdpg *rdpg)
2406 {
2407         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2408         int rc;
2409         ENTRY;
2410
2411         LASSERT(mdd_object_exists(mdd_obj));
2412
2413         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2414         rc = mdd_readpage_sanity_check(env, mdd_obj);
2415         if (rc)
2416                 GOTO(out_unlock, rc);
2417
2418         if (mdd_is_dead_obj(mdd_obj)) {
2419                 struct page *pg;
2420                 struct lu_dirpage *dp;
2421
2422                 /*
2423                  * According to POSIX, please do not return any entry to client:
2424                  * even dot and dotdot should not be returned.
2425                  */
2426                 CWARN("readdir from dead object: "DFID"\n",
2427                         PFID(mdd_object_fid(mdd_obj)));
2428
2429                 if (rdpg->rp_count <= 0)
2430                         GOTO(out_unlock, rc = -EFAULT);
2431                 LASSERT(rdpg->rp_pages != NULL);
2432
2433                 pg = rdpg->rp_pages[0];
2434                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2435                 memset(dp, 0 , sizeof(struct lu_dirpage));
2436                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2437                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2438                 dp->ldp_flags |= LDF_EMPTY;
2439                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2440                 cfs_kunmap(pg);
2441                 GOTO(out_unlock, rc = 0);
2442         }
2443
2444         rc = __mdd_readpage(env, mdd_obj, rdpg);
2445
2446         EXIT;
2447 out_unlock:
2448         mdd_read_unlock(env, mdd_obj);
2449         return rc;
2450 }
2451
2452 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2453 {
2454         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2455         struct dt_object *next;
2456
2457         LASSERT(mdd_object_exists(mdd_obj));
2458         next = mdd_object_child(mdd_obj);
2459         return next->do_ops->do_object_sync(env, next);
2460 }
2461
2462 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2463                                         struct md_object *obj)
2464 {
2465         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2466
2467         LASSERT(mdd_object_exists(mdd_obj));
2468         return do_version_get(env, mdd_object_child(mdd_obj));
2469 }
2470
2471 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2472                             dt_obj_version_t version)
2473 {
2474         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2475
2476         LASSERT(mdd_object_exists(mdd_obj));
2477         do_version_set(env, mdd_object_child(mdd_obj), version);
2478 }
2479
2480 const struct md_object_operations mdd_obj_ops = {
2481         .moo_permission    = mdd_permission,
2482         .moo_attr_get      = mdd_attr_get,
2483         .moo_attr_set      = mdd_attr_set,
2484         .moo_xattr_get     = mdd_xattr_get,
2485         .moo_xattr_set     = mdd_xattr_set,
2486         .moo_xattr_list    = mdd_xattr_list,
2487         .moo_xattr_del     = mdd_xattr_del,
2488         .moo_object_create = mdd_object_create,
2489         .moo_ref_add       = mdd_ref_add,
2490         .moo_ref_del       = mdd_ref_del,
2491         .moo_open          = mdd_open,
2492         .moo_close         = mdd_close,
2493         .moo_readpage      = mdd_readpage,
2494         .moo_readlink      = mdd_readlink,
2495         .moo_changelog     = mdd_changelog,
2496         .moo_capa_get      = mdd_capa_get,
2497         .moo_object_sync   = mdd_object_sync,
2498         .moo_version_get   = mdd_version_get,
2499         .moo_version_set   = mdd_version_set,
2500         .moo_path          = mdd_path,
2501         .moo_file_lock     = mdd_file_lock,
2502         .moo_file_unlock   = mdd_file_unlock,
2503 };