Whamcloud - gitweb
LU-1304 mdd: do not use ma internally
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126         buf->lb_len = 0;
127 }
128
129 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
130                                        const void *area, ssize_t len)
131 {
132         struct lu_buf *buf;
133
134         buf = &mdd_env_info(env)->mti_buf;
135         buf->lb_buf = (void *)area;
136         buf->lb_len = len;
137         return buf;
138 }
139
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146                 buf->lb_buf = NULL;
147         }
148         if (buf->lb_buf == NULL) {
149                 buf->lb_len = len;
150                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
151                 if (buf->lb_buf == NULL)
152                         buf->lb_len = 0;
153         }
154         return buf;
155 }
156
157 /** Increase the size of the \a mti_big_buf.
158  * preserves old data in buffer
159  * old buffer remains unchanged on error
160  * \retval 0 or -ENOMEM
161  */
162 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 {
164         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
165         struct lu_buf buf;
166
167         LASSERT(len >= oldbuf->lb_len);
168         OBD_ALLOC_LARGE(buf.lb_buf, len);
169
170         if (buf.lb_buf == NULL)
171                 return -ENOMEM;
172
173         buf.lb_len = len;
174         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175
176         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177
178         memcpy(oldbuf, &buf, sizeof(buf));
179
180         return 0;
181 }
182
183 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
184                                        struct mdd_device *mdd)
185 {
186         struct mdd_thread_info *mti = mdd_env_info(env);
187         int                     max_cookie_size;
188
189         max_cookie_size = mdd_lov_cookiesize(env, mdd);
190         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
191                 if (mti->mti_max_cookie)
192                         OBD_FREE_LARGE(mti->mti_max_cookie,
193                                        mti->mti_max_cookie_size);
194                 mti->mti_max_cookie = NULL;
195                 mti->mti_max_cookie_size = 0;
196         }
197         if (unlikely(mti->mti_max_cookie == NULL)) {
198                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
199                 if (likely(mti->mti_max_cookie != NULL))
200                         mti->mti_max_cookie_size = max_cookie_size;
201         }
202         if (likely(mti->mti_max_cookie != NULL))
203                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
204         return mti->mti_max_cookie;
205 }
206
207 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210
211         if (unlikely(mti->mti_max_lmm_size < size)) {
212                 int rsize = size_roundup_power2(size);
213
214                 if (mti->mti_max_lmm_size > 0) {
215                         LASSERT(mti->mti_max_lmm);
216                         OBD_FREE_LARGE(mti->mti_max_lmm,
217                                        mti->mti_max_lmm_size);
218                         mti->mti_max_lmm = NULL;
219                         mti->mti_max_lmm_size = 0;
220                 }
221
222                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
223                 if (likely(mti->mti_max_lmm != NULL))
224                         mti->mti_max_lmm_size = rsize;
225         }
226         return mti->mti_max_lmm;
227 }
228
229 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
230                                    struct mdd_device *mdd)
231 {
232         int max_lmm_size;
233
234         max_lmm_size = mdd_lov_mdsize(env, mdd);
235         return mdd_max_lmm_buffer(env, max_lmm_size);
236 }
237
238 struct lu_object *mdd_object_alloc(const struct lu_env *env,
239                                    const struct lu_object_header *hdr,
240                                    struct lu_device *d)
241 {
242         struct mdd_object *mdd_obj;
243
244         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
245         if (mdd_obj != NULL) {
246                 struct lu_object *o;
247
248                 o = mdd2lu_obj(mdd_obj);
249                 lu_object_init(o, NULL, d);
250                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
251                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
252                 mdd_obj->mod_count = 0;
253                 o->lo_ops = &mdd_lu_obj_ops;
254                 return o;
255         } else {
256                 return NULL;
257         }
258 }
259
260 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
261                            const struct lu_object_conf *unused)
262 {
263         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
264         struct mdd_object *mdd_obj = lu2mdd_obj(o);
265         struct lu_object  *below;
266         struct lu_device  *under;
267         ENTRY;
268
269         mdd_obj->mod_cltime = 0;
270         under = &d->mdd_child->dd_lu_dev;
271         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
272         mdd_pdlock_init(mdd_obj);
273         if (below == NULL)
274                 RETURN(-ENOMEM);
275
276         lu_object_add(o, below);
277
278         RETURN(0);
279 }
280
281 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
282 {
283         if (lu_object_exists(o))
284                 return mdd_get_flags(env, lu2mdd_obj(o));
285         else
286                 return 0;
287 }
288
289 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
290 {
291         struct mdd_object *mdd = lu2mdd_obj(o);
292
293         lu_object_fini(o);
294         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
295 }
296
297 static int mdd_object_print(const struct lu_env *env, void *cookie,
298                             lu_printer_t p, const struct lu_object *o)
299 {
300         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
301         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
302                     "valid=%x, cltime="LPU64", flags=%lx)",
303                     mdd, mdd->mod_count, mdd->mod_valid,
304                     mdd->mod_cltime, mdd->mod_flags);
305 }
306
307 static const struct lu_object_operations mdd_lu_obj_ops = {
308         .loo_object_init    = mdd_object_init,
309         .loo_object_start   = mdd_object_start,
310         .loo_object_free    = mdd_object_free,
311         .loo_object_print   = mdd_object_print,
312 };
313
314 struct mdd_object *mdd_object_find(const struct lu_env *env,
315                                    struct mdd_device *d,
316                                    const struct lu_fid *f)
317 {
318         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
319 }
320
321 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
322                         const char *path, struct lu_fid *fid)
323 {
324         struct lu_buf *buf;
325         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
326         struct mdd_object *obj;
327         struct lu_name *lname = &mdd_env_info(env)->mti_name;
328         char *name;
329         int rc = 0;
330         ENTRY;
331
332         /* temp buffer for path element */
333         buf = mdd_buf_alloc(env, PATH_MAX);
334         if (buf->lb_buf == NULL)
335                 RETURN(-ENOMEM);
336
337         lname->ln_name = name = buf->lb_buf;
338         lname->ln_namelen = 0;
339         *f = mdd->mdd_root_fid;
340
341         while(1) {
342                 while (*path == '/')
343                         path++;
344                 if (*path == '\0')
345                         break;
346                 while (*path != '/' && *path != '\0') {
347                         *name = *path;
348                         path++;
349                         name++;
350                         lname->ln_namelen++;
351                 }
352
353                 *name = '\0';
354                 /* find obj corresponding to fid */
355                 obj = mdd_object_find(env, mdd, f);
356                 if (obj == NULL)
357                         GOTO(out, rc = -EREMOTE);
358                 if (IS_ERR(obj))
359                         GOTO(out, rc = PTR_ERR(obj));
360                 /* get child fid from parent and name */
361                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
362                 mdd_object_put(env, obj);
363                 if (rc)
364                         break;
365
366                 name = buf->lb_buf;
367                 lname->ln_namelen = 0;
368         }
369
370         if (!rc)
371                 *fid = *f;
372 out:
373         RETURN(rc);
374 }
375
376 /** The maximum depth that fid2path() will search.
377  * This is limited only because we want to store the fids for
378  * historical path lookup purposes.
379  */
380 #define MAX_PATH_DEPTH 100
381
382 /** mdd_path() lookup structure. */
383 struct path_lookup_info {
384         __u64                pli_recno;        /**< history point */
385         __u64                pli_currec;       /**< current record */
386         struct lu_fid        pli_fid;
387         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
388         struct mdd_object   *pli_mdd_obj;
389         char                *pli_path;         /**< full path */
390         int                  pli_pathlen;
391         int                  pli_linkno;       /**< which hardlink to follow */
392         int                  pli_fidcount;     /**< number of \a pli_fids */
393 };
394
395 static int mdd_path_current(const struct lu_env *env,
396                             struct path_lookup_info *pli)
397 {
398         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
399         struct mdd_object *mdd_obj;
400         struct lu_buf     *buf = NULL;
401         struct link_ea_header *leh;
402         struct link_ea_entry  *lee;
403         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
404         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
405         char *ptr;
406         int reclen;
407         int rc;
408         ENTRY;
409
410         ptr = pli->pli_path + pli->pli_pathlen - 1;
411         *ptr = 0;
412         --ptr;
413         pli->pli_fidcount = 0;
414         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
415
416         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
417                 mdd_obj = mdd_object_find(env, mdd,
418                                           &pli->pli_fids[pli->pli_fidcount]);
419                 if (mdd_obj == NULL)
420                         GOTO(out, rc = -EREMOTE);
421                 if (IS_ERR(mdd_obj))
422                         GOTO(out, rc = PTR_ERR(mdd_obj));
423                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
424                 if (rc <= 0) {
425                         mdd_object_put(env, mdd_obj);
426                         if (rc == -1)
427                                 rc = -EREMOTE;
428                         else if (rc == 0)
429                                 /* Do I need to error out here? */
430                                 rc = -ENOENT;
431                         GOTO(out, rc);
432                 }
433
434                 /* Get parent fid and object name */
435                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
436                 buf = mdd_links_get(env, mdd_obj);
437                 mdd_read_unlock(env, mdd_obj);
438                 mdd_object_put(env, mdd_obj);
439                 if (IS_ERR(buf))
440                         GOTO(out, rc = PTR_ERR(buf));
441
442                 leh = buf->lb_buf;
443                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
444                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445
446                 /* If set, use link #linkno for path lookup, otherwise use
447                    link #0.  Only do this for the final path element. */
448                 if ((pli->pli_fidcount == 0) &&
449                     (pli->pli_linkno < leh->leh_reccount)) {
450                         int count;
451                         for (count = 0; count < pli->pli_linkno; count++) {
452                                 lee = (struct link_ea_entry *)
453                                      ((char *)lee + reclen);
454                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
455                         }
456                         if (pli->pli_linkno < leh->leh_reccount - 1)
457                                 /* indicate to user there are more links */
458                                 pli->pli_linkno++;
459                 }
460
461                 /* Pack the name in the end of the buffer */
462                 ptr -= tmpname->ln_namelen;
463                 if (ptr - 1 <= pli->pli_path)
464                         GOTO(out, rc = -EOVERFLOW);
465                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466                 *(--ptr) = '/';
467
468                 /* Store the parent fid for historic lookup */
469                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
470                         GOTO(out, rc = -EOVERFLOW);
471                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472         }
473
474         /* Verify that our path hasn't changed since we started the lookup.
475            Record the current index, and verify the path resolves to the
476            same fid. If it does, then the path is correct as of this index. */
477         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
478         pli->pli_currec = mdd->mdd_cl.mc_index;
479         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
480         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
481         if (rc) {
482                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
483                 GOTO (out, rc = -EAGAIN);
484         }
485         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
486                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
487                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
488                        PFID(&pli->pli_fid));
489                 GOTO(out, rc = -EAGAIN);
490         }
491         ptr++; /* skip leading / */
492         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
493
494         EXIT;
495 out:
496         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
497                 /* if we vmalloced a large buffer drop it */
498                 mdd_buf_put(buf);
499
500         return rc;
501 }
502
503 static int mdd_path_historic(const struct lu_env *env,
504                              struct path_lookup_info *pli)
505 {
506         return 0;
507 }
508
509 /* Returns the full path to this fid, as of changelog record recno. */
510 static int mdd_path(const struct lu_env *env, struct md_object *obj,
511                     char *path, int pathlen, __u64 *recno, int *linkno)
512 {
513         struct path_lookup_info *pli;
514         int tries = 3;
515         int rc = -EAGAIN;
516         ENTRY;
517
518         if (pathlen < 3)
519                 RETURN(-EOVERFLOW);
520
521         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
522                 path[0] = '\0';
523                 RETURN(0);
524         }
525
526         OBD_ALLOC_PTR(pli);
527         if (pli == NULL)
528                 RETURN(-ENOMEM);
529
530         pli->pli_mdd_obj = md2mdd_obj(obj);
531         pli->pli_recno = *recno;
532         pli->pli_path = path;
533         pli->pli_pathlen = pathlen;
534         pli->pli_linkno = *linkno;
535
536         /* Retry multiple times in case file is being moved */
537         while (tries-- && rc == -EAGAIN)
538                 rc = mdd_path_current(env, pli);
539
540         /* For historical path lookup, the current links may not have existed
541          * at "recno" time.  We must switch over to earlier links/parents
542          * by using the changelog records.  If the earlier parent doesn't
543          * exist, we must search back through the changelog to reconstruct
544          * its parents, then check if it exists, etc.
545          * We may ignore this problem for the initial implementation and
546          * state that an "original" hardlink must still exist for us to find
547          * historic path name. */
548         if (pli->pli_recno != -1) {
549                 rc = mdd_path_historic(env, pli);
550         } else {
551                 *recno = pli->pli_currec;
552                 /* Return next link index to caller */
553                 *linkno = pli->pli_linkno;
554         }
555
556         OBD_FREE_PTR(pli);
557
558         RETURN (rc);
559 }
560
561 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
562 {
563         struct lu_attr *la = &mdd_env_info(env)->mti_la;
564         int rc;
565
566         ENTRY;
567         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
568         if (rc == 0) {
569                 mdd_flags_xlate(obj, la->la_flags);
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
623                     struct md_attr *ma)
624 {
625         struct mdd_thread_info *info = mdd_env_info(env);
626         int                     size;
627         int                     rc   = -EINVAL;
628         ENTRY;
629
630         LASSERT(info != NULL);
631         LASSERT(ma->ma_big_lmm_used == 0);
632
633         if (ma->ma_lmm_size == 0) {
634                 CERROR("No buffer to hold %s xattr of object "DFID"\n",
635                        XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
636                 RETURN(rc);
637         }
638
639         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
640                            mdd_object_capa(env, obj));
641         if (rc < 0)
642                 RETURN(rc);
643
644         /* big_lmm may need to grow */
645         size = rc;
646         mdd_max_lmm_buffer(env, size);
647         if (info->mti_max_lmm == NULL)
648                 RETURN(-ENOMEM);
649
650         LASSERT(info->mti_max_lmm_size >= size);
651         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
652                         XATTR_NAME_LOV);
653         if (rc < 0)
654                 RETURN(rc);
655
656         ma->ma_big_lmm_used = 1;
657         ma->ma_valid |= MA_LOV;
658         ma->ma_lmm = info->mti_max_lmm;
659         ma->ma_lmm_size = size;
660         LASSERT(size == rc);
661         RETURN(rc);
662 }
663
664 /* get lov EA only */
665 static int __mdd_lmm_get(const struct lu_env *env,
666                          struct mdd_object *mdd_obj, struct md_attr *ma)
667 {
668         int rc;
669         ENTRY;
670
671         if (ma->ma_valid & MA_LOV)
672                 RETURN(0);
673
674         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
675                         XATTR_NAME_LOV);
676         if (rc == -ERANGE)
677                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
678         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
679                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
680
681         if (rc > 0) {
682                 ma->ma_lmm_size = rc;
683                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
684                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
685                 rc = 0;
686         }
687         RETURN(rc);
688 }
689
690 /* get the first parent fid from link EA */
691 static int mdd_pfid_get(const struct lu_env *env,
692                         struct mdd_object *mdd_obj, struct md_attr *ma)
693 {
694         struct lu_buf *buf;
695         struct link_ea_header *leh;
696         struct link_ea_entry *lee;
697         struct lu_fid *pfid = &ma->ma_pfid;
698         ENTRY;
699
700         if (ma->ma_valid & MA_PFID)
701                 RETURN(0);
702
703         buf = mdd_links_get(env, mdd_obj);
704         if (IS_ERR(buf))
705                 RETURN(PTR_ERR(buf));
706
707         leh = buf->lb_buf;
708         lee = (struct link_ea_entry *)(leh + 1);
709         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
710         fid_be_to_cpu(pfid, pfid);
711         ma->ma_valid |= MA_PFID;
712         if (buf->lb_len > OBD_ALLOC_BIG)
713                 /* if we vmalloced a large buffer drop it */
714                 mdd_buf_put(buf);
715         RETURN(0);
716 }
717
718 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
719                        struct md_attr *ma)
720 {
721         int rc;
722         ENTRY;
723
724         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
725         rc = __mdd_lmm_get(env, mdd_obj, ma);
726         mdd_read_unlock(env, mdd_obj);
727         RETURN(rc);
728 }
729
730 /* get lmv EA only*/
731 static int __mdd_lmv_get(const struct lu_env *env,
732                          struct mdd_object *mdd_obj, struct md_attr *ma)
733 {
734         int rc;
735         ENTRY;
736
737         if (ma->ma_valid & MA_LMV)
738                 RETURN(0);
739
740         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
741                         XATTR_NAME_LMV);
742         if (rc > 0) {
743                 ma->ma_valid |= MA_LMV;
744                 rc = 0;
745         }
746         RETURN(rc);
747 }
748
749 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
750                          struct md_attr *ma)
751 {
752         struct mdd_thread_info *info = mdd_env_info(env);
753         struct lustre_mdt_attrs *lma =
754                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
755         int lma_size;
756         int rc;
757         ENTRY;
758
759         /* If all needed data are already valid, nothing to do */
760         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
761             (ma->ma_need & (MA_HSM | MA_SOM)))
762                 RETURN(0);
763
764         /* Read LMA from disk EA */
765         lma_size = sizeof(info->mti_xattr_buf);
766         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
767         if (rc <= 0)
768                 RETURN(rc);
769
770         /* Useless to check LMA incompatibility because this is already done in
771          * osd_ea_fid_get(), and this will fail long before this code is
772          * called.
773          * So, if we are here, LMA is compatible.
774          */
775
776         lustre_lma_swab(lma);
777
778         /* Swab and copy LMA */
779         if (ma->ma_need & MA_HSM) {
780                 if (lma->lma_compat & LMAC_HSM)
781                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
782                 else
783                         ma->ma_hsm.mh_flags = 0;
784                 ma->ma_valid |= MA_HSM;
785         }
786
787         /* Copy SOM */
788         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
789                 LASSERT(ma->ma_som != NULL);
790                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
791                 ma->ma_som->msd_size    = lma->lma_som_size;
792                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
793                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
794                 ma->ma_valid |= MA_SOM;
795         }
796
797         RETURN(0);
798 }
799
800 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
801                                  struct md_attr *ma)
802 {
803         int rc = 0;
804         ENTRY;
805
806         if (ma->ma_need & MA_INODE)
807                 rc = mdd_iattr_get(env, mdd_obj, ma);
808
809         if (rc == 0 && ma->ma_need & MA_LOV) {
810                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
811                     S_ISDIR(mdd_object_type(mdd_obj)))
812                         rc = __mdd_lmm_get(env, mdd_obj, ma);
813         }
814         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
815                 if (S_ISREG(mdd_object_type(mdd_obj)))
816                         rc = mdd_pfid_get(env, mdd_obj, ma);
817         }
818         if (rc == 0 && ma->ma_need & MA_LMV) {
819                 if (S_ISDIR(mdd_object_type(mdd_obj)))
820                         rc = __mdd_lmv_get(env, mdd_obj, ma);
821         }
822         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
823                 if (S_ISREG(mdd_object_type(mdd_obj)))
824                         rc = __mdd_lma_get(env, mdd_obj, ma);
825         }
826 #ifdef CONFIG_FS_POSIX_ACL
827         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
828                 if (S_ISDIR(mdd_object_type(mdd_obj)))
829                         rc = mdd_def_acl_get(env, mdd_obj, ma);
830         }
831 #endif
832         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
833                rc, ma->ma_valid, ma->ma_lmm);
834         RETURN(rc);
835 }
836
837 int mdd_attr_get_internal_locked(const struct lu_env *env,
838                                  struct mdd_object *mdd_obj, struct md_attr *ma)
839 {
840         int rc;
841         int needlock = ma->ma_need &
842                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
843
844         if (needlock)
845                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
846         rc = mdd_attr_get_internal(env, mdd_obj, ma);
847         if (needlock)
848                 mdd_read_unlock(env, mdd_obj);
849         return rc;
850 }
851
852 /*
853  * No permission check is needed.
854  */
855 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
856                  struct md_attr *ma)
857 {
858         struct mdd_object *mdd_obj = md2mdd_obj(obj);
859         int                rc;
860
861         ENTRY;
862         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
863         RETURN(rc);
864 }
865
866 /*
867  * No permission check is needed.
868  */
869 static int mdd_xattr_get(const struct lu_env *env,
870                          struct md_object *obj, struct lu_buf *buf,
871                          const char *name)
872 {
873         struct mdd_object *mdd_obj = md2mdd_obj(obj);
874         int rc;
875
876         ENTRY;
877
878         if (mdd_object_exists(mdd_obj) == 0) {
879                 CERROR("%s: object "DFID" not found: rc = -2\n",
880                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
881                 return -ENOENT;
882         }
883
884         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
885         rc = mdo_xattr_get(env, mdd_obj, buf, name,
886                            mdd_object_capa(env, mdd_obj));
887         mdd_read_unlock(env, mdd_obj);
888
889         RETURN(rc);
890 }
891
892 /*
893  * Permission check is done when open,
894  * no need check again.
895  */
896 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
897                         struct lu_buf *buf)
898 {
899         struct mdd_object *mdd_obj = md2mdd_obj(obj);
900         struct dt_object  *next;
901         loff_t             pos = 0;
902         int                rc;
903         ENTRY;
904
905         if (mdd_object_exists(mdd_obj) == 0) {
906                 CERROR("%s: object "DFID" not found: rc = -2\n",
907                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
908                 return -ENOENT;
909         }
910
911         next = mdd_object_child(mdd_obj);
912         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
913         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
914                                          mdd_object_capa(env, mdd_obj));
915         mdd_read_unlock(env, mdd_obj);
916         RETURN(rc);
917 }
918
919 /*
920  * No permission check is needed.
921  */
922 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
923                           struct lu_buf *buf)
924 {
925         struct mdd_object *mdd_obj = md2mdd_obj(obj);
926         int rc;
927
928         ENTRY;
929
930         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
931         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
932         mdd_read_unlock(env, mdd_obj);
933
934         RETURN(rc);
935 }
936
937 int mdd_declare_object_create_internal(const struct lu_env *env,
938                                        struct mdd_object *p,
939                                        struct mdd_object *c,
940                                        struct lu_attr *attr,
941                                        struct thandle *handle,
942                                        const struct md_op_spec *spec)
943 {
944         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
945         const struct dt_index_features *feat = spec->sp_feat;
946         int rc;
947         ENTRY;
948
949         if (feat != &dt_directory_features && feat != NULL)
950                 dof->dof_type = DFT_INDEX;
951         else
952                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
953
954         dof->u.dof_idx.di_feat = feat;
955
956         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
957
958         RETURN(rc);
959 }
960
961 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
962                                struct mdd_object *c, struct lu_attr *attr,
963                                struct thandle *handle,
964                                const struct md_op_spec *spec)
965 {
966         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
967         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
968         const struct dt_index_features *feat = spec->sp_feat;
969         int rc;
970         ENTRY;
971
972         if (!mdd_object_exists(c)) {
973                 struct dt_object *next = mdd_object_child(c);
974                 LASSERT(next);
975
976                 if (feat != &dt_directory_features && feat != NULL)
977                         dof->dof_type = DFT_INDEX;
978                 else
979                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
980
981                 dof->u.dof_idx.di_feat = feat;
982
983                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
984                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
985         } else
986                 rc = -EEXIST;
987
988         RETURN(rc);
989 }
990
991 /**
992  * Make sure the ctime is increased only.
993  */
994 static inline int mdd_attr_check(const struct lu_env *env,
995                                  struct mdd_object *obj,
996                                  struct lu_attr *attr)
997 {
998         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
999         int rc;
1000         ENTRY;
1001
1002         if (attr->la_valid & LA_CTIME) {
1003                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1004                 if (rc)
1005                         RETURN(rc);
1006
1007                 if (attr->la_ctime < tmp_la->la_ctime)
1008                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1009                 else if (attr->la_valid == LA_CTIME &&
1010                          attr->la_ctime == tmp_la->la_ctime)
1011                         attr->la_valid &= ~LA_CTIME;
1012         }
1013         RETURN(0);
1014 }
1015
1016 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
1017                           struct lu_attr *attr, struct thandle *handle,
1018                           int needacl)
1019 {
1020         int rc;
1021         ENTRY;
1022
1023         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1024 #ifdef CONFIG_FS_POSIX_ACL
1025         if (!rc && (attr->la_valid & LA_MODE) && needacl)
1026                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
1027 #endif
1028         RETURN(rc);
1029 }
1030
1031 int mdd_attr_check_set_internal(const struct lu_env *env,
1032                                 struct mdd_object *obj, struct lu_attr *attr,
1033                                 struct thandle *handle, int needacl)
1034 {
1035         int rc;
1036         ENTRY;
1037
1038         rc = mdd_attr_check(env, obj, attr);
1039         if (rc)
1040                 RETURN(rc);
1041
1042         if (attr->la_valid)
1043                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1044         RETURN(rc);
1045 }
1046
1047 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1048                                        struct mdd_object *obj,
1049                                        struct lu_attr *attr,
1050                                        struct thandle *handle,
1051                                        int needacl)
1052 {
1053         int rc;
1054         ENTRY;
1055
1056         needacl = needacl && (attr->la_valid & LA_MODE);
1057         if (needacl)
1058                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1059         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1060         if (needacl)
1061                 mdd_write_unlock(env, obj);
1062         RETURN(rc);
1063 }
1064
1065 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1066                     const struct lu_buf *buf, const char *name,
1067                     int fl, struct thandle *handle)
1068 {
1069         struct lustre_capa *capa = mdd_object_capa(env, obj);
1070         int rc = -EINVAL;
1071         ENTRY;
1072
1073         if (buf->lb_buf && buf->lb_len > 0)
1074                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1075         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1076                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1077
1078         RETURN(rc);
1079 }
1080
1081 /*
1082  * This gives the same functionality as the code between
1083  * sys_chmod and inode_setattr
1084  * chown_common and inode_setattr
1085  * utimes and inode_setattr
1086  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1087  */
1088 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1089                         struct lu_attr *la, const unsigned long flags)
1090 {
1091         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1092         struct md_ucred  *uc;
1093         int               rc;
1094         ENTRY;
1095
1096         if (!la->la_valid)
1097                 RETURN(0);
1098
1099         /* Do not permit change file type */
1100         if (la->la_valid & LA_TYPE)
1101                 RETURN(-EPERM);
1102
1103         /* They should not be processed by setattr */
1104         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1105                 RETURN(-EPERM);
1106
1107         /* export destroy does not have ->le_ses, but we may want
1108          * to drop LUSTRE_SOM_FL. */
1109         if (!env->le_ses)
1110                 RETURN(0);
1111
1112         uc = md_ucred(env);
1113
1114         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1115         if (rc)
1116                 RETURN(rc);
1117
1118         if (la->la_valid == LA_CTIME) {
1119                 if (!(flags & MDS_PERM_BYPASS))
1120                         /* This is only for set ctime when rename's source is
1121                          * on remote MDS. */
1122                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
1123                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1124                         la->la_valid &= ~LA_CTIME;
1125                 RETURN(rc);
1126         }
1127
1128         if (la->la_valid == LA_ATIME) {
1129                 /* This is atime only set for read atime update on close. */
1130                 if (la->la_atime >= tmp_la->la_atime &&
1131                     la->la_atime < (tmp_la->la_atime +
1132                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1133                         la->la_valid &= ~LA_ATIME;
1134                 RETURN(0);
1135         }
1136
1137         /* Check if flags change. */
1138         if (la->la_valid & LA_FLAGS) {
1139                 unsigned int oldflags = 0;
1140                 unsigned int newflags = la->la_flags &
1141                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1142
1143                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1144                     !mdd_capable(uc, CFS_CAP_FOWNER))
1145                         RETURN(-EPERM);
1146
1147                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1148                  * only be changed by the relevant capability. */
1149                 if (mdd_is_immutable(obj))
1150                         oldflags |= LUSTRE_IMMUTABLE_FL;
1151                 if (mdd_is_append(obj))
1152                         oldflags |= LUSTRE_APPEND_FL;
1153                 if ((oldflags ^ newflags) &&
1154                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1155                         RETURN(-EPERM);
1156
1157                 if (!S_ISDIR(tmp_la->la_mode))
1158                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1159         }
1160
1161         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1162             (la->la_valid & ~LA_FLAGS) &&
1163             !(flags & MDS_PERM_BYPASS))
1164                 RETURN(-EPERM);
1165
1166         /* Check for setting the obj time. */
1167         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1168             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1169                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1170                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1171                         rc = mdd_permission_internal(env, obj, tmp_la,
1172                                                      MAY_WRITE);
1173                         if (rc)
1174                                 RETURN(rc);
1175                 }
1176         }
1177
1178         if (la->la_valid & LA_KILL_SUID) {
1179                 la->la_valid &= ~LA_KILL_SUID;
1180                 if ((tmp_la->la_mode & S_ISUID) &&
1181                     !(la->la_valid & LA_MODE)) {
1182                         la->la_mode = tmp_la->la_mode;
1183                         la->la_valid |= LA_MODE;
1184                 }
1185                 la->la_mode &= ~S_ISUID;
1186         }
1187
1188         if (la->la_valid & LA_KILL_SGID) {
1189                 la->la_valid &= ~LA_KILL_SGID;
1190                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1191                                         (S_ISGID | S_IXGRP)) &&
1192                     !(la->la_valid & LA_MODE)) {
1193                         la->la_mode = tmp_la->la_mode;
1194                         la->la_valid |= LA_MODE;
1195                 }
1196                 la->la_mode &= ~S_ISGID;
1197         }
1198
1199         /* Make sure a caller can chmod. */
1200         if (la->la_valid & LA_MODE) {
1201                 if (!(flags & MDS_PERM_BYPASS) &&
1202                     (uc->mu_fsuid != tmp_la->la_uid) &&
1203                     !mdd_capable(uc, CFS_CAP_FOWNER))
1204                         RETURN(-EPERM);
1205
1206                 if (la->la_mode == (cfs_umode_t) -1)
1207                         la->la_mode = tmp_la->la_mode;
1208                 else
1209                         la->la_mode = (la->la_mode & S_IALLUGO) |
1210                                       (tmp_la->la_mode & ~S_IALLUGO);
1211
1212                 /* Also check the setgid bit! */
1213                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1214                                        la->la_gid : tmp_la->la_gid) &&
1215                     !mdd_capable(uc, CFS_CAP_FSETID))
1216                         la->la_mode &= ~S_ISGID;
1217         } else {
1218                la->la_mode = tmp_la->la_mode;
1219         }
1220
1221         /* Make sure a caller can chown. */
1222         if (la->la_valid & LA_UID) {
1223                 if (la->la_uid == (uid_t) -1)
1224                         la->la_uid = tmp_la->la_uid;
1225                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1226                     (la->la_uid != tmp_la->la_uid)) &&
1227                     !mdd_capable(uc, CFS_CAP_CHOWN))
1228                         RETURN(-EPERM);
1229
1230                 /* If the user or group of a non-directory has been
1231                  * changed by a non-root user, remove the setuid bit.
1232                  * 19981026 David C Niemi <niemi@tux.org>
1233                  *
1234                  * Changed this to apply to all users, including root,
1235                  * to avoid some races. This is the behavior we had in
1236                  * 2.0. The check for non-root was definitely wrong
1237                  * for 2.2 anyway, as it should have been using
1238                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1239                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1240                     !S_ISDIR(tmp_la->la_mode)) {
1241                         la->la_mode &= ~S_ISUID;
1242                         la->la_valid |= LA_MODE;
1243                 }
1244         }
1245
1246         /* Make sure caller can chgrp. */
1247         if (la->la_valid & LA_GID) {
1248                 if (la->la_gid == (gid_t) -1)
1249                         la->la_gid = tmp_la->la_gid;
1250                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1251                     ((la->la_gid != tmp_la->la_gid) &&
1252                     !lustre_in_group_p(uc, la->la_gid))) &&
1253                     !mdd_capable(uc, CFS_CAP_CHOWN))
1254                         RETURN(-EPERM);
1255
1256                 /* Likewise, if the user or group of a non-directory
1257                  * has been changed by a non-root user, remove the
1258                  * setgid bit UNLESS there is no group execute bit
1259                  * (this would be a file marked for mandatory
1260                  * locking).  19981026 David C Niemi <niemi@tux.org>
1261                  *
1262                  * Removed the fsuid check (see the comment above) --
1263                  * 19990830 SD. */
1264                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1265                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1266                         la->la_mode &= ~S_ISGID;
1267                         la->la_valid |= LA_MODE;
1268                 }
1269         }
1270
1271         /* For both Size-on-MDS case and truncate case,
1272          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1273          * We distinguish them by "flags & MDS_SOM".
1274          * For SOM case, it is true, the MAY_WRITE perm has been checked
1275          * when open, no need check again. For truncate case, it is false,
1276          * the MAY_WRITE perm should be checked here. */
1277         if (flags & MDS_SOM) {
1278                 /* For the "Size-on-MDS" setattr update, merge coming
1279                  * attributes with the set in the inode. BUG 10641 */
1280                 if ((la->la_valid & LA_ATIME) &&
1281                     (la->la_atime <= tmp_la->la_atime))
1282                         la->la_valid &= ~LA_ATIME;
1283
1284                 /* OST attributes do not have a priority over MDS attributes,
1285                  * so drop times if ctime is equal. */
1286                 if ((la->la_valid & LA_CTIME) &&
1287                     (la->la_ctime <= tmp_la->la_ctime))
1288                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1289         } else {
1290                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1291                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
1292                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1293                             !(flags & MDS_PERM_BYPASS)) {
1294                                 rc = mdd_permission_internal(env, obj,
1295                                                              tmp_la, MAY_WRITE);
1296                                 if (rc)
1297                                         RETURN(rc);
1298                         }
1299                 }
1300                 if (la->la_valid & LA_CTIME) {
1301                         /* The pure setattr, it has the priority over what is
1302                          * already set, do not drop it if ctime is equal. */
1303                         if (la->la_ctime < tmp_la->la_ctime)
1304                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1305                                                   LA_CTIME);
1306                 }
1307         }
1308
1309         RETURN(0);
1310 }
1311
1312 /** Store a data change changelog record
1313  * If this fails, we must fail the whole transaction; we don't
1314  * want the change to commit without the log entry.
1315  * \param mdd_obj - mdd_object of change
1316  * \param handle - transacion handle
1317  */
1318 static int mdd_changelog_data_store(const struct lu_env     *env,
1319                                     struct mdd_device       *mdd,
1320                                     enum changelog_rec_type type,
1321                                     int                     flags,
1322                                     struct mdd_object       *mdd_obj,
1323                                     struct thandle          *handle)
1324 {
1325         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1326         struct llog_changelog_rec *rec;
1327         struct thandle *th = NULL;
1328         struct lu_buf *buf;
1329         int reclen;
1330         int rc;
1331
1332         /* Not recording */
1333         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1334                 RETURN(0);
1335         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1336                 RETURN(0);
1337
1338         LASSERT(mdd_obj != NULL);
1339         LASSERT(handle != NULL);
1340
1341         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1342             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1343                 /* Don't need multiple updates in this log */
1344                 /* Don't check under lock - no big deal if we get an extra
1345                    entry */
1346                 RETURN(0);
1347         }
1348
1349         reclen = llog_data_len(sizeof(*rec));
1350         buf = mdd_buf_alloc(env, reclen);
1351         if (buf->lb_buf == NULL)
1352                 RETURN(-ENOMEM);
1353         rec = (struct llog_changelog_rec *)buf->lb_buf;
1354
1355         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1356         rec->cr.cr_type = (__u32)type;
1357         rec->cr.cr_tfid = *tfid;
1358         rec->cr.cr_namelen = 0;
1359         mdd_obj->mod_cltime = cfs_time_current_64();
1360
1361         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1362
1363         if (th)
1364                 mdd_trans_stop(env, mdd, rc, th);
1365
1366         if (rc < 0) {
1367                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1368                        rc, type, PFID(tfid));
1369                 return -EFAULT;
1370         }
1371
1372         return 0;
1373 }
1374
1375 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1376                   int flags, struct md_object *obj)
1377 {
1378         struct thandle *handle;
1379         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1380         struct mdd_device *mdd = mdo2mdd(obj);
1381         int rc;
1382         ENTRY;
1383
1384         handle = mdd_trans_create(env, mdd);
1385         if (IS_ERR(handle))
1386                 return(PTR_ERR(handle));
1387
1388         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1389         if (rc)
1390                 GOTO(stop, rc);
1391
1392         rc = mdd_trans_start(env, mdd, handle);
1393         if (rc)
1394                 GOTO(stop, rc);
1395
1396         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1397                                       handle);
1398
1399 stop:
1400         mdd_trans_stop(env, mdd, rc, handle);
1401
1402         RETURN(rc);
1403 }
1404
1405 /**
1406  * Should be called with write lock held.
1407  *
1408  * \see mdd_lma_set_locked().
1409  */
1410 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1411                        const struct md_attr *ma, struct thandle *handle)
1412 {
1413         struct mdd_thread_info *info = mdd_env_info(env);
1414         struct lu_buf *buf;
1415         struct lustre_mdt_attrs *lma =
1416                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1417         int lmasize = sizeof(struct lustre_mdt_attrs);
1418         int rc = 0;
1419
1420         ENTRY;
1421
1422         /* Either HSM or SOM part is not valid, we need to read it before */
1423         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1424                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1425                 if (rc <= 0)
1426                         RETURN(rc);
1427
1428                 lustre_lma_swab(lma);
1429         } else {
1430                 memset(lma, 0, lmasize);
1431         }
1432
1433         /* Copy HSM data */
1434         if (ma->ma_valid & MA_HSM) {
1435                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1436                 lma->lma_compat |= LMAC_HSM;
1437         }
1438
1439         /* Copy SOM data */
1440         if (ma->ma_valid & MA_SOM) {
1441                 LASSERT(ma->ma_som != NULL);
1442                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1443                         lma->lma_compat     &= ~LMAC_SOM;
1444                 } else {
1445                         lma->lma_compat     |= LMAC_SOM;
1446                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1447                         lma->lma_som_size    = ma->ma_som->msd_size;
1448                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1449                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1450                 }
1451         }
1452
1453         /* Copy FID */
1454         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1455
1456         lustre_lma_swab(lma);
1457         buf = mdd_buf_get(env, lma, lmasize);
1458         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1459
1460         RETURN(rc);
1461 }
1462
1463 /**
1464  * Save LMA extended attributes with data from \a ma.
1465  *
1466  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1467  * not, LMA EA will be first read from disk, modified and write back.
1468  *
1469  */
1470 static int mdd_lma_set_locked(const struct lu_env *env,
1471                               struct mdd_object *mdd_obj,
1472                               const struct md_attr *ma, struct thandle *handle)
1473 {
1474         int rc;
1475
1476         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1477         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1478         mdd_write_unlock(env, mdd_obj);
1479         return rc;
1480 }
1481
1482 /* Precedence for choosing record type when multiple
1483  * attributes change: setattr > mtime > ctime > atime
1484  * (ctime changes when mtime does, plus chmod/chown.
1485  * atime and ctime are independent.) */
1486 static int mdd_attr_set_changelog(const struct lu_env *env,
1487                                   struct md_object *obj, struct thandle *handle,
1488                                   __u64 valid)
1489 {
1490         struct mdd_device *mdd = mdo2mdd(obj);
1491         int bits, type = 0;
1492
1493         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1494         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1495         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1496         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1497         bits = bits & mdd->mdd_cl.mc_mask;
1498         if (bits == 0)
1499                 return 0;
1500
1501         /* The record type is the lowest non-masked set bit */
1502         while (bits && ((bits & 1) == 0)) {
1503                 bits = bits >> 1;
1504                 type++;
1505         }
1506
1507         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1508         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1509                                         md2mdd_obj(obj), handle);
1510 }
1511
1512 static int mdd_declare_attr_set(const struct lu_env *env,
1513                                 struct mdd_device *mdd,
1514                                 struct mdd_object *obj,
1515                                 const struct md_attr *ma,
1516                                 struct lov_mds_md *lmm,
1517                                 struct thandle *handle)
1518 {
1519         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1520         struct lu_attr *attr = (struct lu_attr *) &ma->ma_attr;
1521         int             rc, i;
1522
1523         rc = mdo_declare_attr_set(env, obj, attr, handle);
1524         if (rc)
1525                 return rc;
1526
1527         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1528         if (rc)
1529                 return rc;
1530
1531         if (ma->ma_valid & MA_LOV) {
1532                 buf->lb_buf = NULL;
1533                 buf->lb_len = ma->ma_lmm_size;
1534                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1535                                            0, handle);
1536                 if (rc)
1537                         return rc;
1538         }
1539
1540         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1541                 buf->lb_buf = NULL;
1542                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1543                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1544                                            0, handle);
1545                 if (rc)
1546                         return rc;
1547         }
1548
1549 #ifdef CONFIG_FS_POSIX_ACL
1550         if (attr->la_valid & LA_MODE) {
1551                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1552                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1553                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1554                 mdd_read_unlock(env, obj);
1555                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1556                         rc = 0;
1557                 else if (rc < 0)
1558                         return rc;
1559
1560                 if (rc != 0) {
1561                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1562                         rc = mdo_declare_xattr_set(env, obj, buf,
1563                                                    XATTR_NAME_ACL_ACCESS, 0,
1564                                                    handle);
1565                         if (rc)
1566                                 return rc;
1567                 }
1568         }
1569 #endif
1570
1571         /* basically the log is the same as in unlink case */
1572         if (lmm) {
1573                 __u16 stripe;
1574
1575                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1576                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1577                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1578                                mdd->mdd_obd_dev->obd_name,
1579                                le32_to_cpu(lmm->lmm_magic),
1580                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1581                         return -EINVAL;
1582                 }
1583
1584                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1585                 if (stripe == LOV_ALL_STRIPES) {
1586                         struct lov_desc *ldesc;
1587
1588                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1589                         LASSERT(ldesc != NULL);
1590                         stripe = ldesc->ld_tgt_count;
1591                 }
1592
1593                 for (i = 0; i < stripe; i++) {
1594                         rc = mdd_declare_llog_record(env, mdd,
1595                                         sizeof(struct llog_unlink_rec),
1596                                         handle);
1597                         if (rc)
1598                                 return rc;
1599                 }
1600         }
1601
1602         return rc;
1603 }
1604
1605 /* set attr and LOV EA at once, return updated attr */
1606 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1607                  const struct md_attr *ma)
1608 {
1609         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1610         struct mdd_device *mdd = mdo2mdd(obj);
1611         struct thandle *handle;
1612         struct lov_mds_md *lmm = NULL;
1613         struct llog_cookie *logcookies = NULL;
1614         int  rc, lmm_size = 0, cookie_size = 0;
1615         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1616         const struct lu_attr *la = &ma->ma_attr;
1617 #ifdef HAVE_QUOTA_SUPPORT
1618         struct obd_device *obd = mdd->mdd_obd_dev;
1619         struct mds_obd *mds = &obd->u.mds;
1620         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1621         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1622         int quota_opc = 0, block_count = 0;
1623         int inode_pending[MAXQUOTAS] = { 0, 0 };
1624         int block_pending[MAXQUOTAS] = { 0, 0 };
1625 #endif
1626         ENTRY;
1627
1628         *la_copy = ma->ma_attr;
1629         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1630         if (rc)
1631                 RETURN(rc);
1632
1633         /* setattr on "close" only change atime, or do nothing */
1634         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1635                 RETURN(0);
1636
1637         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1638             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1639                 lmm_size = mdd_lov_mdsize(env, mdd);
1640                 lmm = mdd_max_lmm_get(env, mdd);
1641                 if (lmm == NULL)
1642                         RETURN(-ENOMEM);
1643
1644                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1645                                 XATTR_NAME_LOV);
1646
1647                 if (rc < 0)
1648                         RETURN(rc);
1649         }
1650
1651         handle = mdd_trans_create(env, mdd);
1652         if (IS_ERR(handle))
1653                 RETURN(PTR_ERR(handle));
1654
1655         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1656                                   lmm_size > 0 ? lmm : NULL, handle);
1657         if (rc)
1658                 GOTO(stop, rc);
1659
1660         rc = mdd_trans_start(env, mdd, handle);
1661         if (rc)
1662                 GOTO(stop, rc);
1663
1664         /* permission changes may require sync operation */
1665         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1666                 handle->th_sync |= !!mdd->mdd_sync_permission;
1667
1668         if (la->la_valid & (LA_MTIME | LA_CTIME))
1669                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1670                        la->la_mtime, la->la_ctime);
1671
1672 #ifdef HAVE_QUOTA_SUPPORT
1673         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1674                 struct obd_export *exp = md_quota(env)->mq_exp;
1675                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1676
1677                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1678                 if (!rc) {
1679                         quota_opc = FSFILT_OP_SETATTR;
1680                         mdd_quota_wrapper(la_copy, qnids);
1681                         mdd_quota_wrapper(la_tmp, qoids);
1682                         /* get file quota for new owner */
1683                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1684                                         qnids, inode_pending, 1, NULL, 0,
1685                                         NULL, 0);
1686                         block_count = (la_tmp->la_blocks + 7) >> 3;
1687                         if (block_count) {
1688                                 void *data = NULL;
1689                                 mdd_data_get(env, mdd_obj, &data);
1690                                 /* get block quota for new owner */
1691                                 lquota_chkquota(mds_quota_interface_ref, obd,
1692                                                 exp, qnids, block_pending,
1693                                                 block_count, NULL,
1694                                                 LQUOTA_FLAGS_BLK, data, 1);
1695                         }
1696                 }
1697         }
1698 #endif
1699
1700         if (la_copy->la_valid & LA_FLAGS) {
1701                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1702                 if (rc == 0)
1703                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1704         } else if (la_copy->la_valid) {            /* setattr */
1705                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1706                 /* journal chown/chgrp in llog, just like unlink */
1707                 if (rc == 0 && lmm_size){
1708                         cookie_size = mdd_lov_cookiesize(env, mdd);
1709                         logcookies = mdd_max_cookie_get(env, mdd);
1710                         if (logcookies == NULL)
1711                                 GOTO(cleanup, rc = -ENOMEM);
1712
1713                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1714                                             logcookies, cookie_size) <= 0)
1715                                 logcookies = NULL;
1716                 }
1717         }
1718
1719         if (rc == 0 && ma->ma_valid & MA_LOV) {
1720                 cfs_umode_t mode;
1721
1722                 mode = mdd_object_type(mdd_obj);
1723                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1724                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1725                         if (rc)
1726                                 GOTO(cleanup, rc);
1727
1728                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1729                                             ma->ma_lmm_size, handle, 1);
1730                 }
1731
1732         }
1733         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1734                 cfs_umode_t mode;
1735
1736                 mode = mdd_object_type(mdd_obj);
1737                 if (S_ISREG(mode))
1738                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1739
1740         }
1741 cleanup:
1742         if (rc == 0)
1743                 rc = mdd_attr_set_changelog(env, obj, handle,
1744                                             ma->ma_attr.la_valid);
1745 stop:
1746         mdd_trans_stop(env, mdd, rc, handle);
1747         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1748                 /*set obd attr, if needed*/
1749                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1750                                            logcookies);
1751         }
1752 #ifdef HAVE_QUOTA_SUPPORT
1753         if (quota_opc) {
1754                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1755                                       inode_pending, 0);
1756                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1757                                       block_pending, 1);
1758                 /* Trigger dqrel/dqacq for original owner and new owner.
1759                  * If failed, the next call for lquota_chkquota will
1760                  * process it. */
1761                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1762                               quota_opc);
1763         }
1764 #endif
1765         RETURN(rc);
1766 }
1767
1768 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1769                       const struct lu_buf *buf, const char *name, int fl,
1770                       struct thandle *handle)
1771 {
1772         int  rc;
1773         ENTRY;
1774
1775         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1776         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1777         mdd_write_unlock(env, obj);
1778
1779         RETURN(rc);
1780 }
1781
1782 static int mdd_xattr_sanity_check(const struct lu_env *env,
1783                                   struct mdd_object *obj)
1784 {
1785         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1786         struct md_ucred *uc     = md_ucred(env);
1787         int rc;
1788         ENTRY;
1789
1790         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1791                 RETURN(-EPERM);
1792
1793         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1794         if (rc)
1795                 RETURN(rc);
1796
1797         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1798             !mdd_capable(uc, CFS_CAP_FOWNER))
1799                 RETURN(-EPERM);
1800
1801         RETURN(rc);
1802 }
1803
1804 static int mdd_declare_xattr_set(const struct lu_env *env,
1805                                  struct mdd_device *mdd,
1806                                  struct mdd_object *obj,
1807                                  const struct lu_buf *buf,
1808                                  const char *name,
1809                                  struct thandle *handle)
1810 {
1811         int rc;
1812
1813         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1814         if (rc)
1815                 return rc;
1816
1817         /* Only record user xattr changes */
1818         if ((strncmp("user.", name, 5) == 0))
1819                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1820
1821         return rc;
1822 }
1823
1824 /**
1825  * The caller should guarantee to update the object ctime
1826  * after xattr_set if needed.
1827  */
1828 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1829                          const struct lu_buf *buf, const char *name,
1830                          int fl)
1831 {
1832         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1833         struct mdd_device *mdd = mdo2mdd(obj);
1834         struct thandle *handle;
1835         int  rc;
1836         ENTRY;
1837
1838         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1839                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1840                 RETURN(rc);
1841         }
1842
1843         rc = mdd_xattr_sanity_check(env, mdd_obj);
1844         if (rc)
1845                 RETURN(rc);
1846
1847         handle = mdd_trans_create(env, mdd);
1848         if (IS_ERR(handle))
1849                 RETURN(PTR_ERR(handle));
1850
1851         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1852         if (rc)
1853                 GOTO(stop, rc);
1854
1855         rc = mdd_trans_start(env, mdd, handle);
1856         if (rc)
1857                 GOTO(stop, rc);
1858
1859         /* security-replated changes may require sync */
1860         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1861                 handle->th_sync |= !!mdd->mdd_sync_permission;
1862
1863         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1864         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1865                            mdd_object_capa(env, mdd_obj));
1866         mdd_write_unlock(env, mdd_obj);
1867         if (rc)
1868                 GOTO(stop, rc);
1869
1870         /* Only record system & user xattr changes */
1871         if (strncmp(XATTR_USER_PREFIX, name,
1872                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1873                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1874                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1875                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1876                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1877                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1878                                               handle);
1879
1880 stop:
1881         mdd_trans_stop(env, mdd, rc, handle);
1882
1883         RETURN(rc);
1884 }
1885
1886 static int mdd_declare_xattr_del(const struct lu_env *env,
1887                                  struct mdd_device *mdd,
1888                                  struct mdd_object *obj,
1889                                  const char *name,
1890                                  struct thandle *handle)
1891 {
1892         int rc;
1893
1894         rc = mdo_declare_xattr_del(env, obj, name, handle);
1895         if (rc)
1896                 return rc;
1897
1898         /* Only record user xattr changes */
1899         if ((strncmp("user.", name, 5) == 0))
1900                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1901
1902         return rc;
1903 }
1904
1905 /**
1906  * The caller should guarantee to update the object ctime
1907  * after xattr_set if needed.
1908  */
1909 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1910                   const char *name)
1911 {
1912         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1913         struct mdd_device *mdd = mdo2mdd(obj);
1914         struct thandle *handle;
1915         int  rc;
1916         ENTRY;
1917
1918         rc = mdd_xattr_sanity_check(env, mdd_obj);
1919         if (rc)
1920                 RETURN(rc);
1921
1922         handle = mdd_trans_create(env, mdd);
1923         if (IS_ERR(handle))
1924                 RETURN(PTR_ERR(handle));
1925
1926         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1927         if (rc)
1928                 GOTO(stop, rc);
1929
1930         rc = mdd_trans_start(env, mdd, handle);
1931         if (rc)
1932                 GOTO(stop, rc);
1933
1934         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1935         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1936                            mdd_object_capa(env, mdd_obj));
1937         mdd_write_unlock(env, mdd_obj);
1938         if (rc)
1939                 GOTO(stop, rc);
1940
1941         /* Only record system & user xattr changes */
1942         if (strncmp(XATTR_USER_PREFIX, name,
1943                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1944                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1945                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1946                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1947                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1948                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1949                                               handle);
1950
1951 stop:
1952         mdd_trans_stop(env, mdd, rc, handle);
1953
1954         RETURN(rc);
1955 }
1956
1957 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1958                 struct mdd_object *child, struct lu_attr *attr)
1959 {
1960         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1961         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1962         struct dt_object *nc = mdd_object_child(child);
1963
1964         /* @hint will be initialized by underlying device. */
1965         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1966 }
1967
1968 /*
1969  * do NOT or the MAY_*'s, you'll get the weakest
1970  */
1971 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1972 {
1973         int res = 0;
1974
1975         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1976          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1977          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1978          * owner can write to a file even if it is marked readonly to hide
1979          * its brokenness. (bug 5781) */
1980         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1981                 struct md_ucred *uc = md_ucred(env);
1982
1983                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1984                     (la->la_uid == uc->mu_fsuid))
1985                         return 0;
1986         }
1987
1988         if (flags & FMODE_READ)
1989                 res |= MAY_READ;
1990         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1991                 res |= MAY_WRITE;
1992         if (flags & MDS_FMODE_EXEC)
1993                 res = MAY_EXEC;
1994         return res;
1995 }
1996
1997 static int mdd_open_sanity_check(const struct lu_env *env,
1998                                  struct mdd_object *obj, int flag)
1999 {
2000         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2001         int mode, rc;
2002         ENTRY;
2003
2004         /* EEXIST check */
2005         if (mdd_is_dead_obj(obj))
2006                 RETURN(-ENOENT);
2007
2008         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2009         if (rc)
2010                RETURN(rc);
2011
2012         if (S_ISLNK(tmp_la->la_mode))
2013                 RETURN(-ELOOP);
2014
2015         mode = accmode(env, tmp_la, flag);
2016
2017         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2018                 RETURN(-EISDIR);
2019
2020         if (!(flag & MDS_OPEN_CREATED)) {
2021                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2022                 if (rc)
2023                         RETURN(rc);
2024         }
2025
2026         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2027             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2028                 flag &= ~MDS_OPEN_TRUNC;
2029
2030         /* For writing append-only file must open it with append mode. */
2031         if (mdd_is_append(obj)) {
2032                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2033                         RETURN(-EPERM);
2034                 if (flag & MDS_OPEN_TRUNC)
2035                         RETURN(-EPERM);
2036         }
2037
2038 #if 0
2039         /*
2040          * Now, flag -- O_NOATIME does not be packed by client.
2041          */
2042         if (flag & O_NOATIME) {
2043                 struct md_ucred *uc = md_ucred(env);
2044
2045                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2046                     (uc->mu_valid == UCRED_NEW)) &&
2047                     (uc->mu_fsuid != tmp_la->la_uid) &&
2048                     !mdd_capable(uc, CFS_CAP_FOWNER))
2049                         RETURN(-EPERM);
2050         }
2051 #endif
2052
2053         RETURN(0);
2054 }
2055
2056 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2057                     int flags)
2058 {
2059         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2060         int rc = 0;
2061
2062         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2063
2064         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2065         if (rc == 0)
2066                 mdd_obj->mod_count++;
2067
2068         mdd_write_unlock(env, mdd_obj);
2069         return rc;
2070 }
2071
2072 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2073                             struct md_attr *ma, struct thandle *handle)
2074 {
2075         int rc;
2076
2077         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2078         if (rc)
2079                 return rc;
2080
2081         return mdo_declare_destroy(env, obj, handle);
2082 }
2083
2084 /* return md_attr back,
2085  * if it is last unlink then return lov ea + llog cookie*/
2086 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2087                     struct md_attr *ma, struct thandle *handle)
2088 {
2089         int rc = 0;
2090         ENTRY;
2091
2092         if (S_ISREG(mdd_object_type(obj))) {
2093                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2094                  * Caller must be ready for that. */
2095                 rc = __mdd_lmm_get(env, obj, ma);
2096                 if ((ma->ma_valid & MA_LOV))
2097                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2098                                             obj, ma);
2099         }
2100
2101         if (rc == 0)
2102                 rc = mdo_destroy(env, obj, handle);
2103
2104         RETURN(rc);
2105 }
2106
2107 static int mdd_declare_close(const struct lu_env *env,
2108                              struct mdd_object *obj,
2109                              struct md_attr *ma,
2110                              struct thandle *handle)
2111 {
2112         int rc;
2113
2114         rc = orph_declare_index_delete(env, obj, handle);
2115         if (rc)
2116                 return rc;
2117
2118         return mdd_declare_object_kill(env, obj, ma, handle);
2119 }
2120
2121 /*
2122  * No permission check is needed.
2123  */
2124 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2125                      struct md_attr *ma, int mode)
2126 {
2127         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2128         struct mdd_device *mdd = mdo2mdd(obj);
2129         struct thandle    *handle = NULL;
2130         int rc;
2131         int is_orphan = 0, reset = 1;
2132
2133 #ifdef HAVE_QUOTA_SUPPORT
2134         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2135         struct mds_obd *mds = &obd->u.mds;
2136         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2137         int quota_opc = 0;
2138 #endif
2139         ENTRY;
2140
2141         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2142                 mdd_obj->mod_count--;
2143
2144                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2145                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2146                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2147                 RETURN(0);
2148         }
2149
2150         /* check without any lock */
2151         if (mdd_obj->mod_count == 1 &&
2152             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2153  again:
2154                 handle = mdd_trans_create(env, mdo2mdd(obj));
2155                 if (IS_ERR(handle))
2156                         RETURN(PTR_ERR(handle));
2157
2158                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2159                 if (rc)
2160                         GOTO(stop, rc);
2161
2162                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2163                 if (rc)
2164                         GOTO(stop, rc);
2165
2166                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2167                 if (rc)
2168                         GOTO(stop, rc);
2169         }
2170
2171         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2172         if (handle == NULL && mdd_obj->mod_count == 1 &&
2173             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2174                 mdd_write_unlock(env, mdd_obj);
2175                 goto again;
2176         }
2177
2178         /* release open count */
2179         mdd_obj->mod_count --;
2180
2181         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2182                 /* remove link to object from orphan index */
2183                 LASSERT(handle != NULL);
2184                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2185                 if (rc == 0) {
2186                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2187                                "list, OSS objects to be destroyed.\n",
2188                                PFID(mdd_object_fid(mdd_obj)));
2189                         is_orphan = 1;
2190                 } else {
2191                         CERROR("Object "DFID" can not be deleted from orphan "
2192                                 "list, maybe cause OST objects can not be "
2193                                 "destroyed (err: %d).\n",
2194                                 PFID(mdd_object_fid(mdd_obj)), rc);
2195                         /* If object was not deleted from orphan list, do not
2196                          * destroy OSS objects, which will be done when next
2197                          * recovery. */
2198                         GOTO(out, rc);
2199                 }
2200         }
2201
2202         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
2203                         mdd_object_capa(env, mdd_obj));
2204         /* Object maybe not in orphan list originally, it is rare case for
2205          * mdd_finish_unlink() failure. */
2206         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2207 #ifdef HAVE_QUOTA_SUPPORT
2208                 if (mds->mds_quota) {
2209                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2210                         mdd_quota_wrapper(&ma->ma_attr, qids);
2211                 }
2212 #endif
2213                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2214                 if (ma->ma_valid & MA_FLAGS &&
2215                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2216                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2217                 } else {
2218                         if (handle == NULL) {
2219                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2220                                 if (IS_ERR(handle))
2221                                         GOTO(out, rc = PTR_ERR(handle));
2222
2223                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2224                                                              handle);
2225                                 if (rc)
2226                                         GOTO(out, rc);
2227
2228                                 rc = mdd_declare_changelog_store(env, mdd,
2229                                                                  NULL, handle);
2230                                 if (rc)
2231                                         GOTO(stop, rc);
2232
2233                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2234                                 if (rc)
2235                                         GOTO(out, rc);
2236                         }
2237
2238                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2239                         if (rc == 0)
2240                                 reset = 0;
2241                 }
2242
2243                 if (rc != 0)
2244                         CERROR("Error when prepare to delete Object "DFID" , "
2245                                "which will cause OST objects can not be "
2246                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2247         }
2248         EXIT;
2249
2250 out:
2251         if (reset)
2252                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2253
2254         mdd_write_unlock(env, mdd_obj);
2255
2256         if (rc == 0 &&
2257             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2258             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2259                 if (handle == NULL) {
2260                         handle = mdd_trans_create(env, mdo2mdd(obj));
2261                         if (IS_ERR(handle))
2262                                 GOTO(stop, rc = IS_ERR(handle));
2263
2264                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2265                                                          handle);
2266                         if (rc)
2267                                 GOTO(stop, rc);
2268
2269                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2270                         if (rc)
2271                                 GOTO(stop, rc);
2272                 }
2273
2274                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2275                                          mdd_obj, handle);
2276         }
2277
2278 stop:
2279         if (handle != NULL)
2280                 mdd_trans_stop(env, mdd, rc, handle);
2281 #ifdef HAVE_QUOTA_SUPPORT
2282         if (quota_opc)
2283                 /* Trigger dqrel on the owner of child. If failed,
2284                  * the next call for lquota_chkquota will process it */
2285                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2286                               quota_opc);
2287 #endif
2288         return rc;
2289 }
2290
2291 /*
2292  * Permission check is done when open,
2293  * no need check again.
2294  */
2295 static int mdd_readpage_sanity_check(const struct lu_env *env,
2296                                      struct mdd_object *obj)
2297 {
2298         struct dt_object *next = mdd_object_child(obj);
2299         int rc;
2300         ENTRY;
2301
2302         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2303                 rc = 0;
2304         else
2305                 rc = -ENOTDIR;
2306
2307         RETURN(rc);
2308 }
2309
2310 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2311                               int nob, const struct dt_it_ops *iops,
2312                               struct dt_it *it, __u32 attr, void *arg)
2313 {
2314         struct lu_dirpage       *dp = &lp->lp_dir;
2315         void                    *area = dp;
2316         int                      result;
2317         __u64                    hash = 0;
2318         struct lu_dirent        *ent;
2319         struct lu_dirent        *last = NULL;
2320         int                      first = 1;
2321
2322         memset(area, 0, sizeof (*dp));
2323         area += sizeof (*dp);
2324         nob  -= sizeof (*dp);
2325
2326         ent  = area;
2327         do {
2328                 int    len;
2329                 int    recsize;
2330
2331                 len = iops->key_size(env, it);
2332
2333                 /* IAM iterator can return record with zero len. */
2334                 if (len == 0)
2335                         goto next;
2336
2337                 hash = iops->store(env, it);
2338                 if (unlikely(first)) {
2339                         first = 0;
2340                         dp->ldp_hash_start = cpu_to_le64(hash);
2341                 }
2342
2343                 /* calculate max space required for lu_dirent */
2344                 recsize = lu_dirent_calc_size(len, attr);
2345
2346                 if (nob >= recsize) {
2347                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2348                         if (result == -ESTALE)
2349                                 goto next;
2350                         if (result != 0)
2351                                 goto out;
2352
2353                         /* osd might not able to pack all attributes,
2354                          * so recheck rec length */
2355                         recsize = le16_to_cpu(ent->lde_reclen);
2356                 } else {
2357                         result = (last != NULL) ? 0 :-EINVAL;
2358                         goto out;
2359                 }
2360                 last = ent;
2361                 ent = (void *)ent + recsize;
2362                 nob -= recsize;
2363
2364 next:
2365                 result = iops->next(env, it);
2366                 if (result == -ESTALE)
2367                         goto next;
2368         } while (result == 0);
2369
2370 out:
2371         dp->ldp_hash_end = cpu_to_le64(hash);
2372         if (last != NULL) {
2373                 if (last->lde_hash == dp->ldp_hash_end)
2374                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2375                 last->lde_reclen = 0; /* end mark */
2376         }
2377         if (result > 0)
2378                 /* end of directory */
2379                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2380         if (result < 0)
2381                 CWARN("build page failed: %d!\n", result);
2382         return result;
2383 }
2384
2385 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2386                  const struct lu_rdpg *rdpg)
2387 {
2388         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2389         int rc;
2390         ENTRY;
2391
2392         if (mdd_object_exists(mdd_obj) == 0) {
2393                 CERROR("%s: object "DFID" not found: rc = -2\n",
2394                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2395                 return -ENOENT;
2396         }
2397
2398         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2399         rc = mdd_readpage_sanity_check(env, mdd_obj);
2400         if (rc)
2401                 GOTO(out_unlock, rc);
2402
2403         if (mdd_is_dead_obj(mdd_obj)) {
2404                 struct page *pg;
2405                 struct lu_dirpage *dp;
2406
2407                 /*
2408                  * According to POSIX, please do not return any entry to client:
2409                  * even dot and dotdot should not be returned.
2410                  */
2411                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2412                        PFID(mdd_object_fid(mdd_obj)));
2413
2414                 if (rdpg->rp_count <= 0)
2415                         GOTO(out_unlock, rc = -EFAULT);
2416                 LASSERT(rdpg->rp_pages != NULL);
2417
2418                 pg = rdpg->rp_pages[0];
2419                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2420                 memset(dp, 0 , sizeof(struct lu_dirpage));
2421                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2422                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2423                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2424                 cfs_kunmap(pg);
2425                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2426         }
2427
2428         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2429                            mdd_dir_page_build, NULL);
2430         if (rc >= 0) {
2431                 struct lu_dirpage       *dp;
2432
2433                 dp = cfs_kmap(rdpg->rp_pages[0]);
2434                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2435                 if (rc == 0) {
2436                         /*
2437                          * No pages were processed, mark this for first page
2438                          * and send back.
2439                          */
2440                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2441                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2442                 }
2443                 cfs_kunmap(rdpg->rp_pages[0]);
2444         }
2445
2446         GOTO(out_unlock, rc);
2447 out_unlock:
2448         mdd_read_unlock(env, mdd_obj);
2449         return rc;
2450 }
2451
2452 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2453 {
2454         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2455
2456         if (mdd_object_exists(mdd_obj) == 0) {
2457                 CERROR("%s: object "DFID" not found: rc = -2\n",
2458                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2459                 return -ENOENT;
2460         }
2461         return dt_object_sync(env, mdd_object_child(mdd_obj));
2462 }
2463
2464 const struct md_object_operations mdd_obj_ops = {
2465         .moo_permission    = mdd_permission,
2466         .moo_attr_get      = mdd_attr_get,
2467         .moo_attr_set      = mdd_attr_set,
2468         .moo_xattr_get     = mdd_xattr_get,
2469         .moo_xattr_set     = mdd_xattr_set,
2470         .moo_xattr_list    = mdd_xattr_list,
2471         .moo_xattr_del     = mdd_xattr_del,
2472         .moo_open          = mdd_open,
2473         .moo_close         = mdd_close,
2474         .moo_readpage      = mdd_readpage,
2475         .moo_readlink      = mdd_readlink,
2476         .moo_changelog     = mdd_changelog,
2477         .moo_capa_get      = mdd_capa_get,
2478         .moo_object_sync   = mdd_object_sync,
2479         .moo_path          = mdd_path,
2480 };