Whamcloud - gitweb
9aa717075ee0a7c480c6b49348ba1abb7bdf5195
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126         buf->lb_len = 0;
127 }
128
129 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
130                                        const void *area, ssize_t len)
131 {
132         struct lu_buf *buf;
133
134         buf = &mdd_env_info(env)->mti_buf;
135         buf->lb_buf = (void *)area;
136         buf->lb_len = len;
137         return buf;
138 }
139
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146                 buf->lb_buf = NULL;
147         }
148         if (buf->lb_buf == NULL) {
149                 buf->lb_len = len;
150                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
151                 if (buf->lb_buf == NULL)
152                         buf->lb_len = 0;
153         }
154         return buf;
155 }
156
157 /** Increase the size of the \a mti_big_buf.
158  * preserves old data in buffer
159  * old buffer remains unchanged on error
160  * \retval 0 or -ENOMEM
161  */
162 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 {
164         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
165         struct lu_buf buf;
166
167         LASSERT(len >= oldbuf->lb_len);
168         OBD_ALLOC_LARGE(buf.lb_buf, len);
169
170         if (buf.lb_buf == NULL)
171                 return -ENOMEM;
172
173         buf.lb_len = len;
174         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175
176         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177
178         memcpy(oldbuf, &buf, sizeof(buf));
179
180         return 0;
181 }
182
183 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
184                                        struct mdd_device *mdd)
185 {
186         struct mdd_thread_info *mti = mdd_env_info(env);
187         int                     max_cookie_size;
188
189         max_cookie_size = mdd_lov_cookiesize(env, mdd);
190         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
191                 if (mti->mti_max_cookie)
192                         OBD_FREE_LARGE(mti->mti_max_cookie,
193                                        mti->mti_max_cookie_size);
194                 mti->mti_max_cookie = NULL;
195                 mti->mti_max_cookie_size = 0;
196         }
197         if (unlikely(mti->mti_max_cookie == NULL)) {
198                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
199                 if (likely(mti->mti_max_cookie != NULL))
200                         mti->mti_max_cookie_size = max_cookie_size;
201         }
202         if (likely(mti->mti_max_cookie != NULL))
203                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
204         return mti->mti_max_cookie;
205 }
206
207 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210
211         if (unlikely(mti->mti_max_lmm_size < size)) {
212                 int rsize = size_roundup_power2(size);
213
214                 if (mti->mti_max_lmm_size > 0) {
215                         LASSERT(mti->mti_max_lmm);
216                         OBD_FREE_LARGE(mti->mti_max_lmm,
217                                        mti->mti_max_lmm_size);
218                         mti->mti_max_lmm = NULL;
219                         mti->mti_max_lmm_size = 0;
220                 }
221
222                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
223                 if (likely(mti->mti_max_lmm != NULL))
224                         mti->mti_max_lmm_size = rsize;
225         }
226         return mti->mti_max_lmm;
227 }
228
229 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
230                                    struct mdd_device *mdd)
231 {
232         int max_lmm_size;
233
234         max_lmm_size = mdd_lov_mdsize(env, mdd);
235         return mdd_max_lmm_buffer(env, max_lmm_size);
236 }
237
238 struct lu_object *mdd_object_alloc(const struct lu_env *env,
239                                    const struct lu_object_header *hdr,
240                                    struct lu_device *d)
241 {
242         struct mdd_object *mdd_obj;
243
244         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
245         if (mdd_obj != NULL) {
246                 struct lu_object *o;
247
248                 o = mdd2lu_obj(mdd_obj);
249                 lu_object_init(o, NULL, d);
250                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
251                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
252                 mdd_obj->mod_count = 0;
253                 o->lo_ops = &mdd_lu_obj_ops;
254                 return o;
255         } else {
256                 return NULL;
257         }
258 }
259
260 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
261                            const struct lu_object_conf *unused)
262 {
263         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
264         struct mdd_object *mdd_obj = lu2mdd_obj(o);
265         struct lu_object  *below;
266         struct lu_device  *under;
267         ENTRY;
268
269         mdd_obj->mod_cltime = 0;
270         under = &d->mdd_child->dd_lu_dev;
271         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
272         mdd_pdlock_init(mdd_obj);
273         if (below == NULL)
274                 RETURN(-ENOMEM);
275
276         lu_object_add(o, below);
277
278         RETURN(0);
279 }
280
281 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
282 {
283         if (lu_object_exists(o))
284                 return mdd_get_flags(env, lu2mdd_obj(o));
285         else
286                 return 0;
287 }
288
289 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
290 {
291         struct mdd_object *mdd = lu2mdd_obj(o);
292
293         lu_object_fini(o);
294         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
295 }
296
297 static int mdd_object_print(const struct lu_env *env, void *cookie,
298                             lu_printer_t p, const struct lu_object *o)
299 {
300         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
301         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
302                     "valid=%x, cltime="LPU64", flags=%lx)",
303                     mdd, mdd->mod_count, mdd->mod_valid,
304                     mdd->mod_cltime, mdd->mod_flags);
305 }
306
307 static const struct lu_object_operations mdd_lu_obj_ops = {
308         .loo_object_init    = mdd_object_init,
309         .loo_object_start   = mdd_object_start,
310         .loo_object_free    = mdd_object_free,
311         .loo_object_print   = mdd_object_print,
312 };
313
314 struct mdd_object *mdd_object_find(const struct lu_env *env,
315                                    struct mdd_device *d,
316                                    const struct lu_fid *f)
317 {
318         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
319 }
320
321 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
322                         const char *path, struct lu_fid *fid)
323 {
324         struct lu_buf *buf;
325         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
326         struct mdd_object *obj;
327         struct lu_name *lname = &mdd_env_info(env)->mti_name;
328         char *name;
329         int rc = 0;
330         ENTRY;
331
332         /* temp buffer for path element */
333         buf = mdd_buf_alloc(env, PATH_MAX);
334         if (buf->lb_buf == NULL)
335                 RETURN(-ENOMEM);
336
337         lname->ln_name = name = buf->lb_buf;
338         lname->ln_namelen = 0;
339         *f = mdd->mdd_root_fid;
340
341         while(1) {
342                 while (*path == '/')
343                         path++;
344                 if (*path == '\0')
345                         break;
346                 while (*path != '/' && *path != '\0') {
347                         *name = *path;
348                         path++;
349                         name++;
350                         lname->ln_namelen++;
351                 }
352
353                 *name = '\0';
354                 /* find obj corresponding to fid */
355                 obj = mdd_object_find(env, mdd, f);
356                 if (obj == NULL)
357                         GOTO(out, rc = -EREMOTE);
358                 if (IS_ERR(obj))
359                         GOTO(out, rc = PTR_ERR(obj));
360                 /* get child fid from parent and name */
361                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
362                 mdd_object_put(env, obj);
363                 if (rc)
364                         break;
365
366                 name = buf->lb_buf;
367                 lname->ln_namelen = 0;
368         }
369
370         if (!rc)
371                 *fid = *f;
372 out:
373         RETURN(rc);
374 }
375
376 /** The maximum depth that fid2path() will search.
377  * This is limited only because we want to store the fids for
378  * historical path lookup purposes.
379  */
380 #define MAX_PATH_DEPTH 100
381
382 /** mdd_path() lookup structure. */
383 struct path_lookup_info {
384         __u64                pli_recno;        /**< history point */
385         __u64                pli_currec;       /**< current record */
386         struct lu_fid        pli_fid;
387         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
388         struct mdd_object   *pli_mdd_obj;
389         char                *pli_path;         /**< full path */
390         int                  pli_pathlen;
391         int                  pli_linkno;       /**< which hardlink to follow */
392         int                  pli_fidcount;     /**< number of \a pli_fids */
393 };
394
395 static int mdd_path_current(const struct lu_env *env,
396                             struct path_lookup_info *pli)
397 {
398         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
399         struct mdd_object *mdd_obj;
400         struct lu_buf     *buf = NULL;
401         struct link_ea_header *leh;
402         struct link_ea_entry  *lee;
403         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
404         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
405         char *ptr;
406         int reclen;
407         int rc;
408         ENTRY;
409
410         ptr = pli->pli_path + pli->pli_pathlen - 1;
411         *ptr = 0;
412         --ptr;
413         pli->pli_fidcount = 0;
414         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
415
416         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
417                 mdd_obj = mdd_object_find(env, mdd,
418                                           &pli->pli_fids[pli->pli_fidcount]);
419                 if (mdd_obj == NULL)
420                         GOTO(out, rc = -EREMOTE);
421                 if (IS_ERR(mdd_obj))
422                         GOTO(out, rc = PTR_ERR(mdd_obj));
423                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
424                 if (rc <= 0) {
425                         mdd_object_put(env, mdd_obj);
426                         if (rc == -1)
427                                 rc = -EREMOTE;
428                         else if (rc == 0)
429                                 /* Do I need to error out here? */
430                                 rc = -ENOENT;
431                         GOTO(out, rc);
432                 }
433
434                 /* Get parent fid and object name */
435                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
436                 buf = mdd_links_get(env, mdd_obj);
437                 mdd_read_unlock(env, mdd_obj);
438                 mdd_object_put(env, mdd_obj);
439                 if (IS_ERR(buf))
440                         GOTO(out, rc = PTR_ERR(buf));
441
442                 leh = buf->lb_buf;
443                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
444                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445
446                 /* If set, use link #linkno for path lookup, otherwise use
447                    link #0.  Only do this for the final path element. */
448                 if ((pli->pli_fidcount == 0) &&
449                     (pli->pli_linkno < leh->leh_reccount)) {
450                         int count;
451                         for (count = 0; count < pli->pli_linkno; count++) {
452                                 lee = (struct link_ea_entry *)
453                                      ((char *)lee + reclen);
454                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
455                         }
456                         if (pli->pli_linkno < leh->leh_reccount - 1)
457                                 /* indicate to user there are more links */
458                                 pli->pli_linkno++;
459                 }
460
461                 /* Pack the name in the end of the buffer */
462                 ptr -= tmpname->ln_namelen;
463                 if (ptr - 1 <= pli->pli_path)
464                         GOTO(out, rc = -EOVERFLOW);
465                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466                 *(--ptr) = '/';
467
468                 /* Store the parent fid for historic lookup */
469                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
470                         GOTO(out, rc = -EOVERFLOW);
471                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472         }
473
474         /* Verify that our path hasn't changed since we started the lookup.
475            Record the current index, and verify the path resolves to the
476            same fid. If it does, then the path is correct as of this index. */
477         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
478         pli->pli_currec = mdd->mdd_cl.mc_index;
479         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
480         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
481         if (rc) {
482                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
483                 GOTO (out, rc = -EAGAIN);
484         }
485         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
486                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
487                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
488                        PFID(&pli->pli_fid));
489                 GOTO(out, rc = -EAGAIN);
490         }
491         ptr++; /* skip leading / */
492         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
493
494         EXIT;
495 out:
496         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
497                 /* if we vmalloced a large buffer drop it */
498                 mdd_buf_put(buf);
499
500         return rc;
501 }
502
503 static int mdd_path_historic(const struct lu_env *env,
504                              struct path_lookup_info *pli)
505 {
506         return 0;
507 }
508
509 /* Returns the full path to this fid, as of changelog record recno. */
510 static int mdd_path(const struct lu_env *env, struct md_object *obj,
511                     char *path, int pathlen, __u64 *recno, int *linkno)
512 {
513         struct path_lookup_info *pli;
514         int tries = 3;
515         int rc = -EAGAIN;
516         ENTRY;
517
518         if (pathlen < 3)
519                 RETURN(-EOVERFLOW);
520
521         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
522                 path[0] = '\0';
523                 RETURN(0);
524         }
525
526         OBD_ALLOC_PTR(pli);
527         if (pli == NULL)
528                 RETURN(-ENOMEM);
529
530         pli->pli_mdd_obj = md2mdd_obj(obj);
531         pli->pli_recno = *recno;
532         pli->pli_path = path;
533         pli->pli_pathlen = pathlen;
534         pli->pli_linkno = *linkno;
535
536         /* Retry multiple times in case file is being moved */
537         while (tries-- && rc == -EAGAIN)
538                 rc = mdd_path_current(env, pli);
539
540         /* For historical path lookup, the current links may not have existed
541          * at "recno" time.  We must switch over to earlier links/parents
542          * by using the changelog records.  If the earlier parent doesn't
543          * exist, we must search back through the changelog to reconstruct
544          * its parents, then check if it exists, etc.
545          * We may ignore this problem for the initial implementation and
546          * state that an "original" hardlink must still exist for us to find
547          * historic path name. */
548         if (pli->pli_recno != -1) {
549                 rc = mdd_path_historic(env, pli);
550         } else {
551                 *recno = pli->pli_currec;
552                 /* Return next link index to caller */
553                 *linkno = pli->pli_linkno;
554         }
555
556         OBD_FREE_PTR(pli);
557
558         RETURN (rc);
559 }
560
561 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
562 {
563         struct lu_attr *la = &mdd_env_info(env)->mti_la;
564         int rc;
565
566         ENTRY;
567         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
568         if (rc == 0) {
569                 mdd_flags_xlate(obj, la->la_flags);
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
623                     struct md_attr *ma)
624 {
625         struct mdd_thread_info *info = mdd_env_info(env);
626         int                     size;
627         int                     rc   = -EINVAL;
628         ENTRY;
629
630         LASSERT(info != NULL);
631         LASSERT(ma->ma_big_lmm_used == 0);
632
633         if (ma->ma_lmm_size == 0) {
634                 CERROR("No buffer to hold %s xattr of object "DFID"\n",
635                        XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
636                 RETURN(rc);
637         }
638
639         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
640                            mdd_object_capa(env, obj));
641         if (rc < 0)
642                 RETURN(rc);
643
644         /* big_lmm may need to grow */
645         size = rc;
646         mdd_max_lmm_buffer(env, size);
647         if (info->mti_max_lmm == NULL)
648                 RETURN(-ENOMEM);
649
650         LASSERT(info->mti_max_lmm_size >= size);
651         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
652                         XATTR_NAME_LOV);
653         if (rc < 0)
654                 RETURN(rc);
655
656         ma->ma_big_lmm_used = 1;
657         ma->ma_valid |= MA_LOV;
658         ma->ma_lmm = info->mti_max_lmm;
659         ma->ma_lmm_size = size;
660         LASSERT(size == rc);
661         RETURN(rc);
662 }
663
664 /* get lov EA only */
665 static int __mdd_lmm_get(const struct lu_env *env,
666                          struct mdd_object *mdd_obj, struct md_attr *ma)
667 {
668         int rc;
669         ENTRY;
670
671         if (ma->ma_valid & MA_LOV)
672                 RETURN(0);
673
674         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
675                         XATTR_NAME_LOV);
676         if (rc == -ERANGE)
677                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
678         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
679                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
680
681         if (rc > 0) {
682                 ma->ma_lmm_size = rc;
683                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
684                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
685                 rc = 0;
686         }
687         RETURN(rc);
688 }
689
690 /* get the first parent fid from link EA */
691 static int mdd_pfid_get(const struct lu_env *env,
692                         struct mdd_object *mdd_obj, struct md_attr *ma)
693 {
694         struct lu_buf *buf;
695         struct link_ea_header *leh;
696         struct link_ea_entry *lee;
697         struct lu_fid *pfid = &ma->ma_pfid;
698         ENTRY;
699
700         if (ma->ma_valid & MA_PFID)
701                 RETURN(0);
702
703         buf = mdd_links_get(env, mdd_obj);
704         if (IS_ERR(buf))
705                 RETURN(PTR_ERR(buf));
706
707         leh = buf->lb_buf;
708         lee = (struct link_ea_entry *)(leh + 1);
709         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
710         fid_be_to_cpu(pfid, pfid);
711         ma->ma_valid |= MA_PFID;
712         if (buf->lb_len > OBD_ALLOC_BIG)
713                 /* if we vmalloced a large buffer drop it */
714                 mdd_buf_put(buf);
715         RETURN(0);
716 }
717
718 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
719                        struct md_attr *ma)
720 {
721         int rc;
722         ENTRY;
723
724         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
725         rc = __mdd_lmm_get(env, mdd_obj, ma);
726         mdd_read_unlock(env, mdd_obj);
727         RETURN(rc);
728 }
729
730 /* get lmv EA only*/
731 static int __mdd_lmv_get(const struct lu_env *env,
732                          struct mdd_object *mdd_obj, struct md_attr *ma)
733 {
734         int rc;
735         ENTRY;
736
737         if (ma->ma_valid & MA_LMV)
738                 RETURN(0);
739
740         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
741                         XATTR_NAME_LMV);
742         if (rc > 0) {
743                 ma->ma_valid |= MA_LMV;
744                 rc = 0;
745         }
746         RETURN(rc);
747 }
748
749 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
750                          struct md_attr *ma)
751 {
752         struct mdd_thread_info *info = mdd_env_info(env);
753         struct lustre_mdt_attrs *lma =
754                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
755         int lma_size;
756         int rc;
757         ENTRY;
758
759         /* If all needed data are already valid, nothing to do */
760         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
761             (ma->ma_need & (MA_HSM | MA_SOM)))
762                 RETURN(0);
763
764         /* Read LMA from disk EA */
765         lma_size = sizeof(info->mti_xattr_buf);
766         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
767         if (rc <= 0)
768                 RETURN(rc);
769
770         /* Useless to check LMA incompatibility because this is already done in
771          * osd_ea_fid_get(), and this will fail long before this code is
772          * called.
773          * So, if we are here, LMA is compatible.
774          */
775
776         lustre_lma_swab(lma);
777
778         /* Swab and copy LMA */
779         if (ma->ma_need & MA_HSM) {
780                 if (lma->lma_compat & LMAC_HSM)
781                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
782                 else
783                         ma->ma_hsm.mh_flags = 0;
784                 ma->ma_valid |= MA_HSM;
785         }
786
787         /* Copy SOM */
788         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
789                 LASSERT(ma->ma_som != NULL);
790                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
791                 ma->ma_som->msd_size    = lma->lma_som_size;
792                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
793                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
794                 ma->ma_valid |= MA_SOM;
795         }
796
797         RETURN(0);
798 }
799
800 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
801                                  struct md_attr *ma)
802 {
803         int rc = 0;
804         ENTRY;
805
806         if (ma->ma_need & MA_INODE)
807                 rc = mdd_iattr_get(env, mdd_obj, ma);
808
809         if (rc == 0 && ma->ma_need & MA_LOV) {
810                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
811                     S_ISDIR(mdd_object_type(mdd_obj)))
812                         rc = __mdd_lmm_get(env, mdd_obj, ma);
813         }
814         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
815                 if (S_ISREG(mdd_object_type(mdd_obj)))
816                         rc = mdd_pfid_get(env, mdd_obj, ma);
817         }
818         if (rc == 0 && ma->ma_need & MA_LMV) {
819                 if (S_ISDIR(mdd_object_type(mdd_obj)))
820                         rc = __mdd_lmv_get(env, mdd_obj, ma);
821         }
822         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
823                 if (S_ISREG(mdd_object_type(mdd_obj)))
824                         rc = __mdd_lma_get(env, mdd_obj, ma);
825         }
826 #ifdef CONFIG_FS_POSIX_ACL
827         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
828                 if (S_ISDIR(mdd_object_type(mdd_obj)))
829                         rc = mdd_def_acl_get(env, mdd_obj, ma);
830         }
831 #endif
832         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
833                rc, ma->ma_valid, ma->ma_lmm);
834         RETURN(rc);
835 }
836
837 int mdd_attr_get_internal_locked(const struct lu_env *env,
838                                  struct mdd_object *mdd_obj, struct md_attr *ma)
839 {
840         int rc;
841         int needlock = ma->ma_need &
842                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
843
844         if (needlock)
845                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
846         rc = mdd_attr_get_internal(env, mdd_obj, ma);
847         if (needlock)
848                 mdd_read_unlock(env, mdd_obj);
849         return rc;
850 }
851
852 /*
853  * No permission check is needed.
854  */
855 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
856                  struct md_attr *ma)
857 {
858         struct mdd_object *mdd_obj = md2mdd_obj(obj);
859         int                rc;
860
861         ENTRY;
862         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
863         RETURN(rc);
864 }
865
866 /*
867  * No permission check is needed.
868  */
869 static int mdd_xattr_get(const struct lu_env *env,
870                          struct md_object *obj, struct lu_buf *buf,
871                          const char *name)
872 {
873         struct mdd_object *mdd_obj = md2mdd_obj(obj);
874         int rc;
875
876         ENTRY;
877
878         if (mdd_object_exists(mdd_obj) == 0) {
879                 CERROR("%s: object "DFID" not found: rc = -2\n",
880                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
881                 return -ENOENT;
882         }
883
884         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
885         rc = mdo_xattr_get(env, mdd_obj, buf, name,
886                            mdd_object_capa(env, mdd_obj));
887         mdd_read_unlock(env, mdd_obj);
888
889         RETURN(rc);
890 }
891
892 /*
893  * Permission check is done when open,
894  * no need check again.
895  */
896 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
897                         struct lu_buf *buf)
898 {
899         struct mdd_object *mdd_obj = md2mdd_obj(obj);
900         struct dt_object  *next;
901         loff_t             pos = 0;
902         int                rc;
903         ENTRY;
904
905         if (mdd_object_exists(mdd_obj) == 0) {
906                 CERROR("%s: object "DFID" not found: rc = -2\n",
907                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
908                 return -ENOENT;
909         }
910
911         next = mdd_object_child(mdd_obj);
912         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
913         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
914                                          mdd_object_capa(env, mdd_obj));
915         mdd_read_unlock(env, mdd_obj);
916         RETURN(rc);
917 }
918
919 /*
920  * No permission check is needed.
921  */
922 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
923                           struct lu_buf *buf)
924 {
925         struct mdd_object *mdd_obj = md2mdd_obj(obj);
926         int rc;
927
928         ENTRY;
929
930         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
931         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
932         mdd_read_unlock(env, mdd_obj);
933
934         RETURN(rc);
935 }
936
937 int mdd_declare_object_create_internal(const struct lu_env *env,
938                                        struct mdd_object *p,
939                                        struct mdd_object *c,
940                                        struct md_attr *ma,
941                                        struct thandle *handle,
942                                        const struct md_op_spec *spec)
943 {
944         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
945         const struct dt_index_features *feat = spec->sp_feat;
946         int rc;
947         ENTRY;
948
949         if (feat != &dt_directory_features && feat != NULL)
950                 dof->dof_type = DFT_INDEX;
951         else
952                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
953
954         dof->u.dof_idx.di_feat = feat;
955
956         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
957
958         RETURN(rc);
959 }
960
961 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
962                                struct mdd_object *c, struct md_attr *ma,
963                                struct thandle *handle,
964                                const struct md_op_spec *spec)
965 {
966         struct lu_attr *attr = &ma->ma_attr;
967         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
968         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
969         const struct dt_index_features *feat = spec->sp_feat;
970         int rc;
971         ENTRY;
972
973         if (!mdd_object_exists(c)) {
974                 struct dt_object *next = mdd_object_child(c);
975                 LASSERT(next);
976
977                 if (feat != &dt_directory_features && feat != NULL)
978                         dof->dof_type = DFT_INDEX;
979                 else
980                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
981
982                 dof->u.dof_idx.di_feat = feat;
983
984                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
985                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
986         } else
987                 rc = -EEXIST;
988
989         RETURN(rc);
990 }
991
992 /**
993  * Make sure the ctime is increased only.
994  */
995 static inline int mdd_attr_check(const struct lu_env *env,
996                                  struct mdd_object *obj,
997                                  struct lu_attr *attr)
998 {
999         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1000         int rc;
1001         ENTRY;
1002
1003         if (attr->la_valid & LA_CTIME) {
1004                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1005                 if (rc)
1006                         RETURN(rc);
1007
1008                 if (attr->la_ctime < tmp_la->la_ctime)
1009                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1010                 else if (attr->la_valid == LA_CTIME &&
1011                          attr->la_ctime == tmp_la->la_ctime)
1012                         attr->la_valid &= ~LA_CTIME;
1013         }
1014         RETURN(0);
1015 }
1016
1017 int mdd_attr_set_internal(const struct lu_env *env,
1018                           struct mdd_object *obj,
1019                           struct lu_attr *attr,
1020                           struct thandle *handle,
1021                           int needacl)
1022 {
1023         int rc;
1024         ENTRY;
1025
1026         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1027 #ifdef CONFIG_FS_POSIX_ACL
1028         if (!rc && (attr->la_valid & LA_MODE) && needacl)
1029                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
1030 #endif
1031         RETURN(rc);
1032 }
1033
1034 int mdd_attr_check_set_internal(const struct lu_env *env,
1035                                 struct mdd_object *obj,
1036                                 struct lu_attr *attr,
1037                                 struct thandle *handle,
1038                                 int needacl)
1039 {
1040         int rc;
1041         ENTRY;
1042
1043         rc = mdd_attr_check(env, obj, attr);
1044         if (rc)
1045                 RETURN(rc);
1046
1047         if (attr->la_valid)
1048                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1049         RETURN(rc);
1050 }
1051
1052 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1053                                         struct mdd_object *obj,
1054                                         struct lu_attr *attr,
1055                                         struct thandle *handle,
1056                                         int needacl)
1057 {
1058         int rc;
1059         ENTRY;
1060
1061         needacl = needacl && (attr->la_valid & LA_MODE);
1062         if (needacl)
1063                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1064         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1065         if (needacl)
1066                 mdd_write_unlock(env, obj);
1067         RETURN(rc);
1068 }
1069
1070 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1071                                        struct mdd_object *obj,
1072                                        struct lu_attr *attr,
1073                                        struct thandle *handle,
1074                                        int needacl)
1075 {
1076         int rc;
1077         ENTRY;
1078
1079         needacl = needacl && (attr->la_valid & LA_MODE);
1080         if (needacl)
1081                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1082         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1083         if (needacl)
1084                 mdd_write_unlock(env, obj);
1085         RETURN(rc);
1086 }
1087
1088 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1089                     const struct lu_buf *buf, const char *name,
1090                     int fl, struct thandle *handle)
1091 {
1092         struct lustre_capa *capa = mdd_object_capa(env, obj);
1093         int rc = -EINVAL;
1094         ENTRY;
1095
1096         if (buf->lb_buf && buf->lb_len > 0)
1097                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1098         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1099                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1100
1101         RETURN(rc);
1102 }
1103
1104 /*
1105  * This gives the same functionality as the code between
1106  * sys_chmod and inode_setattr
1107  * chown_common and inode_setattr
1108  * utimes and inode_setattr
1109  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1110  */
1111 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1112                         struct lu_attr *la, const struct md_attr *ma)
1113 {
1114         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1115         struct md_ucred  *uc;
1116         int               rc;
1117         ENTRY;
1118
1119         if (!la->la_valid)
1120                 RETURN(0);
1121
1122         /* Do not permit change file type */
1123         if (la->la_valid & LA_TYPE)
1124                 RETURN(-EPERM);
1125
1126         /* They should not be processed by setattr */
1127         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1128                 RETURN(-EPERM);
1129
1130         /* export destroy does not have ->le_ses, but we may want
1131          * to drop LUSTRE_SOM_FL. */
1132         if (!env->le_ses)
1133                 RETURN(0);
1134
1135         uc = md_ucred(env);
1136
1137         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1138         if (rc)
1139                 RETURN(rc);
1140
1141         if (la->la_valid == LA_CTIME) {
1142                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1143                         /* This is only for set ctime when rename's source is
1144                          * on remote MDS. */
1145                         rc = mdd_may_delete(env, NULL, obj,
1146                                             (struct md_attr *)ma, 1, 0);
1147                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1148                         la->la_valid &= ~LA_CTIME;
1149                 RETURN(rc);
1150         }
1151
1152         if (la->la_valid == LA_ATIME) {
1153                 /* This is atime only set for read atime update on close. */
1154                 if (la->la_atime >= tmp_la->la_atime &&
1155                     la->la_atime < (tmp_la->la_atime +
1156                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1157                         la->la_valid &= ~LA_ATIME;
1158                 RETURN(0);
1159         }
1160
1161         /* Check if flags change. */
1162         if (la->la_valid & LA_FLAGS) {
1163                 unsigned int oldflags = 0;
1164                 unsigned int newflags = la->la_flags &
1165                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1166
1167                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1168                     !mdd_capable(uc, CFS_CAP_FOWNER))
1169                         RETURN(-EPERM);
1170
1171                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1172                  * only be changed by the relevant capability. */
1173                 if (mdd_is_immutable(obj))
1174                         oldflags |= LUSTRE_IMMUTABLE_FL;
1175                 if (mdd_is_append(obj))
1176                         oldflags |= LUSTRE_APPEND_FL;
1177                 if ((oldflags ^ newflags) &&
1178                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1179                         RETURN(-EPERM);
1180
1181                 if (!S_ISDIR(tmp_la->la_mode))
1182                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1183         }
1184
1185         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1186             (la->la_valid & ~LA_FLAGS) &&
1187             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1188                 RETURN(-EPERM);
1189
1190         /* Check for setting the obj time. */
1191         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1192             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1193                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1194                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1195                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1196                                                             MAY_WRITE,
1197                                                             MOR_TGT_CHILD);
1198                         if (rc)
1199                                 RETURN(rc);
1200                 }
1201         }
1202
1203         if (la->la_valid & LA_KILL_SUID) {
1204                 la->la_valid &= ~LA_KILL_SUID;
1205                 if ((tmp_la->la_mode & S_ISUID) &&
1206                     !(la->la_valid & LA_MODE)) {
1207                         la->la_mode = tmp_la->la_mode;
1208                         la->la_valid |= LA_MODE;
1209                 }
1210                 la->la_mode &= ~S_ISUID;
1211         }
1212
1213         if (la->la_valid & LA_KILL_SGID) {
1214                 la->la_valid &= ~LA_KILL_SGID;
1215                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1216                                         (S_ISGID | S_IXGRP)) &&
1217                     !(la->la_valid & LA_MODE)) {
1218                         la->la_mode = tmp_la->la_mode;
1219                         la->la_valid |= LA_MODE;
1220                 }
1221                 la->la_mode &= ~S_ISGID;
1222         }
1223
1224         /* Make sure a caller can chmod. */
1225         if (la->la_valid & LA_MODE) {
1226                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1227                     (uc->mu_fsuid != tmp_la->la_uid) &&
1228                     !mdd_capable(uc, CFS_CAP_FOWNER))
1229                         RETURN(-EPERM);
1230
1231                 if (la->la_mode == (cfs_umode_t) -1)
1232                         la->la_mode = tmp_la->la_mode;
1233                 else
1234                         la->la_mode = (la->la_mode & S_IALLUGO) |
1235                                       (tmp_la->la_mode & ~S_IALLUGO);
1236
1237                 /* Also check the setgid bit! */
1238                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1239                                        la->la_gid : tmp_la->la_gid) &&
1240                     !mdd_capable(uc, CFS_CAP_FSETID))
1241                         la->la_mode &= ~S_ISGID;
1242         } else {
1243                la->la_mode = tmp_la->la_mode;
1244         }
1245
1246         /* Make sure a caller can chown. */
1247         if (la->la_valid & LA_UID) {
1248                 if (la->la_uid == (uid_t) -1)
1249                         la->la_uid = tmp_la->la_uid;
1250                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1251                     (la->la_uid != tmp_la->la_uid)) &&
1252                     !mdd_capable(uc, CFS_CAP_CHOWN))
1253                         RETURN(-EPERM);
1254
1255                 /* If the user or group of a non-directory has been
1256                  * changed by a non-root user, remove the setuid bit.
1257                  * 19981026 David C Niemi <niemi@tux.org>
1258                  *
1259                  * Changed this to apply to all users, including root,
1260                  * to avoid some races. This is the behavior we had in
1261                  * 2.0. The check for non-root was definitely wrong
1262                  * for 2.2 anyway, as it should have been using
1263                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1264                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1265                     !S_ISDIR(tmp_la->la_mode)) {
1266                         la->la_mode &= ~S_ISUID;
1267                         la->la_valid |= LA_MODE;
1268                 }
1269         }
1270
1271         /* Make sure caller can chgrp. */
1272         if (la->la_valid & LA_GID) {
1273                 if (la->la_gid == (gid_t) -1)
1274                         la->la_gid = tmp_la->la_gid;
1275                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1276                     ((la->la_gid != tmp_la->la_gid) &&
1277                     !lustre_in_group_p(uc, la->la_gid))) &&
1278                     !mdd_capable(uc, CFS_CAP_CHOWN))
1279                         RETURN(-EPERM);
1280
1281                 /* Likewise, if the user or group of a non-directory
1282                  * has been changed by a non-root user, remove the
1283                  * setgid bit UNLESS there is no group execute bit
1284                  * (this would be a file marked for mandatory
1285                  * locking).  19981026 David C Niemi <niemi@tux.org>
1286                  *
1287                  * Removed the fsuid check (see the comment above) --
1288                  * 19990830 SD. */
1289                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1290                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1291                         la->la_mode &= ~S_ISGID;
1292                         la->la_valid |= LA_MODE;
1293                 }
1294         }
1295
1296         /* For both Size-on-MDS case and truncate case,
1297          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1298          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1299          * For SOM case, it is true, the MAY_WRITE perm has been checked
1300          * when open, no need check again. For truncate case, it is false,
1301          * the MAY_WRITE perm should be checked here. */
1302         if (ma->ma_attr_flags & MDS_SOM) {
1303                 /* For the "Size-on-MDS" setattr update, merge coming
1304                  * attributes with the set in the inode. BUG 10641 */
1305                 if ((la->la_valid & LA_ATIME) &&
1306                     (la->la_atime <= tmp_la->la_atime))
1307                         la->la_valid &= ~LA_ATIME;
1308
1309                 /* OST attributes do not have a priority over MDS attributes,
1310                  * so drop times if ctime is equal. */
1311                 if ((la->la_valid & LA_CTIME) &&
1312                     (la->la_ctime <= tmp_la->la_ctime))
1313                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1314         } else {
1315                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1316                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1317                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1318                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1319                                 rc = mdd_permission_internal_locked(env, obj,
1320                                                             tmp_la, MAY_WRITE,
1321                                                             MOR_TGT_CHILD);
1322                                 if (rc)
1323                                         RETURN(rc);
1324                         }
1325                 }
1326                 if (la->la_valid & LA_CTIME) {
1327                         /* The pure setattr, it has the priority over what is
1328                          * already set, do not drop it if ctime is equal. */
1329                         if (la->la_ctime < tmp_la->la_ctime)
1330                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1331                                                   LA_CTIME);
1332                 }
1333         }
1334
1335         RETURN(0);
1336 }
1337
1338 /** Store a data change changelog record
1339  * If this fails, we must fail the whole transaction; we don't
1340  * want the change to commit without the log entry.
1341  * \param mdd_obj - mdd_object of change
1342  * \param handle - transacion handle
1343  */
1344 static int mdd_changelog_data_store(const struct lu_env     *env,
1345                                     struct mdd_device       *mdd,
1346                                     enum changelog_rec_type type,
1347                                     int                     flags,
1348                                     struct mdd_object       *mdd_obj,
1349                                     struct thandle          *handle)
1350 {
1351         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1352         struct llog_changelog_rec *rec;
1353         struct thandle *th = NULL;
1354         struct lu_buf *buf;
1355         int reclen;
1356         int rc;
1357
1358         /* Not recording */
1359         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1360                 RETURN(0);
1361         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1362                 RETURN(0);
1363
1364         LASSERT(mdd_obj != NULL);
1365         LASSERT(handle != NULL);
1366
1367         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1368             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1369                 /* Don't need multiple updates in this log */
1370                 /* Don't check under lock - no big deal if we get an extra
1371                    entry */
1372                 RETURN(0);
1373         }
1374
1375         reclen = llog_data_len(sizeof(*rec));
1376         buf = mdd_buf_alloc(env, reclen);
1377         if (buf->lb_buf == NULL)
1378                 RETURN(-ENOMEM);
1379         rec = (struct llog_changelog_rec *)buf->lb_buf;
1380
1381         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1382         rec->cr.cr_type = (__u32)type;
1383         rec->cr.cr_tfid = *tfid;
1384         rec->cr.cr_namelen = 0;
1385         mdd_obj->mod_cltime = cfs_time_current_64();
1386
1387         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1388
1389         if (th)
1390                 mdd_trans_stop(env, mdd, rc, th);
1391
1392         if (rc < 0) {
1393                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1394                        rc, type, PFID(tfid));
1395                 return -EFAULT;
1396         }
1397
1398         return 0;
1399 }
1400
1401 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1402                   int flags, struct md_object *obj)
1403 {
1404         struct thandle *handle;
1405         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1406         struct mdd_device *mdd = mdo2mdd(obj);
1407         int rc;
1408         ENTRY;
1409
1410         handle = mdd_trans_create(env, mdd);
1411         if (IS_ERR(handle))
1412                 return(PTR_ERR(handle));
1413
1414         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1415         if (rc)
1416                 GOTO(stop, rc);
1417
1418         rc = mdd_trans_start(env, mdd, handle);
1419         if (rc)
1420                 GOTO(stop, rc);
1421
1422         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1423                                       handle);
1424
1425 stop:
1426         mdd_trans_stop(env, mdd, rc, handle);
1427
1428         RETURN(rc);
1429 }
1430
1431 /**
1432  * Should be called with write lock held.
1433  *
1434  * \see mdd_lma_set_locked().
1435  */
1436 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1437                        const struct md_attr *ma, struct thandle *handle)
1438 {
1439         struct mdd_thread_info *info = mdd_env_info(env);
1440         struct lu_buf *buf;
1441         struct lustre_mdt_attrs *lma =
1442                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1443         int lmasize = sizeof(struct lustre_mdt_attrs);
1444         int rc = 0;
1445
1446         ENTRY;
1447
1448         /* Either HSM or SOM part is not valid, we need to read it before */
1449         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1450                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1451                 if (rc <= 0)
1452                         RETURN(rc);
1453
1454                 lustre_lma_swab(lma);
1455         } else {
1456                 memset(lma, 0, lmasize);
1457         }
1458
1459         /* Copy HSM data */
1460         if (ma->ma_valid & MA_HSM) {
1461                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1462                 lma->lma_compat |= LMAC_HSM;
1463         }
1464
1465         /* Copy SOM data */
1466         if (ma->ma_valid & MA_SOM) {
1467                 LASSERT(ma->ma_som != NULL);
1468                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1469                         lma->lma_compat     &= ~LMAC_SOM;
1470                 } else {
1471                         lma->lma_compat     |= LMAC_SOM;
1472                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1473                         lma->lma_som_size    = ma->ma_som->msd_size;
1474                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1475                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1476                 }
1477         }
1478
1479         /* Copy FID */
1480         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1481
1482         lustre_lma_swab(lma);
1483         buf = mdd_buf_get(env, lma, lmasize);
1484         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1485
1486         RETURN(rc);
1487 }
1488
1489 /**
1490  * Save LMA extended attributes with data from \a ma.
1491  *
1492  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1493  * not, LMA EA will be first read from disk, modified and write back.
1494  *
1495  */
1496 static int mdd_lma_set_locked(const struct lu_env *env,
1497                               struct mdd_object *mdd_obj,
1498                               const struct md_attr *ma, struct thandle *handle)
1499 {
1500         int rc;
1501
1502         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1503         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1504         mdd_write_unlock(env, mdd_obj);
1505         return rc;
1506 }
1507
1508 /* Precedence for choosing record type when multiple
1509  * attributes change: setattr > mtime > ctime > atime
1510  * (ctime changes when mtime does, plus chmod/chown.
1511  * atime and ctime are independent.) */
1512 static int mdd_attr_set_changelog(const struct lu_env *env,
1513                                   struct md_object *obj, struct thandle *handle,
1514                                   __u64 valid)
1515 {
1516         struct mdd_device *mdd = mdo2mdd(obj);
1517         int bits, type = 0;
1518
1519         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1520         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1521         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1522         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1523         bits = bits & mdd->mdd_cl.mc_mask;
1524         if (bits == 0)
1525                 return 0;
1526
1527         /* The record type is the lowest non-masked set bit */
1528         while (bits && ((bits & 1) == 0)) {
1529                 bits = bits >> 1;
1530                 type++;
1531         }
1532
1533         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1534         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1535                                         md2mdd_obj(obj), handle);
1536 }
1537
1538 static int mdd_declare_attr_set(const struct lu_env *env,
1539                                 struct mdd_device *mdd,
1540                                 struct mdd_object *obj,
1541                                 const struct md_attr *ma,
1542                                 struct lov_mds_md *lmm,
1543                                 struct thandle *handle)
1544 {
1545         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1546         int             rc, i;
1547
1548         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1549         if (rc)
1550                 return rc;
1551
1552         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1553         if (rc)
1554                 return rc;
1555
1556         if (ma->ma_valid & MA_LOV) {
1557                 buf->lb_buf = NULL;
1558                 buf->lb_len = ma->ma_lmm_size;
1559                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1560                                            0, handle);
1561                 if (rc)
1562                         return rc;
1563         }
1564
1565         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1566                 buf->lb_buf = NULL;
1567                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1568                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1569                                            0, handle);
1570                 if (rc)
1571                         return rc;
1572         }
1573
1574 #ifdef CONFIG_FS_POSIX_ACL
1575         if (ma->ma_attr.la_valid & LA_MODE) {
1576                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1577                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,XATTR_NAME_ACL_ACCESS,
1578                                    BYPASS_CAPA);
1579                 mdd_read_unlock(env, obj);
1580                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1581                         rc = 0;
1582                 else if (rc < 0)
1583                         return rc;
1584
1585                 if (rc != 0) {
1586                         buf->lb_buf = NULL;
1587                         buf->lb_len = rc;
1588                         rc = mdo_declare_xattr_set(env, obj, buf,
1589                                                    XATTR_NAME_ACL_ACCESS, 0,
1590                                                    handle);
1591                         if (rc)
1592                                 return rc;
1593                 }
1594         }
1595 #endif
1596
1597         /* basically the log is the same as in unlink case */
1598         if (lmm) {
1599                 __u16 stripe;
1600
1601                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1602                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1603                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1604                                mdd->mdd_obd_dev->obd_name,
1605                                le32_to_cpu(lmm->lmm_magic),
1606                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1607                         return -EINVAL;
1608                 }
1609
1610                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1611                 if (stripe == LOV_ALL_STRIPES) {
1612                         struct lov_desc *ldesc;
1613
1614                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1615                         LASSERT(ldesc != NULL);
1616                         stripe = ldesc->ld_tgt_count;
1617                 }
1618
1619                 for (i = 0; i < stripe; i++) {
1620                         rc = mdd_declare_llog_record(env, mdd,
1621                                         sizeof(struct llog_unlink_rec),
1622                                         handle);
1623                         if (rc)
1624                                 return rc;
1625                 }
1626         }
1627
1628         return rc;
1629 }
1630
1631 /* set attr and LOV EA at once, return updated attr */
1632 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1633                  const struct md_attr *ma)
1634 {
1635         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1636         struct mdd_device *mdd = mdo2mdd(obj);
1637         struct thandle *handle;
1638         struct lov_mds_md *lmm = NULL;
1639         struct llog_cookie *logcookies = NULL;
1640         int  rc, lmm_size = 0, cookie_size = 0;
1641         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1642 #ifdef HAVE_QUOTA_SUPPORT
1643         struct obd_device *obd = mdd->mdd_obd_dev;
1644         struct mds_obd *mds = &obd->u.mds;
1645         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1646         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1647         int quota_opc = 0, block_count = 0;
1648         int inode_pending[MAXQUOTAS] = { 0, 0 };
1649         int block_pending[MAXQUOTAS] = { 0, 0 };
1650 #endif
1651         ENTRY;
1652
1653         *la_copy = ma->ma_attr;
1654         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1655         if (rc != 0)
1656                 RETURN(rc);
1657
1658         /* setattr on "close" only change atime, or do nothing */
1659         if (ma->ma_valid == MA_INODE &&
1660             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1661                 RETURN(0);
1662
1663         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1664             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1665                 lmm_size = mdd_lov_mdsize(env, mdd);
1666                 lmm = mdd_max_lmm_get(env, mdd);
1667                 if (lmm == NULL)
1668                         RETURN(-ENOMEM);
1669
1670                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1671                                 XATTR_NAME_LOV);
1672
1673                 if (rc < 0)
1674                         RETURN(rc);
1675         }
1676
1677         handle = mdd_trans_create(env, mdd);
1678         if (IS_ERR(handle))
1679                 RETURN(PTR_ERR(handle));
1680
1681         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1682                                   lmm_size > 0 ? lmm : NULL, handle);
1683         if (rc)
1684                 GOTO(stop, rc);
1685
1686         rc = mdd_trans_start(env, mdd, handle);
1687         if (rc)
1688                 GOTO(stop, rc);
1689
1690         /* permission changes may require sync operation */
1691         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1692                 handle->th_sync |= !!mdd->mdd_sync_permission;
1693
1694         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1695                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1696                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1697
1698 #ifdef HAVE_QUOTA_SUPPORT
1699         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1700                 struct obd_export *exp = md_quota(env)->mq_exp;
1701                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1702
1703                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1704                 if (!rc) {
1705                         quota_opc = FSFILT_OP_SETATTR;
1706                         mdd_quota_wrapper(la_copy, qnids);
1707                         mdd_quota_wrapper(la_tmp, qoids);
1708                         /* get file quota for new owner */
1709                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1710                                         qnids, inode_pending, 1, NULL, 0,
1711                                         NULL, 0);
1712                         block_count = (la_tmp->la_blocks + 7) >> 3;
1713                         if (block_count) {
1714                                 void *data = NULL;
1715                                 mdd_data_get(env, mdd_obj, &data);
1716                                 /* get block quota for new owner */
1717                                 lquota_chkquota(mds_quota_interface_ref, obd,
1718                                                 exp, qnids, block_pending,
1719                                                 block_count, NULL,
1720                                                 LQUOTA_FLAGS_BLK, data, 1);
1721                         }
1722                 }
1723         }
1724 #endif
1725
1726         if (la_copy->la_valid & LA_FLAGS) {
1727                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1728                                                   handle, 1);
1729                 if (rc == 0)
1730                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1731         } else if (la_copy->la_valid) {            /* setattr */
1732                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1733                                                   handle, 1);
1734                 /* journal chown/chgrp in llog, just like unlink */
1735                 if (rc == 0 && lmm_size){
1736                         cookie_size = mdd_lov_cookiesize(env, mdd);
1737                         logcookies = mdd_max_cookie_get(env, mdd);
1738                         if (logcookies == NULL)
1739                                 GOTO(cleanup, rc = -ENOMEM);
1740
1741                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1742                                             logcookies, cookie_size) <= 0)
1743                                 logcookies = NULL;
1744                 }
1745         }
1746
1747         if (rc == 0 && ma->ma_valid & MA_LOV) {
1748                 cfs_umode_t mode;
1749
1750                 mode = mdd_object_type(mdd_obj);
1751                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1752                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1753                         if (rc)
1754                                 GOTO(cleanup, rc);
1755
1756                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1757                                             ma->ma_lmm_size, handle, 1);
1758                 }
1759
1760         }
1761         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1762                 cfs_umode_t mode;
1763
1764                 mode = mdd_object_type(mdd_obj);
1765                 if (S_ISREG(mode))
1766                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1767
1768         }
1769 cleanup:
1770         if (rc == 0)
1771                 rc = mdd_attr_set_changelog(env, obj, handle,
1772                                             ma->ma_attr.la_valid);
1773 stop:
1774         mdd_trans_stop(env, mdd, rc, handle);
1775         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1776                 /*set obd attr, if needed*/
1777                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1778                                            logcookies);
1779         }
1780 #ifdef HAVE_QUOTA_SUPPORT
1781         if (quota_opc) {
1782                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1783                                       inode_pending, 0);
1784                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1785                                       block_pending, 1);
1786                 /* Trigger dqrel/dqacq for original owner and new owner.
1787                  * If failed, the next call for lquota_chkquota will
1788                  * process it. */
1789                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1790                               quota_opc);
1791         }
1792 #endif
1793         RETURN(rc);
1794 }
1795
1796 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1797                       const struct lu_buf *buf, const char *name, int fl,
1798                       struct thandle *handle)
1799 {
1800         int  rc;
1801         ENTRY;
1802
1803         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1804         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1805         mdd_write_unlock(env, obj);
1806
1807         RETURN(rc);
1808 }
1809
1810 static int mdd_xattr_sanity_check(const struct lu_env *env,
1811                                   struct mdd_object *obj)
1812 {
1813         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1814         struct md_ucred *uc     = md_ucred(env);
1815         int rc;
1816         ENTRY;
1817
1818         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1819                 RETURN(-EPERM);
1820
1821         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1822         if (rc)
1823                 RETURN(rc);
1824
1825         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1826             !mdd_capable(uc, CFS_CAP_FOWNER))
1827                 RETURN(-EPERM);
1828
1829         RETURN(rc);
1830 }
1831
1832 static int mdd_declare_xattr_set(const struct lu_env *env,
1833                                  struct mdd_device *mdd,
1834                                  struct mdd_object *obj,
1835                                  const struct lu_buf *buf,
1836                                  const char *name,
1837                                  struct thandle *handle)
1838
1839 {
1840         int rc;
1841
1842         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1843         if (rc)
1844                 return rc;
1845
1846         /* Only record user xattr changes */
1847         if ((strncmp("user.", name, 5) == 0))
1848                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1849
1850         return rc;
1851 }
1852
1853 /**
1854  * The caller should guarantee to update the object ctime
1855  * after xattr_set if needed.
1856  */
1857 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1858                          const struct lu_buf *buf, const char *name,
1859                          int fl)
1860 {
1861         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1862         struct mdd_device *mdd = mdo2mdd(obj);
1863         struct thandle *handle;
1864         int  rc;
1865         ENTRY;
1866
1867         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1868                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1869                 RETURN(rc);
1870         }
1871
1872         rc = mdd_xattr_sanity_check(env, mdd_obj);
1873         if (rc)
1874                 RETURN(rc);
1875
1876         handle = mdd_trans_create(env, mdd);
1877         if (IS_ERR(handle))
1878                 RETURN(PTR_ERR(handle));
1879
1880         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1881         if (rc)
1882                 GOTO(stop, rc);
1883
1884         rc = mdd_trans_start(env, mdd, handle);
1885         if (rc)
1886                 GOTO(stop, rc);
1887
1888         /* security-replated changes may require sync */
1889         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1890                 handle->th_sync |= !!mdd->mdd_sync_permission;
1891
1892         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1893
1894         /* Only record system & user xattr changes */
1895         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1896                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1897                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1898                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1899                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1900                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1901                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1902                                               handle);
1903
1904 stop:
1905         mdd_trans_stop(env, mdd, rc, handle);
1906
1907         RETURN(rc);
1908 }
1909
1910 static int mdd_declare_xattr_del(const struct lu_env *env,
1911                                  struct mdd_device *mdd,
1912                                  struct mdd_object *obj,
1913                                  const char *name,
1914                                  struct thandle *handle)
1915 {
1916         int rc;
1917
1918         rc = mdo_declare_xattr_del(env, obj, name, handle);
1919         if (rc)
1920                 return rc;
1921
1922         /* Only record user xattr changes */
1923         if ((strncmp("user.", name, 5) == 0))
1924                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1925
1926         return rc;
1927 }
1928
1929 /**
1930  * The caller should guarantee to update the object ctime
1931  * after xattr_set if needed.
1932  */
1933 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1934                   const char *name)
1935 {
1936         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1937         struct mdd_device *mdd = mdo2mdd(obj);
1938         struct thandle *handle;
1939         int  rc;
1940         ENTRY;
1941
1942         rc = mdd_xattr_sanity_check(env, mdd_obj);
1943         if (rc)
1944                 RETURN(rc);
1945
1946         handle = mdd_trans_create(env, mdd);
1947         if (IS_ERR(handle))
1948                 RETURN(PTR_ERR(handle));
1949
1950         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1951         if (rc)
1952                 GOTO(stop, rc);
1953
1954         rc = mdd_trans_start(env, mdd, handle);
1955         if (rc)
1956                 GOTO(stop, rc);
1957
1958         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1959         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1960                            mdd_object_capa(env, mdd_obj));
1961         mdd_write_unlock(env, mdd_obj);
1962
1963         /* Only record system & user xattr changes */
1964         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1965                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1966                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1967                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1968                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1969                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1970                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1971                                               handle);
1972
1973 stop:
1974         mdd_trans_stop(env, mdd, rc, handle);
1975
1976         RETURN(rc);
1977 }
1978
1979 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1980                 struct mdd_object *child, struct lu_attr *attr)
1981 {
1982         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1983         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1984         struct dt_object *nc = mdd_object_child(child);
1985
1986         /* @hint will be initialized by underlying device. */
1987         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1988 }
1989
1990 /*
1991  * do NOT or the MAY_*'s, you'll get the weakest
1992  */
1993 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1994 {
1995         int res = 0;
1996
1997         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1998          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1999          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2000          * owner can write to a file even if it is marked readonly to hide
2001          * its brokenness. (bug 5781) */
2002         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2003                 struct md_ucred *uc = md_ucred(env);
2004
2005                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2006                     (la->la_uid == uc->mu_fsuid))
2007                         return 0;
2008         }
2009
2010         if (flags & FMODE_READ)
2011                 res |= MAY_READ;
2012         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2013                 res |= MAY_WRITE;
2014         if (flags & MDS_FMODE_EXEC)
2015                 res = MAY_EXEC;
2016         return res;
2017 }
2018
2019 static int mdd_open_sanity_check(const struct lu_env *env,
2020                                  struct mdd_object *obj, int flag)
2021 {
2022         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2023         int mode, rc;
2024         ENTRY;
2025
2026         /* EEXIST check */
2027         if (mdd_is_dead_obj(obj))
2028                 RETURN(-ENOENT);
2029
2030         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2031         if (rc)
2032                RETURN(rc);
2033
2034         if (S_ISLNK(tmp_la->la_mode))
2035                 RETURN(-ELOOP);
2036
2037         mode = accmode(env, tmp_la, flag);
2038
2039         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2040                 RETURN(-EISDIR);
2041
2042         if (!(flag & MDS_OPEN_CREATED)) {
2043                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2044                 if (rc)
2045                         RETURN(rc);
2046         }
2047
2048         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2049             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2050                 flag &= ~MDS_OPEN_TRUNC;
2051
2052         /* For writing append-only file must open it with append mode. */
2053         if (mdd_is_append(obj)) {
2054                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2055                         RETURN(-EPERM);
2056                 if (flag & MDS_OPEN_TRUNC)
2057                         RETURN(-EPERM);
2058         }
2059
2060 #if 0
2061         /*
2062          * Now, flag -- O_NOATIME does not be packed by client.
2063          */
2064         if (flag & O_NOATIME) {
2065                 struct md_ucred *uc = md_ucred(env);
2066
2067                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2068                     (uc->mu_valid == UCRED_NEW)) &&
2069                     (uc->mu_fsuid != tmp_la->la_uid) &&
2070                     !mdd_capable(uc, CFS_CAP_FOWNER))
2071                         RETURN(-EPERM);
2072         }
2073 #endif
2074
2075         RETURN(0);
2076 }
2077
2078 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2079                     int flags)
2080 {
2081         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2082         int rc = 0;
2083
2084         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2085
2086         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2087         if (rc == 0)
2088                 mdd_obj->mod_count++;
2089
2090         mdd_write_unlock(env, mdd_obj);
2091         return rc;
2092 }
2093
2094 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2095                             struct md_attr *ma, struct thandle *handle)
2096 {
2097         int rc;
2098
2099         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2100         if (rc)
2101                 return rc;
2102
2103         return mdo_declare_destroy(env, obj, handle);
2104 }
2105
2106 /* return md_attr back,
2107  * if it is last unlink then return lov ea + llog cookie*/
2108 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2109                     struct md_attr *ma, struct thandle *handle)
2110 {
2111         int rc = 0;
2112         ENTRY;
2113
2114         if (S_ISREG(mdd_object_type(obj))) {
2115                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2116                  * Caller must be ready for that. */
2117                 rc = __mdd_lmm_get(env, obj, ma);
2118                 if ((ma->ma_valid & MA_LOV))
2119                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2120                                             obj, ma);
2121         }
2122
2123         if (rc == 0)
2124                 rc = mdo_destroy(env, obj, handle);
2125
2126         RETURN(rc);
2127 }
2128
2129 static int mdd_declare_close(const struct lu_env *env,
2130                              struct mdd_object *obj,
2131                              struct md_attr *ma,
2132                              struct thandle *handle)
2133 {
2134         int rc;
2135
2136         rc = orph_declare_index_delete(env, obj, handle);
2137         if (rc)
2138                 return rc;
2139
2140         return mdd_declare_object_kill(env, obj, ma, handle);
2141 }
2142
2143 /*
2144  * No permission check is needed.
2145  */
2146 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2147                      struct md_attr *ma, int mode)
2148 {
2149         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2150         struct mdd_device *mdd = mdo2mdd(obj);
2151         struct thandle    *handle = NULL;
2152         int rc;
2153         int is_orphan = 0, reset = 1;
2154
2155 #ifdef HAVE_QUOTA_SUPPORT
2156         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2157         struct mds_obd *mds = &obd->u.mds;
2158         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2159         int quota_opc = 0;
2160 #endif
2161         ENTRY;
2162
2163         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2164                 mdd_obj->mod_count--;
2165
2166                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2167                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2168                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2169                 RETURN(0);
2170         }
2171
2172         /* check without any lock */
2173         if (mdd_obj->mod_count == 1 &&
2174             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2175  again:
2176                 handle = mdd_trans_create(env, mdo2mdd(obj));
2177                 if (IS_ERR(handle))
2178                         RETURN(PTR_ERR(handle));
2179
2180                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2181                 if (rc)
2182                         GOTO(stop, rc);
2183
2184                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2185                 if (rc)
2186                         GOTO(stop, rc);
2187
2188                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2189                 if (rc)
2190                         GOTO(stop, rc);
2191         }
2192
2193         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2194         if (handle == NULL && mdd_obj->mod_count == 1 &&
2195             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2196                 mdd_write_unlock(env, mdd_obj);
2197                 goto again;
2198         }
2199
2200         /* release open count */
2201         mdd_obj->mod_count --;
2202
2203         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2204                 /* remove link to object from orphan index */
2205                 LASSERT(handle != NULL);
2206                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2207                 if (rc == 0) {
2208                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2209                                "list, OSS objects to be destroyed.\n",
2210                                PFID(mdd_object_fid(mdd_obj)));
2211                         is_orphan = 1;
2212                 } else {
2213                         CERROR("Object "DFID" can not be deleted from orphan "
2214                                 "list, maybe cause OST objects can not be "
2215                                 "destroyed (err: %d).\n",
2216                                 PFID(mdd_object_fid(mdd_obj)), rc);
2217                         /* If object was not deleted from orphan list, do not
2218                          * destroy OSS objects, which will be done when next
2219                          * recovery. */
2220                         GOTO(out, rc);
2221                 }
2222         }
2223
2224         rc = mdd_iattr_get(env, mdd_obj, ma);
2225         /* Object maybe not in orphan list originally, it is rare case for
2226          * mdd_finish_unlink() failure. */
2227         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2228 #ifdef HAVE_QUOTA_SUPPORT
2229                 if (mds->mds_quota) {
2230                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2231                         mdd_quota_wrapper(&ma->ma_attr, qids);
2232                 }
2233 #endif
2234                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2235                 if (ma->ma_valid & MA_FLAGS &&
2236                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2237                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2238                 } else {
2239                         if (handle == NULL) {
2240                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2241                                 if (IS_ERR(handle))
2242                                         GOTO(out, rc = PTR_ERR(handle));
2243
2244                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2245                                                              handle);
2246                                 if (rc)
2247                                         GOTO(out, rc);
2248
2249                                 rc = mdd_declare_changelog_store(env, mdd,
2250                                                                  NULL, handle);
2251                                 if (rc)
2252                                         GOTO(stop, rc);
2253
2254                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2255                                 if (rc)
2256                                         GOTO(out, rc);
2257                         }
2258
2259                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2260                         if (rc == 0)
2261                                 reset = 0;
2262                 }
2263
2264                 if (rc != 0)
2265                         CERROR("Error when prepare to delete Object "DFID" , "
2266                                "which will cause OST objects can not be "
2267                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2268         }
2269         EXIT;
2270
2271 out:
2272         if (reset)
2273                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2274
2275         mdd_write_unlock(env, mdd_obj);
2276
2277         if (rc == 0 &&
2278             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2279             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2280                 if (handle == NULL) {
2281                         handle = mdd_trans_create(env, mdo2mdd(obj));
2282                         if (IS_ERR(handle))
2283                                 GOTO(stop, rc = IS_ERR(handle));
2284
2285                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2286                                                          handle);
2287                         if (rc)
2288                                 GOTO(stop, rc);
2289
2290                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2291                         if (rc)
2292                                 GOTO(stop, rc);
2293                 }
2294
2295                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2296                                          mdd_obj, handle);
2297         }
2298
2299 stop:
2300         if (handle != NULL)
2301                 mdd_trans_stop(env, mdd, rc, handle);
2302 #ifdef HAVE_QUOTA_SUPPORT
2303         if (quota_opc)
2304                 /* Trigger dqrel on the owner of child. If failed,
2305                  * the next call for lquota_chkquota will process it */
2306                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2307                               quota_opc);
2308 #endif
2309         return rc;
2310 }
2311
2312 /*
2313  * Permission check is done when open,
2314  * no need check again.
2315  */
2316 static int mdd_readpage_sanity_check(const struct lu_env *env,
2317                                      struct mdd_object *obj)
2318 {
2319         struct dt_object *next = mdd_object_child(obj);
2320         int rc;
2321         ENTRY;
2322
2323         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2324                 rc = 0;
2325         else
2326                 rc = -ENOTDIR;
2327
2328         RETURN(rc);
2329 }
2330
2331 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2332                               int nob, const struct dt_it_ops *iops,
2333                               struct dt_it *it, __u32 attr, void *arg)
2334 {
2335         struct lu_dirpage       *dp = &lp->lp_dir;
2336         void                    *area = dp;
2337         int                      result;
2338         __u64                    hash = 0;
2339         struct lu_dirent        *ent;
2340         struct lu_dirent        *last = NULL;
2341         int                      first = 1;
2342
2343         memset(area, 0, sizeof (*dp));
2344         area += sizeof (*dp);
2345         nob  -= sizeof (*dp);
2346
2347         ent  = area;
2348         do {
2349                 int    len;
2350                 int    recsize;
2351
2352                 len = iops->key_size(env, it);
2353
2354                 /* IAM iterator can return record with zero len. */
2355                 if (len == 0)
2356                         goto next;
2357
2358                 hash = iops->store(env, it);
2359                 if (unlikely(first)) {
2360                         first = 0;
2361                         dp->ldp_hash_start = cpu_to_le64(hash);
2362                 }
2363
2364                 /* calculate max space required for lu_dirent */
2365                 recsize = lu_dirent_calc_size(len, attr);
2366
2367                 if (nob >= recsize) {
2368                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2369                         if (result == -ESTALE)
2370                                 goto next;
2371                         if (result != 0)
2372                                 goto out;
2373
2374                         /* osd might not able to pack all attributes,
2375                          * so recheck rec length */
2376                         recsize = le16_to_cpu(ent->lde_reclen);
2377                 } else {
2378                         result = (last != NULL) ? 0 :-EINVAL;
2379                         goto out;
2380                 }
2381                 last = ent;
2382                 ent = (void *)ent + recsize;
2383                 nob -= recsize;
2384
2385 next:
2386                 result = iops->next(env, it);
2387                 if (result == -ESTALE)
2388                         goto next;
2389         } while (result == 0);
2390
2391 out:
2392         dp->ldp_hash_end = cpu_to_le64(hash);
2393         if (last != NULL) {
2394                 if (last->lde_hash == dp->ldp_hash_end)
2395                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2396                 last->lde_reclen = 0; /* end mark */
2397         }
2398         if (result > 0)
2399                 /* end of directory */
2400                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2401         if (result < 0)
2402                 CWARN("build page failed: %d!\n", result);
2403         return result;
2404 }
2405
2406 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2407                  const struct lu_rdpg *rdpg)
2408 {
2409         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2410         int rc;
2411         ENTRY;
2412
2413         if (mdd_object_exists(mdd_obj) == 0) {
2414                 CERROR("%s: object "DFID" not found: rc = -2\n",
2415                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2416                 return -ENOENT;
2417         }
2418
2419         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2420         rc = mdd_readpage_sanity_check(env, mdd_obj);
2421         if (rc)
2422                 GOTO(out_unlock, rc);
2423
2424         if (mdd_is_dead_obj(mdd_obj)) {
2425                 struct page *pg;
2426                 struct lu_dirpage *dp;
2427
2428                 /*
2429                  * According to POSIX, please do not return any entry to client:
2430                  * even dot and dotdot should not be returned.
2431                  */
2432                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2433                        PFID(mdd_object_fid(mdd_obj)));
2434
2435                 if (rdpg->rp_count <= 0)
2436                         GOTO(out_unlock, rc = -EFAULT);
2437                 LASSERT(rdpg->rp_pages != NULL);
2438
2439                 pg = rdpg->rp_pages[0];
2440                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2441                 memset(dp, 0 , sizeof(struct lu_dirpage));
2442                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2443                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2444                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2445                 cfs_kunmap(pg);
2446                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2447         }
2448
2449         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2450                            mdd_dir_page_build, NULL);
2451         if (rc >= 0) {
2452                 struct lu_dirpage       *dp;
2453
2454                 dp = cfs_kmap(rdpg->rp_pages[0]);
2455                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2456                 if (rc == 0) {
2457                         /*
2458                          * No pages were processed, mark this for first page
2459                          * and send back.
2460                          */
2461                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2462                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2463                 }
2464                 cfs_kunmap(rdpg->rp_pages[0]);
2465         }
2466
2467         GOTO(out_unlock, rc);
2468 out_unlock:
2469         mdd_read_unlock(env, mdd_obj);
2470         return rc;
2471 }
2472
2473 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2474 {
2475         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2476
2477         if (mdd_object_exists(mdd_obj) == 0) {
2478                 CERROR("%s: object "DFID" not found: rc = -2\n",
2479                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2480                 return -ENOENT;
2481         }
2482         return dt_object_sync(env, mdd_object_child(mdd_obj));
2483 }
2484
2485 const struct md_object_operations mdd_obj_ops = {
2486         .moo_permission    = mdd_permission,
2487         .moo_attr_get      = mdd_attr_get,
2488         .moo_attr_set      = mdd_attr_set,
2489         .moo_xattr_get     = mdd_xattr_get,
2490         .moo_xattr_set     = mdd_xattr_set,
2491         .moo_xattr_list    = mdd_xattr_list,
2492         .moo_xattr_del     = mdd_xattr_del,
2493         .moo_open          = mdd_open,
2494         .moo_close         = mdd_close,
2495         .moo_readpage      = mdd_readpage,
2496         .moo_readlink      = mdd_readlink,
2497         .moo_changelog     = mdd_changelog,
2498         .moo_capa_get      = mdd_capa_get,
2499         .moo_object_sync   = mdd_object_sync,
2500         .moo_path          = mdd_path,
2501 };