Whamcloud - gitweb
LU-579 MRP-120 changelog CLOSE event.
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
58
59 #include <lustre_param.h>
60 #include <lustre_mds.h>
61 #include <lustre/lustre_idl.h>
62
63 #include "mdd_internal.h"
64
65 static const struct lu_object_operations mdd_lu_obj_ops;
66
67 static int mdd_xattr_get(const struct lu_env *env,
68                          struct md_object *obj, struct lu_buf *buf,
69                          const char *name);
70
71 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
72                  void **data)
73 {
74         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
75                  PFID(mdd_object_fid(obj)));
76         mdo_data_get(env, obj, data);
77         return 0;
78 }
79
80 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
81                struct lu_attr *la, struct lustre_capa *capa)
82 {
83         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
84                  PFID(mdd_object_fid(obj)));
85         return mdo_attr_get(env, obj, la, capa);
86 }
87
88 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
89 {
90         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
91
92         if (flags & LUSTRE_APPEND_FL)
93                 obj->mod_flags |= APPEND_OBJ;
94
95         if (flags & LUSTRE_IMMUTABLE_FL)
96                 obj->mod_flags |= IMMUTE_OBJ;
97 }
98
99 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
100 {
101         struct mdd_thread_info *info;
102
103         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
104         LASSERT(info != NULL);
105         return info;
106 }
107
108 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
109 {
110         struct lu_buf *buf;
111
112         buf = &mdd_env_info(env)->mti_buf;
113         buf->lb_buf = area;
114         buf->lb_len = len;
115         return buf;
116 }
117
118 void mdd_buf_put(struct lu_buf *buf)
119 {
120         if (buf == NULL || buf->lb_buf == NULL)
121                 return;
122         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
123         buf->lb_buf = NULL;
124         buf->lb_len = 0;
125 }
126
127 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
128                                        const void *area, ssize_t len)
129 {
130         struct lu_buf *buf;
131
132         buf = &mdd_env_info(env)->mti_buf;
133         buf->lb_buf = (void *)area;
134         buf->lb_len = len;
135         return buf;
136 }
137
138 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
139 {
140         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
141
142         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
143                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
144                 buf->lb_buf = NULL;
145         }
146         if (buf->lb_buf == NULL) {
147                 buf->lb_len = len;
148                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
149                 if (buf->lb_buf == NULL)
150                         buf->lb_len = 0;
151         }
152         return buf;
153 }
154
155 /** Increase the size of the \a mti_big_buf.
156  * preserves old data in buffer
157  * old buffer remains unchanged on error
158  * \retval 0 or -ENOMEM
159  */
160 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
161 {
162         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
163         struct lu_buf buf;
164
165         LASSERT(len >= oldbuf->lb_len);
166         OBD_ALLOC_LARGE(buf.lb_buf, len);
167
168         if (buf.lb_buf == NULL)
169                 return -ENOMEM;
170
171         buf.lb_len = len;
172         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
173
174         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
175
176         memcpy(oldbuf, &buf, sizeof(buf));
177
178         return 0;
179 }
180
181 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
182                                        struct mdd_device *mdd)
183 {
184         struct mdd_thread_info *mti = mdd_env_info(env);
185         int                     max_cookie_size;
186
187         max_cookie_size = mdd_lov_cookiesize(env, mdd);
188         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
189                 if (mti->mti_max_cookie)
190                         OBD_FREE_LARGE(mti->mti_max_cookie,
191                                        mti->mti_max_cookie_size);
192                 mti->mti_max_cookie = NULL;
193                 mti->mti_max_cookie_size = 0;
194         }
195         if (unlikely(mti->mti_max_cookie == NULL)) {
196                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
197                 if (likely(mti->mti_max_cookie != NULL))
198                         mti->mti_max_cookie_size = max_cookie_size;
199         }
200         if (likely(mti->mti_max_cookie != NULL))
201                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
202         return mti->mti_max_cookie;
203 }
204
205 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
206                                    struct mdd_device *mdd)
207 {
208         struct mdd_thread_info *mti = mdd_env_info(env);
209         int                     max_lmm_size;
210
211         max_lmm_size = mdd_lov_mdsize(env, mdd);
212         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
213                 if (mti->mti_max_lmm)
214                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
215                 mti->mti_max_lmm = NULL;
216                 mti->mti_max_lmm_size = 0;
217         }
218         if (unlikely(mti->mti_max_lmm == NULL)) {
219                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
220                 if (likely(mti->mti_max_lmm != NULL))
221                         mti->mti_max_lmm_size = max_lmm_size;
222         }
223         return mti->mti_max_lmm;
224 }
225
226 struct lu_object *mdd_object_alloc(const struct lu_env *env,
227                                    const struct lu_object_header *hdr,
228                                    struct lu_device *d)
229 {
230         struct mdd_object *mdd_obj;
231
232         OBD_ALLOC_PTR(mdd_obj);
233         if (mdd_obj != NULL) {
234                 struct lu_object *o;
235
236                 o = mdd2lu_obj(mdd_obj);
237                 lu_object_init(o, NULL, d);
238                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
239                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
240                 mdd_obj->mod_count = 0;
241                 o->lo_ops = &mdd_lu_obj_ops;
242                 return o;
243         } else {
244                 return NULL;
245         }
246 }
247
248 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
249                            const struct lu_object_conf *unused)
250 {
251         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
252         struct mdd_object *mdd_obj = lu2mdd_obj(o);
253         struct lu_object  *below;
254         struct lu_device  *under;
255         ENTRY;
256
257         mdd_obj->mod_cltime = 0;
258         under = &d->mdd_child->dd_lu_dev;
259         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
260         mdd_pdlock_init(mdd_obj);
261         if (below == NULL)
262                 RETURN(-ENOMEM);
263
264         lu_object_add(o, below);
265
266         RETURN(0);
267 }
268
269 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
270 {
271         if (lu_object_exists(o))
272                 return mdd_get_flags(env, lu2mdd_obj(o));
273         else
274                 return 0;
275 }
276
277 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
278 {
279         struct mdd_object *mdd = lu2mdd_obj(o);
280
281         lu_object_fini(o);
282         OBD_FREE_PTR(mdd);
283 }
284
285 static int mdd_object_print(const struct lu_env *env, void *cookie,
286                             lu_printer_t p, const struct lu_object *o)
287 {
288         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
289         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
290                     "valid=%x, cltime="LPU64", flags=%lx)",
291                     mdd, mdd->mod_count, mdd->mod_valid,
292                     mdd->mod_cltime, mdd->mod_flags);
293 }
294
295 static const struct lu_object_operations mdd_lu_obj_ops = {
296         .loo_object_init    = mdd_object_init,
297         .loo_object_start   = mdd_object_start,
298         .loo_object_free    = mdd_object_free,
299         .loo_object_print   = mdd_object_print,
300 };
301
302 struct mdd_object *mdd_object_find(const struct lu_env *env,
303                                    struct mdd_device *d,
304                                    const struct lu_fid *f)
305 {
306         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
307 }
308
309 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
310                         const char *path, struct lu_fid *fid)
311 {
312         struct lu_buf *buf;
313         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
314         struct mdd_object *obj;
315         struct lu_name *lname = &mdd_env_info(env)->mti_name;
316         char *name;
317         int rc = 0;
318         ENTRY;
319
320         /* temp buffer for path element */
321         buf = mdd_buf_alloc(env, PATH_MAX);
322         if (buf->lb_buf == NULL)
323                 RETURN(-ENOMEM);
324
325         lname->ln_name = name = buf->lb_buf;
326         lname->ln_namelen = 0;
327         *f = mdd->mdd_root_fid;
328
329         while(1) {
330                 while (*path == '/')
331                         path++;
332                 if (*path == '\0')
333                         break;
334                 while (*path != '/' && *path != '\0') {
335                         *name = *path;
336                         path++;
337                         name++;
338                         lname->ln_namelen++;
339                 }
340
341                 *name = '\0';
342                 /* find obj corresponding to fid */
343                 obj = mdd_object_find(env, mdd, f);
344                 if (obj == NULL)
345                         GOTO(out, rc = -EREMOTE);
346                 if (IS_ERR(obj))
347                         GOTO(out, rc = PTR_ERR(obj));
348                 /* get child fid from parent and name */
349                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
350                 mdd_object_put(env, obj);
351                 if (rc)
352                         break;
353
354                 name = buf->lb_buf;
355                 lname->ln_namelen = 0;
356         }
357
358         if (!rc)
359                 *fid = *f;
360 out:
361         RETURN(rc);
362 }
363
364 /** The maximum depth that fid2path() will search.
365  * This is limited only because we want to store the fids for
366  * historical path lookup purposes.
367  */
368 #define MAX_PATH_DEPTH 100
369
370 /** mdd_path() lookup structure. */
371 struct path_lookup_info {
372         __u64                pli_recno;        /**< history point */
373         __u64                pli_currec;       /**< current record */
374         struct lu_fid        pli_fid;
375         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
376         struct mdd_object   *pli_mdd_obj;
377         char                *pli_path;         /**< full path */
378         int                  pli_pathlen;
379         int                  pli_linkno;       /**< which hardlink to follow */
380         int                  pli_fidcount;     /**< number of \a pli_fids */
381 };
382
383 static int mdd_path_current(const struct lu_env *env,
384                             struct path_lookup_info *pli)
385 {
386         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
387         struct mdd_object *mdd_obj;
388         struct lu_buf     *buf = NULL;
389         struct link_ea_header *leh;
390         struct link_ea_entry  *lee;
391         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
392         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
393         char *ptr;
394         int reclen;
395         int rc;
396         ENTRY;
397
398         ptr = pli->pli_path + pli->pli_pathlen - 1;
399         *ptr = 0;
400         --ptr;
401         pli->pli_fidcount = 0;
402         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
403
404         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
405                 mdd_obj = mdd_object_find(env, mdd,
406                                           &pli->pli_fids[pli->pli_fidcount]);
407                 if (mdd_obj == NULL)
408                         GOTO(out, rc = -EREMOTE);
409                 if (IS_ERR(mdd_obj))
410                         GOTO(out, rc = PTR_ERR(mdd_obj));
411                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
412                 if (rc <= 0) {
413                         mdd_object_put(env, mdd_obj);
414                         if (rc == -1)
415                                 rc = -EREMOTE;
416                         else if (rc == 0)
417                                 /* Do I need to error out here? */
418                                 rc = -ENOENT;
419                         GOTO(out, rc);
420                 }
421
422                 /* Get parent fid and object name */
423                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
424                 buf = mdd_links_get(env, mdd_obj);
425                 mdd_read_unlock(env, mdd_obj);
426                 mdd_object_put(env, mdd_obj);
427                 if (IS_ERR(buf))
428                         GOTO(out, rc = PTR_ERR(buf));
429
430                 leh = buf->lb_buf;
431                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
432                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
433
434                 /* If set, use link #linkno for path lookup, otherwise use
435                    link #0.  Only do this for the final path element. */
436                 if ((pli->pli_fidcount == 0) &&
437                     (pli->pli_linkno < leh->leh_reccount)) {
438                         int count;
439                         for (count = 0; count < pli->pli_linkno; count++) {
440                                 lee = (struct link_ea_entry *)
441                                      ((char *)lee + reclen);
442                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
443                         }
444                         if (pli->pli_linkno < leh->leh_reccount - 1)
445                                 /* indicate to user there are more links */
446                                 pli->pli_linkno++;
447                 }
448
449                 /* Pack the name in the end of the buffer */
450                 ptr -= tmpname->ln_namelen;
451                 if (ptr - 1 <= pli->pli_path)
452                         GOTO(out, rc = -EOVERFLOW);
453                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
454                 *(--ptr) = '/';
455
456                 /* Store the parent fid for historic lookup */
457                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
458                         GOTO(out, rc = -EOVERFLOW);
459                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
460         }
461
462         /* Verify that our path hasn't changed since we started the lookup.
463            Record the current index, and verify the path resolves to the
464            same fid. If it does, then the path is correct as of this index. */
465         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
466         pli->pli_currec = mdd->mdd_cl.mc_index;
467         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
468         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
469         if (rc) {
470                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
471                 GOTO (out, rc = -EAGAIN);
472         }
473         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
474                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
475                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
476                        PFID(&pli->pli_fid));
477                 GOTO(out, rc = -EAGAIN);
478         }
479         ptr++; /* skip leading / */
480         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
481
482         EXIT;
483 out:
484         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
485                 /* if we vmalloced a large buffer drop it */
486                 mdd_buf_put(buf);
487
488         return rc;
489 }
490
491 static int mdd_path_historic(const struct lu_env *env,
492                              struct path_lookup_info *pli)
493 {
494         return 0;
495 }
496
497 /* Returns the full path to this fid, as of changelog record recno. */
498 static int mdd_path(const struct lu_env *env, struct md_object *obj,
499                     char *path, int pathlen, __u64 *recno, int *linkno)
500 {
501         struct path_lookup_info *pli;
502         int tries = 3;
503         int rc = -EAGAIN;
504         ENTRY;
505
506         if (pathlen < 3)
507                 RETURN(-EOVERFLOW);
508
509         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
510                 path[0] = '\0';
511                 RETURN(0);
512         }
513
514         OBD_ALLOC_PTR(pli);
515         if (pli == NULL)
516                 RETURN(-ENOMEM);
517
518         pli->pli_mdd_obj = md2mdd_obj(obj);
519         pli->pli_recno = *recno;
520         pli->pli_path = path;
521         pli->pli_pathlen = pathlen;
522         pli->pli_linkno = *linkno;
523
524         /* Retry multiple times in case file is being moved */
525         while (tries-- && rc == -EAGAIN)
526                 rc = mdd_path_current(env, pli);
527
528         /* For historical path lookup, the current links may not have existed
529          * at "recno" time.  We must switch over to earlier links/parents
530          * by using the changelog records.  If the earlier parent doesn't
531          * exist, we must search back through the changelog to reconstruct
532          * its parents, then check if it exists, etc.
533          * We may ignore this problem for the initial implementation and
534          * state that an "original" hardlink must still exist for us to find
535          * historic path name. */
536         if (pli->pli_recno != -1) {
537                 rc = mdd_path_historic(env, pli);
538         } else {
539                 *recno = pli->pli_currec;
540                 /* Return next link index to caller */
541                 *linkno = pli->pli_linkno;
542         }
543
544         OBD_FREE_PTR(pli);
545
546         RETURN (rc);
547 }
548
549 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
550 {
551         struct lu_attr *la = &mdd_env_info(env)->mti_la;
552         int rc;
553
554         ENTRY;
555         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
556         if (rc == 0) {
557                 mdd_flags_xlate(obj, la->la_flags);
558                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
559                         obj->mod_flags |= MNLINK_OBJ;
560         }
561         RETURN(rc);
562 }
563
564 /* get only inode attributes */
565 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
566                   struct md_attr *ma)
567 {
568         int rc = 0;
569         ENTRY;
570
571         if (ma->ma_valid & MA_INODE)
572                 RETURN(0);
573
574         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
575                           mdd_object_capa(env, mdd_obj));
576         if (rc == 0)
577                 ma->ma_valid |= MA_INODE;
578         RETURN(rc);
579 }
580
581 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
582 {
583         struct lov_desc *ldesc;
584         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
585         struct lov_user_md *lum = (struct lov_user_md*)lmm;
586         ENTRY;
587
588         if (!lum)
589                 RETURN(0);
590
591         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
592         LASSERT(ldesc != NULL);
593
594         lum->lmm_magic = LOV_MAGIC_V1;
595         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
596         lum->lmm_pattern = ldesc->ld_pattern;
597         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
598         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
599         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
600
601         RETURN(sizeof(*lum));
602 }
603
604 static int is_rootdir(struct mdd_object *mdd_obj)
605 {
606         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
607         const struct lu_fid *fid = mdo2fid(mdd_obj);
608
609         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
610 }
611
612 /* get lov EA only */
613 static int __mdd_lmm_get(const struct lu_env *env,
614                          struct mdd_object *mdd_obj, struct md_attr *ma)
615 {
616         int rc;
617         ENTRY;
618
619         if (ma->ma_valid & MA_LOV)
620                 RETURN(0);
621
622         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
623                         XATTR_NAME_LOV);
624         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
625                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
626         if (rc > 0) {
627                 ma->ma_lmm_size = rc;
628                 ma->ma_valid |= MA_LOV;
629                 rc = 0;
630         }
631         RETURN(rc);
632 }
633
634 /* get the first parent fid from link EA */
635 static int mdd_pfid_get(const struct lu_env *env,
636                         struct mdd_object *mdd_obj, struct md_attr *ma)
637 {
638         struct lu_buf *buf;
639         struct link_ea_header *leh;
640         struct link_ea_entry *lee;
641         struct lu_fid *pfid = &ma->ma_pfid;
642         ENTRY;
643
644         if (ma->ma_valid & MA_PFID)
645                 RETURN(0);
646
647         buf = mdd_links_get(env, mdd_obj);
648         if (IS_ERR(buf))
649                 RETURN(PTR_ERR(buf));
650
651         leh = buf->lb_buf;
652         lee = (struct link_ea_entry *)(leh + 1);
653         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
654         fid_be_to_cpu(pfid, pfid);
655         ma->ma_valid |= MA_PFID;
656         if (buf->lb_len > OBD_ALLOC_BIG)
657                 /* if we vmalloced a large buffer drop it */
658                 mdd_buf_put(buf);
659         RETURN(0);
660 }
661
662 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
663                        struct md_attr *ma)
664 {
665         int rc;
666         ENTRY;
667
668         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
669         rc = __mdd_lmm_get(env, mdd_obj, ma);
670         mdd_read_unlock(env, mdd_obj);
671         RETURN(rc);
672 }
673
674 /* get lmv EA only*/
675 static int __mdd_lmv_get(const struct lu_env *env,
676                          struct mdd_object *mdd_obj, struct md_attr *ma)
677 {
678         int rc;
679         ENTRY;
680
681         if (ma->ma_valid & MA_LMV)
682                 RETURN(0);
683
684         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
685                         XATTR_NAME_LMV);
686         if (rc > 0) {
687                 ma->ma_valid |= MA_LMV;
688                 rc = 0;
689         }
690         RETURN(rc);
691 }
692
693 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
694                          struct md_attr *ma)
695 {
696         struct mdd_thread_info *info = mdd_env_info(env);
697         struct lustre_mdt_attrs *lma =
698                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
699         int lma_size;
700         int rc;
701         ENTRY;
702
703         /* If all needed data are already valid, nothing to do */
704         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
705             (ma->ma_need & (MA_HSM | MA_SOM)))
706                 RETURN(0);
707
708         /* Read LMA from disk EA */
709         lma_size = sizeof(info->mti_xattr_buf);
710         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
711         if (rc <= 0)
712                 RETURN(rc);
713
714         /* Useless to check LMA incompatibility because this is already done in
715          * osd_ea_fid_get(), and this will fail long before this code is
716          * called.
717          * So, if we are here, LMA is compatible.
718          */
719
720         lustre_lma_swab(lma);
721
722         /* Swab and copy LMA */
723         if (ma->ma_need & MA_HSM) {
724                 if (lma->lma_compat & LMAC_HSM)
725                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
726                 else
727                         ma->ma_hsm.mh_flags = 0;
728                 ma->ma_valid |= MA_HSM;
729         }
730
731         /* Copy SOM */
732         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
733                 LASSERT(ma->ma_som != NULL);
734                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
735                 ma->ma_som->msd_size    = lma->lma_som_size;
736                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
737                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
738                 ma->ma_valid |= MA_SOM;
739         }
740
741         RETURN(0);
742 }
743
744 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
745                                  struct md_attr *ma)
746 {
747         int rc = 0;
748         ENTRY;
749
750         if (ma->ma_need & MA_INODE)
751                 rc = mdd_iattr_get(env, mdd_obj, ma);
752
753         if (rc == 0 && ma->ma_need & MA_LOV) {
754                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
755                     S_ISDIR(mdd_object_type(mdd_obj)))
756                         rc = __mdd_lmm_get(env, mdd_obj, ma);
757         }
758         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
759                 if (S_ISREG(mdd_object_type(mdd_obj)))
760                         rc = mdd_pfid_get(env, mdd_obj, ma);
761         }
762         if (rc == 0 && ma->ma_need & MA_LMV) {
763                 if (S_ISDIR(mdd_object_type(mdd_obj)))
764                         rc = __mdd_lmv_get(env, mdd_obj, ma);
765         }
766         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
767                 if (S_ISREG(mdd_object_type(mdd_obj)))
768                         rc = __mdd_lma_get(env, mdd_obj, ma);
769         }
770 #ifdef CONFIG_FS_POSIX_ACL
771         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
772                 if (S_ISDIR(mdd_object_type(mdd_obj)))
773                         rc = mdd_def_acl_get(env, mdd_obj, ma);
774         }
775 #endif
776         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
777                rc, ma->ma_valid, ma->ma_lmm);
778         RETURN(rc);
779 }
780
781 int mdd_attr_get_internal_locked(const struct lu_env *env,
782                                  struct mdd_object *mdd_obj, struct md_attr *ma)
783 {
784         int rc;
785         int needlock = ma->ma_need &
786                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
787
788         if (needlock)
789                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
790         rc = mdd_attr_get_internal(env, mdd_obj, ma);
791         if (needlock)
792                 mdd_read_unlock(env, mdd_obj);
793         return rc;
794 }
795
796 /*
797  * No permission check is needed.
798  */
799 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
800                         struct md_attr *ma)
801 {
802         struct mdd_object *mdd_obj = md2mdd_obj(obj);
803         int                rc;
804
805         ENTRY;
806         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
807         RETURN(rc);
808 }
809
810 /*
811  * No permission check is needed.
812  */
813 static int mdd_xattr_get(const struct lu_env *env,
814                          struct md_object *obj, struct lu_buf *buf,
815                          const char *name)
816 {
817         struct mdd_object *mdd_obj = md2mdd_obj(obj);
818         int rc;
819
820         ENTRY;
821
822         LASSERT(mdd_object_exists(mdd_obj));
823
824         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
825         rc = mdo_xattr_get(env, mdd_obj, buf, name,
826                            mdd_object_capa(env, mdd_obj));
827         mdd_read_unlock(env, mdd_obj);
828
829         RETURN(rc);
830 }
831
832 /*
833  * Permission check is done when open,
834  * no need check again.
835  */
836 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
837                         struct lu_buf *buf)
838 {
839         struct mdd_object *mdd_obj = md2mdd_obj(obj);
840         struct dt_object  *next;
841         loff_t             pos = 0;
842         int                rc;
843         ENTRY;
844
845         LASSERT(mdd_object_exists(mdd_obj));
846
847         next = mdd_object_child(mdd_obj);
848         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
849         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
850                                          mdd_object_capa(env, mdd_obj));
851         mdd_read_unlock(env, mdd_obj);
852         RETURN(rc);
853 }
854
855 /*
856  * No permission check is needed.
857  */
858 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
859                           struct lu_buf *buf)
860 {
861         struct mdd_object *mdd_obj = md2mdd_obj(obj);
862         int rc;
863
864         ENTRY;
865
866         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
867         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
868         mdd_read_unlock(env, mdd_obj);
869
870         RETURN(rc);
871 }
872
873 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
874                                struct mdd_object *c, struct md_attr *ma,
875                                struct thandle *handle,
876                                const struct md_op_spec *spec)
877 {
878         struct lu_attr *attr = &ma->ma_attr;
879         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
880         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
881         const struct dt_index_features *feat = spec->sp_feat;
882         int rc;
883         ENTRY;
884
885         if (!mdd_object_exists(c)) {
886                 struct dt_object *next = mdd_object_child(c);
887                 LASSERT(next);
888
889                 if (feat != &dt_directory_features && feat != NULL)
890                         dof->dof_type = DFT_INDEX;
891                 else
892                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
893
894                 dof->u.dof_idx.di_feat = feat;
895
896                 /* @hint will be initialized by underlying device. */
897                 next->do_ops->do_ah_init(env, hint,
898                                          p ? mdd_object_child(p) : NULL,
899                                          attr->la_mode & S_IFMT);
900
901                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
902                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
903         } else
904                 rc = -EEXIST;
905
906         RETURN(rc);
907 }
908
909 /**
910  * Make sure the ctime is increased only.
911  */
912 static inline int mdd_attr_check(const struct lu_env *env,
913                                  struct mdd_object *obj,
914                                  struct lu_attr *attr)
915 {
916         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
917         int rc;
918         ENTRY;
919
920         if (attr->la_valid & LA_CTIME) {
921                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
922                 if (rc)
923                         RETURN(rc);
924
925                 if (attr->la_ctime < tmp_la->la_ctime)
926                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
927                 else if (attr->la_valid == LA_CTIME &&
928                          attr->la_ctime == tmp_la->la_ctime)
929                         attr->la_valid &= ~LA_CTIME;
930         }
931         RETURN(0);
932 }
933
934 int mdd_attr_set_internal(const struct lu_env *env,
935                           struct mdd_object *obj,
936                           struct lu_attr *attr,
937                           struct thandle *handle,
938                           int needacl)
939 {
940         int rc;
941         ENTRY;
942
943         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
944 #ifdef CONFIG_FS_POSIX_ACL
945         if (!rc && (attr->la_valid & LA_MODE) && needacl)
946                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
947 #endif
948         RETURN(rc);
949 }
950
951 int mdd_attr_check_set_internal(const struct lu_env *env,
952                                 struct mdd_object *obj,
953                                 struct lu_attr *attr,
954                                 struct thandle *handle,
955                                 int needacl)
956 {
957         int rc;
958         ENTRY;
959
960         rc = mdd_attr_check(env, obj, attr);
961         if (rc)
962                 RETURN(rc);
963
964         if (attr->la_valid)
965                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
966         RETURN(rc);
967 }
968
969 static int mdd_attr_set_internal_locked(const struct lu_env *env,
970                                         struct mdd_object *obj,
971                                         struct lu_attr *attr,
972                                         struct thandle *handle,
973                                         int needacl)
974 {
975         int rc;
976         ENTRY;
977
978         needacl = needacl && (attr->la_valid & LA_MODE);
979         if (needacl)
980                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
981         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
982         if (needacl)
983                 mdd_write_unlock(env, obj);
984         RETURN(rc);
985 }
986
987 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
988                                        struct mdd_object *obj,
989                                        struct lu_attr *attr,
990                                        struct thandle *handle,
991                                        int needacl)
992 {
993         int rc;
994         ENTRY;
995
996         needacl = needacl && (attr->la_valid & LA_MODE);
997         if (needacl)
998                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
999         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1000         if (needacl)
1001                 mdd_write_unlock(env, obj);
1002         RETURN(rc);
1003 }
1004
1005 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1006                     const struct lu_buf *buf, const char *name,
1007                     int fl, struct thandle *handle)
1008 {
1009         struct lustre_capa *capa = mdd_object_capa(env, obj);
1010         int rc = -EINVAL;
1011         ENTRY;
1012
1013         if (buf->lb_buf && buf->lb_len > 0)
1014                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1015         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1016                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1017
1018         RETURN(rc);
1019 }
1020
1021 /*
1022  * This gives the same functionality as the code between
1023  * sys_chmod and inode_setattr
1024  * chown_common and inode_setattr
1025  * utimes and inode_setattr
1026  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1027  */
1028 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1029                         struct lu_attr *la, const struct md_attr *ma)
1030 {
1031         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1032         struct md_ucred  *uc;
1033         int               rc;
1034         ENTRY;
1035
1036         if (!la->la_valid)
1037                 RETURN(0);
1038
1039         /* Do not permit change file type */
1040         if (la->la_valid & LA_TYPE)
1041                 RETURN(-EPERM);
1042
1043         /* They should not be processed by setattr */
1044         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1045                 RETURN(-EPERM);
1046
1047         /* export destroy does not have ->le_ses, but we may want
1048          * to drop LUSTRE_SOM_FL. */
1049         if (!env->le_ses)
1050                 RETURN(0);
1051
1052         uc = md_ucred(env);
1053
1054         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1055         if (rc)
1056                 RETURN(rc);
1057
1058         if (la->la_valid == LA_CTIME) {
1059                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1060                         /* This is only for set ctime when rename's source is
1061                          * on remote MDS. */
1062                         rc = mdd_may_delete(env, NULL, obj,
1063                                             (struct md_attr *)ma, 1, 0);
1064                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1065                         la->la_valid &= ~LA_CTIME;
1066                 RETURN(rc);
1067         }
1068
1069         if (la->la_valid == LA_ATIME) {
1070                 /* This is atime only set for read atime update on close. */
1071                 if (la->la_atime >= tmp_la->la_atime &&
1072                     la->la_atime < (tmp_la->la_atime +
1073                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1074                         la->la_valid &= ~LA_ATIME;
1075                 RETURN(0);
1076         }
1077
1078         /* Check if flags change. */
1079         if (la->la_valid & LA_FLAGS) {
1080                 unsigned int oldflags = 0;
1081                 unsigned int newflags = la->la_flags &
1082                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1083
1084                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1085                     !mdd_capable(uc, CFS_CAP_FOWNER))
1086                         RETURN(-EPERM);
1087
1088                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1089                  * only be changed by the relevant capability. */
1090                 if (mdd_is_immutable(obj))
1091                         oldflags |= LUSTRE_IMMUTABLE_FL;
1092                 if (mdd_is_append(obj))
1093                         oldflags |= LUSTRE_APPEND_FL;
1094                 if ((oldflags ^ newflags) &&
1095                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1096                         RETURN(-EPERM);
1097
1098                 if (!S_ISDIR(tmp_la->la_mode))
1099                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1100         }
1101
1102         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1103             (la->la_valid & ~LA_FLAGS) &&
1104             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1105                 RETURN(-EPERM);
1106
1107         /* Check for setting the obj time. */
1108         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1109             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1110                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1111                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1112                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1113                                                             MAY_WRITE,
1114                                                             MOR_TGT_CHILD);
1115                         if (rc)
1116                                 RETURN(rc);
1117                 }
1118         }
1119
1120         if (la->la_valid & LA_KILL_SUID) {
1121                 la->la_valid &= ~LA_KILL_SUID;
1122                 if ((tmp_la->la_mode & S_ISUID) &&
1123                     !(la->la_valid & LA_MODE)) {
1124                         la->la_mode = tmp_la->la_mode;
1125                         la->la_valid |= LA_MODE;
1126                 }
1127                 la->la_mode &= ~S_ISUID;
1128         }
1129
1130         if (la->la_valid & LA_KILL_SGID) {
1131                 la->la_valid &= ~LA_KILL_SGID;
1132                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1133                                         (S_ISGID | S_IXGRP)) &&
1134                     !(la->la_valid & LA_MODE)) {
1135                         la->la_mode = tmp_la->la_mode;
1136                         la->la_valid |= LA_MODE;
1137                 }
1138                 la->la_mode &= ~S_ISGID;
1139         }
1140
1141         /* Make sure a caller can chmod. */
1142         if (la->la_valid & LA_MODE) {
1143                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1144                     (uc->mu_fsuid != tmp_la->la_uid) &&
1145                     !mdd_capable(uc, CFS_CAP_FOWNER))
1146                         RETURN(-EPERM);
1147
1148                 if (la->la_mode == (cfs_umode_t) -1)
1149                         la->la_mode = tmp_la->la_mode;
1150                 else
1151                         la->la_mode = (la->la_mode & S_IALLUGO) |
1152                                       (tmp_la->la_mode & ~S_IALLUGO);
1153
1154                 /* Also check the setgid bit! */
1155                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1156                                        la->la_gid : tmp_la->la_gid) &&
1157                     !mdd_capable(uc, CFS_CAP_FSETID))
1158                         la->la_mode &= ~S_ISGID;
1159         } else {
1160                la->la_mode = tmp_la->la_mode;
1161         }
1162
1163         /* Make sure a caller can chown. */
1164         if (la->la_valid & LA_UID) {
1165                 if (la->la_uid == (uid_t) -1)
1166                         la->la_uid = tmp_la->la_uid;
1167                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1168                     (la->la_uid != tmp_la->la_uid)) &&
1169                     !mdd_capable(uc, CFS_CAP_CHOWN))
1170                         RETURN(-EPERM);
1171
1172                 /* If the user or group of a non-directory has been
1173                  * changed by a non-root user, remove the setuid bit.
1174                  * 19981026 David C Niemi <niemi@tux.org>
1175                  *
1176                  * Changed this to apply to all users, including root,
1177                  * to avoid some races. This is the behavior we had in
1178                  * 2.0. The check for non-root was definitely wrong
1179                  * for 2.2 anyway, as it should have been using
1180                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1181                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1182                     !S_ISDIR(tmp_la->la_mode)) {
1183                         la->la_mode &= ~S_ISUID;
1184                         la->la_valid |= LA_MODE;
1185                 }
1186         }
1187
1188         /* Make sure caller can chgrp. */
1189         if (la->la_valid & LA_GID) {
1190                 if (la->la_gid == (gid_t) -1)
1191                         la->la_gid = tmp_la->la_gid;
1192                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1193                     ((la->la_gid != tmp_la->la_gid) &&
1194                     !lustre_in_group_p(uc, la->la_gid))) &&
1195                     !mdd_capable(uc, CFS_CAP_CHOWN))
1196                         RETURN(-EPERM);
1197
1198                 /* Likewise, if the user or group of a non-directory
1199                  * has been changed by a non-root user, remove the
1200                  * setgid bit UNLESS there is no group execute bit
1201                  * (this would be a file marked for mandatory
1202                  * locking).  19981026 David C Niemi <niemi@tux.org>
1203                  *
1204                  * Removed the fsuid check (see the comment above) --
1205                  * 19990830 SD. */
1206                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1207                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1208                         la->la_mode &= ~S_ISGID;
1209                         la->la_valid |= LA_MODE;
1210                 }
1211         }
1212
1213         /* For both Size-on-MDS case and truncate case,
1214          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1215          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1216          * For SOM case, it is true, the MAY_WRITE perm has been checked
1217          * when open, no need check again. For truncate case, it is false,
1218          * the MAY_WRITE perm should be checked here. */
1219         if (ma->ma_attr_flags & MDS_SOM) {
1220                 /* For the "Size-on-MDS" setattr update, merge coming
1221                  * attributes with the set in the inode. BUG 10641 */
1222                 if ((la->la_valid & LA_ATIME) &&
1223                     (la->la_atime <= tmp_la->la_atime))
1224                         la->la_valid &= ~LA_ATIME;
1225
1226                 /* OST attributes do not have a priority over MDS attributes,
1227                  * so drop times if ctime is equal. */
1228                 if ((la->la_valid & LA_CTIME) &&
1229                     (la->la_ctime <= tmp_la->la_ctime))
1230                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1231         } else {
1232                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1233                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1234                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1235                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1236                                 rc = mdd_permission_internal_locked(env, obj,
1237                                                             tmp_la, MAY_WRITE,
1238                                                             MOR_TGT_CHILD);
1239                                 if (rc)
1240                                         RETURN(rc);
1241                         }
1242                 }
1243                 if (la->la_valid & LA_CTIME) {
1244                         /* The pure setattr, it has the priority over what is
1245                          * already set, do not drop it if ctime is equal. */
1246                         if (la->la_ctime < tmp_la->la_ctime)
1247                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1248                                                   LA_CTIME);
1249                 }
1250         }
1251
1252         RETURN(0);
1253 }
1254
1255 /** Store a data change changelog record
1256  * If this fails, we must fail the whole transaction; we don't
1257  * want the change to commit without the log entry.
1258  * \param mdd_obj - mdd_object of change
1259  * \param handle - transacion handle
1260  */
1261 static int mdd_changelog_data_store(const struct lu_env     *env,
1262                                     struct mdd_device       *mdd,
1263                                     enum changelog_rec_type type,
1264                                     int                     flags,
1265                                     struct mdd_object       *mdd_obj,
1266                                     struct thandle          *handle)
1267 {
1268         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1269         struct llog_changelog_rec *rec;
1270         struct thandle *th = NULL;
1271         struct lu_buf *buf;
1272         int reclen;
1273         int rc;
1274
1275         /* Not recording */
1276         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1277                 RETURN(0);
1278         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1279                 RETURN(0);
1280
1281         LASSERT(mdd_obj != NULL);
1282
1283         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1284             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1285                 /* Don't need multiple updates in this log */
1286                 /* Don't check under lock - no big deal if we get an extra
1287                    entry */
1288                 RETURN(0);
1289         }
1290
1291         reclen = llog_data_len(sizeof(*rec));
1292         buf = mdd_buf_alloc(env, reclen);
1293         if (buf->lb_buf == NULL)
1294                 RETURN(-ENOMEM);
1295         rec = (struct llog_changelog_rec *)buf->lb_buf;
1296
1297         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1298         rec->cr.cr_type = (__u32)type;
1299         rec->cr.cr_tfid = *tfid;
1300         rec->cr.cr_namelen = 0;
1301         mdd_obj->mod_cltime = cfs_time_current_64();
1302
1303         if (handle == NULL) {
1304                 /* Used for the close event only for now. */
1305                 LASSERT(type == CL_CLOSE);
1306                 LASSERT(mdd_env_info(env)->mti_param.tp_credits != 0);
1307                 th = mdd_trans_start(env, mdd);
1308                 if (IS_ERR(th))
1309                         GOTO(err, rc = PTR_ERR(th));
1310         }
1311
1312         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1313
1314         if (th)
1315                 mdd_trans_stop(env, mdd, rc, th);
1316 err:
1317         if (rc < 0) {
1318                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1319                        rc, type, PFID(tfid));
1320                 return -EFAULT;
1321         }
1322
1323         return 0;
1324 }
1325
1326 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1327                   int flags, struct md_object *obj)
1328 {
1329         struct thandle *handle;
1330         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1331         struct mdd_device *mdd = mdo2mdd(obj);
1332         int rc;
1333         ENTRY;
1334
1335         handle = mdd_trans_start(env, mdd);
1336
1337         if (IS_ERR(handle))
1338                 return(PTR_ERR(handle));
1339
1340         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1341                                       handle);
1342
1343         mdd_trans_stop(env, mdd, rc, handle);
1344
1345         RETURN(rc);
1346 }
1347
1348 /**
1349  * Should be called with write lock held.
1350  *
1351  * \see mdd_lma_set_locked().
1352  */
1353 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1354                        const struct md_attr *ma, struct thandle *handle)
1355 {
1356         struct mdd_thread_info *info = mdd_env_info(env);
1357         struct lu_buf *buf;
1358         struct lustre_mdt_attrs *lma =
1359                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1360         int lmasize = sizeof(struct lustre_mdt_attrs);
1361         int rc = 0;
1362
1363         ENTRY;
1364
1365         /* Either HSM or SOM part is not valid, we need to read it before */
1366         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1367                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1368                 if (rc <= 0)
1369                         RETURN(rc);
1370
1371                 lustre_lma_swab(lma);
1372         } else {
1373                 memset(lma, 0, lmasize);
1374         }
1375
1376         /* Copy HSM data */
1377         if (ma->ma_valid & MA_HSM) {
1378                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1379                 lma->lma_compat |= LMAC_HSM;
1380         }
1381
1382         /* Copy SOM data */
1383         if (ma->ma_valid & MA_SOM) {
1384                 LASSERT(ma->ma_som != NULL);
1385                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1386                         lma->lma_compat     &= ~LMAC_SOM;
1387                 } else {
1388                         lma->lma_compat     |= LMAC_SOM;
1389                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1390                         lma->lma_som_size    = ma->ma_som->msd_size;
1391                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1392                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1393                 }
1394         }
1395
1396         /* Copy FID */
1397         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1398
1399         lustre_lma_swab(lma);
1400         buf = mdd_buf_get(env, lma, lmasize);
1401         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1402
1403         RETURN(rc);
1404 }
1405
1406 /**
1407  * Save LMA extended attributes with data from \a ma.
1408  *
1409  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1410  * not, LMA EA will be first read from disk, modified and write back.
1411  *
1412  */
1413 static int mdd_lma_set_locked(const struct lu_env *env,
1414                               struct mdd_object *mdd_obj,
1415                               const struct md_attr *ma, struct thandle *handle)
1416 {
1417         int rc;
1418
1419         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1420         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1421         mdd_write_unlock(env, mdd_obj);
1422         return rc;
1423 }
1424
1425 /* Precedence for choosing record type when multiple
1426  * attributes change: setattr > mtime > ctime > atime
1427  * (ctime changes when mtime does, plus chmod/chown.
1428  * atime and ctime are independent.) */
1429 static int mdd_attr_set_changelog(const struct lu_env *env,
1430                                   struct md_object *obj, struct thandle *handle,
1431                                   __u64 valid)
1432 {
1433         struct mdd_device *mdd = mdo2mdd(obj);
1434         int bits, type = 0;
1435
1436         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1437         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1438         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1439         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1440         bits = bits & mdd->mdd_cl.mc_mask;
1441         if (bits == 0)
1442                 return 0;
1443
1444         /* The record type is the lowest non-masked set bit */
1445         while (bits && ((bits & 1) == 0)) {
1446                 bits = bits >> 1;
1447                 type++;
1448         }
1449
1450         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1451         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1452                                         md2mdd_obj(obj), handle);
1453 }
1454
1455 /* set attr and LOV EA at once, return updated attr */
1456 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1457                         const struct md_attr *ma)
1458 {
1459         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1460         struct mdd_device *mdd = mdo2mdd(obj);
1461         struct thandle *handle;
1462         struct lov_mds_md *lmm = NULL;
1463         struct llog_cookie *logcookies = NULL;
1464         int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
1465         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1466         struct obd_device *obd = mdd->mdd_obd_dev;
1467         struct mds_obd *mds = &obd->u.mds;
1468 #ifdef HAVE_QUOTA_SUPPORT
1469         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1470         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1471         int quota_opc = 0, block_count = 0;
1472         int inode_pending[MAXQUOTAS] = { 0, 0 };
1473         int block_pending[MAXQUOTAS] = { 0, 0 };
1474 #endif
1475         ENTRY;
1476
1477         *la_copy = ma->ma_attr;
1478         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1479         if (rc != 0)
1480                 RETURN(rc);
1481
1482         /* setattr on "close" only change atime, or do nothing */
1483         if (ma->ma_valid == MA_INODE &&
1484             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1485                 RETURN(0);
1486
1487         /*TODO: add lock here*/
1488         /* start a log jounal handle if needed */
1489         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1490             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1491                 lmm_size = mdd_lov_mdsize(env, mdd);
1492                 lmm = mdd_max_lmm_get(env, mdd);
1493                 if (lmm == NULL)
1494                         GOTO(no_trans, rc = -ENOMEM);
1495
1496                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1497                                 XATTR_NAME_LOV);
1498
1499                 if (rc < 0)
1500                         GOTO(no_trans, rc);
1501         }
1502
1503         chlog_cnt = 1;
1504         if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
1505                 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
1506                          lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
1507         }
1508
1509         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1510                                     MDD_TXN_ATTR_SET_OP, chlog_cnt);
1511         handle = mdd_trans_start(env, mdd);
1512         if (IS_ERR(handle))
1513                 GOTO(no_trans, rc = PTR_ERR(handle));
1514
1515         /* permission changes may require sync operation */
1516         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1517                 handle->th_sync |= mdd->mdd_sync_permission;
1518
1519         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1520                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1521                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1522
1523 #ifdef HAVE_QUOTA_SUPPORT
1524         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1525                 struct obd_export *exp = md_quota(env)->mq_exp;
1526                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1527
1528                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1529                 if (!rc) {
1530                         quota_opc = FSFILT_OP_SETATTR;
1531                         mdd_quota_wrapper(la_copy, qnids);
1532                         mdd_quota_wrapper(la_tmp, qoids);
1533                         /* get file quota for new owner */
1534                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1535                                         qnids, inode_pending, 1, NULL, 0,
1536                                         NULL, 0);
1537                         block_count = (la_tmp->la_blocks + 7) >> 3;
1538                         if (block_count) {
1539                                 void *data = NULL;
1540                                 mdd_data_get(env, mdd_obj, &data);
1541                                 /* get block quota for new owner */
1542                                 lquota_chkquota(mds_quota_interface_ref, obd,
1543                                                 exp, qnids, block_pending,
1544                                                 block_count, NULL,
1545                                                 LQUOTA_FLAGS_BLK, data, 1);
1546                         }
1547                 }
1548         }
1549 #endif
1550
1551         if (la_copy->la_valid & LA_FLAGS) {
1552                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1553                                                   handle, 1);
1554                 if (rc == 0)
1555                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1556         } else if (la_copy->la_valid) {            /* setattr */
1557                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1558                                                   handle, 1);
1559                 /* journal chown/chgrp in llog, just like unlink */
1560                 if (rc == 0 && lmm_size){
1561                         cookie_size = mdd_lov_cookiesize(env, mdd);
1562                         logcookies = mdd_max_cookie_get(env, mdd);
1563                         if (logcookies == NULL)
1564                                 GOTO(cleanup, rc = -ENOMEM);
1565
1566                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1567                                             logcookies, cookie_size) <= 0)
1568                                 logcookies = NULL;
1569                 }
1570         }
1571
1572         if (rc == 0 && ma->ma_valid & MA_LOV) {
1573                 cfs_umode_t mode;
1574
1575                 mode = mdd_object_type(mdd_obj);
1576                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1577                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1578                         if (rc)
1579                                 GOTO(cleanup, rc);
1580
1581                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1582                                             ma->ma_lmm_size, handle, 1);
1583                 }
1584
1585         }
1586         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1587                 cfs_umode_t mode;
1588
1589                 mode = mdd_object_type(mdd_obj);
1590                 if (S_ISREG(mode))
1591                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1592
1593         }
1594 cleanup:
1595         if (rc == 0)
1596                 rc = mdd_attr_set_changelog(env, obj, handle,
1597                                             ma->ma_attr.la_valid);
1598         mdd_trans_stop(env, mdd, rc, handle);
1599 no_trans:
1600         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1601                 /*set obd attr, if needed*/
1602                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1603                                            logcookies);
1604         }
1605 #ifdef HAVE_QUOTA_SUPPORT
1606         if (quota_opc) {
1607                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1608                                       inode_pending, 0);
1609                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1610                                       block_pending, 1);
1611                 /* Trigger dqrel/dqacq for original owner and new owner.
1612                  * If failed, the next call for lquota_chkquota will
1613                  * process it. */
1614                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1615                               quota_opc);
1616         }
1617 #endif
1618         RETURN(rc);
1619 }
1620
1621 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1622                       const struct lu_buf *buf, const char *name, int fl,
1623                       struct thandle *handle)
1624 {
1625         int  rc;
1626         ENTRY;
1627
1628         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1629         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1630         mdd_write_unlock(env, obj);
1631
1632         RETURN(rc);
1633 }
1634
1635 static int mdd_xattr_sanity_check(const struct lu_env *env,
1636                                   struct mdd_object *obj)
1637 {
1638         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1639         struct md_ucred *uc     = md_ucred(env);
1640         int rc;
1641         ENTRY;
1642
1643         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1644                 RETURN(-EPERM);
1645
1646         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1647         if (rc)
1648                 RETURN(rc);
1649
1650         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1651             !mdd_capable(uc, CFS_CAP_FOWNER))
1652                 RETURN(-EPERM);
1653
1654         RETURN(rc);
1655 }
1656
1657 /**
1658  * The caller should guarantee to update the object ctime
1659  * after xattr_set if needed.
1660  */
1661 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1662                          const struct lu_buf *buf, const char *name,
1663                          int fl)
1664 {
1665         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1666         struct mdd_device *mdd = mdo2mdd(obj);
1667         struct thandle *handle;
1668         int  rc;
1669         ENTRY;
1670
1671         rc = mdd_xattr_sanity_check(env, mdd_obj);
1672         if (rc)
1673                 RETURN(rc);
1674
1675         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1676         handle = mdd_trans_start(env, mdd);
1677         if (IS_ERR(handle))
1678                 RETURN(PTR_ERR(handle));
1679
1680         /* security-replated changes may require sync */
1681         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1682                 handle->th_sync |= mdd->mdd_sync_permission;
1683
1684         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1685
1686         /* Only record system & user xattr changes */
1687         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1688                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1689                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1690                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1691                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1692                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1693                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1694                                               handle);
1695         mdd_trans_stop(env, mdd, rc, handle);
1696
1697         RETURN(rc);
1698 }
1699
1700 /**
1701  * The caller should guarantee to update the object ctime
1702  * after xattr_set if needed.
1703  */
1704 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1705                   const char *name)
1706 {
1707         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1708         struct mdd_device *mdd = mdo2mdd(obj);
1709         struct thandle *handle;
1710         int  rc;
1711         ENTRY;
1712
1713         rc = mdd_xattr_sanity_check(env, mdd_obj);
1714         if (rc)
1715                 RETURN(rc);
1716
1717         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1718         handle = mdd_trans_start(env, mdd);
1719         if (IS_ERR(handle))
1720                 RETURN(PTR_ERR(handle));
1721
1722         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1723         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1724                            mdd_object_capa(env, mdd_obj));
1725         mdd_write_unlock(env, mdd_obj);
1726
1727         /* Only record system & user xattr changes */
1728         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1729                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1730                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1731                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1732                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1733                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1734                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1735                                               handle);
1736
1737         mdd_trans_stop(env, mdd, rc, handle);
1738
1739         RETURN(rc);
1740 }
1741
1742 /* partial unlink */
1743 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1744                        struct md_attr *ma)
1745 {
1746         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1747         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1748         struct mdd_device *mdd = mdo2mdd(obj);
1749         struct thandle *handle;
1750 #ifdef HAVE_QUOTA_SUPPORT
1751         struct obd_device *obd = mdd->mdd_obd_dev;
1752         struct mds_obd *mds = &obd->u.mds;
1753         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1754         int quota_opc = 0;
1755 #endif
1756         int rc;
1757         ENTRY;
1758
1759         /*
1760          * Check -ENOENT early here because we need to get object type
1761          * to calculate credits before transaction start
1762          */
1763         if (!mdd_object_exists(mdd_obj))
1764                 RETURN(-ENOENT);
1765
1766         LASSERT(mdd_object_exists(mdd_obj) > 0);
1767
1768         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
1769         if (rc)
1770                 RETURN(rc);
1771
1772         handle = mdd_trans_start(env, mdd);
1773         if (IS_ERR(handle))
1774                 RETURN(-ENOMEM);
1775
1776         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1777
1778         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1779         if (rc)
1780                 GOTO(cleanup, rc);
1781
1782         __mdd_ref_del(env, mdd_obj, handle, 0);
1783
1784         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1785                 /* unlink dot */
1786                 __mdd_ref_del(env, mdd_obj, handle, 1);
1787         }
1788
1789         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1790         la_copy->la_ctime = ma->ma_attr.la_ctime;
1791
1792         la_copy->la_valid = LA_CTIME;
1793         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1794         if (rc)
1795                 GOTO(cleanup, rc);
1796
1797         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1798 #ifdef HAVE_QUOTA_SUPPORT
1799         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1800             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1801                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1802                 mdd_quota_wrapper(&ma->ma_attr, qids);
1803         }
1804 #endif
1805
1806
1807         EXIT;
1808 cleanup:
1809         mdd_write_unlock(env, mdd_obj);
1810         mdd_trans_stop(env, mdd, rc, handle);
1811 #ifdef HAVE_QUOTA_SUPPORT
1812         if (quota_opc)
1813                 /* Trigger dqrel on the owner of child. If failed,
1814                  * the next call for lquota_chkquota will process it */
1815                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1816                               quota_opc);
1817 #endif
1818         return rc;
1819 }
1820
1821 /* partial operation */
1822 static int mdd_oc_sanity_check(const struct lu_env *env,
1823                                struct mdd_object *obj,
1824                                struct md_attr *ma)
1825 {
1826         int rc;
1827         ENTRY;
1828
1829         switch (ma->ma_attr.la_mode & S_IFMT) {
1830         case S_IFREG:
1831         case S_IFDIR:
1832         case S_IFLNK:
1833         case S_IFCHR:
1834         case S_IFBLK:
1835         case S_IFIFO:
1836         case S_IFSOCK:
1837                 rc = 0;
1838                 break;
1839         default:
1840                 rc = -EINVAL;
1841                 break;
1842         }
1843         RETURN(rc);
1844 }
1845
1846 static int mdd_object_create(const struct lu_env *env,
1847                              struct md_object *obj,
1848                              const struct md_op_spec *spec,
1849                              struct md_attr *ma)
1850 {
1851
1852         struct mdd_device *mdd = mdo2mdd(obj);
1853         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1854         const struct lu_fid *pfid = spec->u.sp_pfid;
1855         struct thandle *handle;
1856 #ifdef HAVE_QUOTA_SUPPORT
1857         struct obd_device *obd = mdd->mdd_obd_dev;
1858         struct obd_export *exp = md_quota(env)->mq_exp;
1859         struct mds_obd *mds = &obd->u.mds;
1860         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1861         int quota_opc = 0, block_count = 0;
1862         int inode_pending[MAXQUOTAS] = { 0, 0 };
1863         int block_pending[MAXQUOTAS] = { 0, 0 };
1864 #endif
1865         int rc = 0;
1866         ENTRY;
1867
1868 #ifdef HAVE_QUOTA_SUPPORT
1869         if (mds->mds_quota) {
1870                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1871                 mdd_quota_wrapper(&ma->ma_attr, qids);
1872                 /* get file quota for child */
1873                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1874                                 qids, inode_pending, 1, NULL, 0,
1875                                 NULL, 0);
1876                 switch (ma->ma_attr.la_mode & S_IFMT) {
1877                 case S_IFLNK:
1878                 case S_IFDIR:
1879                         block_count = 2;
1880                         break;
1881                 case S_IFREG:
1882                         block_count = 1;
1883                         break;
1884                 }
1885                 /* get block quota for child */
1886                 if (block_count)
1887                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1888                                         qids, block_pending, block_count,
1889                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1890         }
1891 #endif
1892
1893         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
1894         handle = mdd_trans_start(env, mdd);
1895         if (IS_ERR(handle))
1896                 GOTO(out_pending, rc = PTR_ERR(handle));
1897
1898         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1899         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1900         if (rc)
1901                 GOTO(unlock, rc);
1902
1903         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1904         if (rc)
1905                 GOTO(unlock, rc);
1906
1907         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1908                 /* If creating the slave object, set slave EA here. */
1909                 int lmv_size = spec->u.sp_ea.eadatalen;
1910                 struct lmv_stripe_md *lmv;
1911
1912                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1913                 LASSERT(lmv != NULL && lmv_size > 0);
1914
1915                 rc = __mdd_xattr_set(env, mdd_obj,
1916                                      mdd_buf_get_const(env, lmv, lmv_size),
1917                                      XATTR_NAME_LMV, 0, handle);
1918                 if (rc)
1919                         GOTO(unlock, rc);
1920
1921                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1922                                            handle, 0);
1923         } else {
1924 #ifdef CONFIG_FS_POSIX_ACL
1925                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1926                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1927
1928                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1929                         buf->lb_len = spec->u.sp_ea.eadatalen;
1930                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1931                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1932                                                     &ma->ma_attr.la_mode,
1933                                                     handle);
1934                                 if (rc)
1935                                         GOTO(unlock, rc);
1936                                 else
1937                                         ma->ma_attr.la_valid |= LA_MODE;
1938                         }
1939
1940                         pfid = spec->u.sp_ea.fid;
1941                 }
1942 #endif
1943                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1944                                            spec);
1945         }
1946         EXIT;
1947 unlock:
1948         if (rc == 0)
1949                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1950         mdd_write_unlock(env, mdd_obj);
1951
1952         mdd_trans_stop(env, mdd, rc, handle);
1953 out_pending:
1954 #ifdef HAVE_QUOTA_SUPPORT
1955         if (quota_opc) {
1956                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1957                                       inode_pending, 0);
1958                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1959                                       block_pending, 1);
1960                 /* Trigger dqacq on the owner of child. If failed,
1961                  * the next call for lquota_chkquota will process it. */
1962                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1963                               quota_opc);
1964         }
1965 #endif
1966         return rc;
1967 }
1968
1969 /* partial link */
1970 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1971                        const struct md_attr *ma)
1972 {
1973         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1974         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1975         struct mdd_device *mdd = mdo2mdd(obj);
1976         struct thandle *handle;
1977         int rc;
1978         ENTRY;
1979
1980         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
1981         handle = mdd_trans_start(env, mdd);
1982         if (IS_ERR(handle))
1983                 RETURN(-ENOMEM);
1984
1985         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1986         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1987         if (rc == 0)
1988                 __mdd_ref_add(env, mdd_obj, handle);
1989         mdd_write_unlock(env, mdd_obj);
1990         if (rc == 0) {
1991                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1992                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1993
1994                 la_copy->la_valid = LA_CTIME;
1995                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1996                                                         handle, 0);
1997         }
1998         mdd_trans_stop(env, mdd, 0, handle);
1999
2000         RETURN(rc);
2001 }
2002
2003 /*
2004  * do NOT or the MAY_*'s, you'll get the weakest
2005  */
2006 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2007 {
2008         int res = 0;
2009
2010         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2011          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2012          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2013          * owner can write to a file even if it is marked readonly to hide
2014          * its brokenness. (bug 5781) */
2015         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2016                 struct md_ucred *uc = md_ucred(env);
2017
2018                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2019                     (la->la_uid == uc->mu_fsuid))
2020                         return 0;
2021         }
2022
2023         if (flags & FMODE_READ)
2024                 res |= MAY_READ;
2025         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2026                 res |= MAY_WRITE;
2027         if (flags & MDS_FMODE_EXEC)
2028                 res = MAY_EXEC;
2029         return res;
2030 }
2031
2032 static int mdd_open_sanity_check(const struct lu_env *env,
2033                                  struct mdd_object *obj, int flag)
2034 {
2035         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2036         int mode, rc;
2037         ENTRY;
2038
2039         /* EEXIST check */
2040         if (mdd_is_dead_obj(obj))
2041                 RETURN(-ENOENT);
2042
2043         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2044         if (rc)
2045                RETURN(rc);
2046
2047         if (S_ISLNK(tmp_la->la_mode))
2048                 RETURN(-ELOOP);
2049
2050         mode = accmode(env, tmp_la, flag);
2051
2052         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2053                 RETURN(-EISDIR);
2054
2055         if (!(flag & MDS_OPEN_CREATED)) {
2056                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2057                 if (rc)
2058                         RETURN(rc);
2059         }
2060
2061         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2062             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2063                 flag &= ~MDS_OPEN_TRUNC;
2064
2065         /* For writing append-only file must open it with append mode. */
2066         if (mdd_is_append(obj)) {
2067                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2068                         RETURN(-EPERM);
2069                 if (flag & MDS_OPEN_TRUNC)
2070                         RETURN(-EPERM);
2071         }
2072
2073 #if 0
2074         /*
2075          * Now, flag -- O_NOATIME does not be packed by client.
2076          */
2077         if (flag & O_NOATIME) {
2078                 struct md_ucred *uc = md_ucred(env);
2079
2080                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2081                     (uc->mu_valid == UCRED_NEW)) &&
2082                     (uc->mu_fsuid != tmp_la->la_uid) &&
2083                     !mdd_capable(uc, CFS_CAP_FOWNER))
2084                         RETURN(-EPERM);
2085         }
2086 #endif
2087
2088         RETURN(0);
2089 }
2090
2091 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2092                     int flags)
2093 {
2094         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2095         int rc = 0;
2096
2097         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2098
2099         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2100         if (rc == 0)
2101                 mdd_obj->mod_count++;
2102
2103         mdd_write_unlock(env, mdd_obj);
2104         return rc;
2105 }
2106
2107 /* return md_attr back,
2108  * if it is last unlink then return lov ea + llog cookie*/
2109 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2110                     struct md_attr *ma)
2111 {
2112         int rc = 0;
2113         ENTRY;
2114
2115         if (S_ISREG(mdd_object_type(obj))) {
2116                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2117                  * Caller must be ready for that. */
2118
2119                 rc = __mdd_lmm_get(env, obj, ma);
2120                 if ((ma->ma_valid & MA_LOV))
2121                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2122                                             obj, ma);
2123         }
2124         RETURN(rc);
2125 }
2126
2127 /*
2128  * No permission check is needed.
2129  */
2130 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2131                      struct md_attr *ma, int mode)
2132 {
2133         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2134         struct mdd_device *mdd = mdo2mdd(obj);
2135         struct thandle    *handle = NULL;
2136         int rc;
2137         int reset = 1;
2138
2139 #ifdef HAVE_QUOTA_SUPPORT
2140         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2141         struct mds_obd *mds = &obd->u.mds;
2142         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2143         int quota_opc = 0;
2144 #endif
2145         ENTRY;
2146
2147         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2148                 mdd_obj->mod_count--;
2149
2150                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2151                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2152                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2153                 RETURN(0);
2154         }
2155
2156         /* check without any lock */
2157         if (mdd_obj->mod_count == 1 &&
2158             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2159  again:
2160                 rc = mdd_log_txn_param_build(env, obj, ma,
2161                                              MDD_TXN_UNLINK_OP, 1);
2162                 if (rc)
2163                         RETURN(rc);
2164                 handle = mdd_trans_start(env, mdo2mdd(obj));
2165                 if (IS_ERR(handle))
2166                         RETURN(PTR_ERR(handle));
2167         }
2168
2169         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2170         if (handle == NULL && mdd_obj->mod_count == 1 &&
2171             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2172                 mdd_write_unlock(env, mdd_obj);
2173                 goto again;
2174         }
2175
2176         /* release open count */
2177         mdd_obj->mod_count --;
2178
2179         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2180                 /* remove link to object from orphan index */
2181                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2182                 if (rc == 0) {
2183                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2184                                "list, OSS objects to be destroyed.\n",
2185                                PFID(mdd_object_fid(mdd_obj)));
2186                 } else {
2187                         CERROR("Object "DFID" can not be deleted from orphan "
2188                                 "list, maybe cause OST objects can not be "
2189                                 "destroyed (err: %d).\n",
2190                                 PFID(mdd_object_fid(mdd_obj)), rc);
2191                         /* If object was not deleted from orphan list, do not
2192                          * destroy OSS objects, which will be done when next
2193                          * recovery. */
2194                         GOTO(out, rc);
2195                 }
2196         }
2197
2198         rc = mdd_iattr_get(env, mdd_obj, ma);
2199         /* Object maybe not in orphan list originally, it is rare case for
2200          * mdd_finish_unlink() failure. */
2201         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2202 #ifdef HAVE_QUOTA_SUPPORT
2203                 if (mds->mds_quota) {
2204                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2205                         mdd_quota_wrapper(&ma->ma_attr, qids);
2206                 }
2207 #endif
2208                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2209                 if (ma->ma_valid & MA_FLAGS &&
2210                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2211                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2212                 } else {
2213                         rc = mdd_object_kill(env, mdd_obj, ma);
2214                         if (rc == 0)
2215                                 reset = 0;
2216                 }
2217
2218                 if (rc != 0)
2219                         CERROR("Error when prepare to delete Object "DFID" , "
2220                                "which will cause OST objects can not be "
2221                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2222         }
2223         EXIT;
2224
2225 out:
2226         if (reset)
2227                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2228
2229         mdd_write_unlock(env, mdd_obj);
2230
2231         if (rc == 0 &&
2232             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2233             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2234                 if (handle == 0)
2235                         mdd_txn_param_build(env, mdd, MDD_TXN_CLOSE_OP, 1);
2236                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2237                                          mdd_obj, handle);
2238         }
2239
2240         if (handle != NULL)
2241                 mdd_trans_stop(env, mdd, rc, handle);
2242 #ifdef HAVE_QUOTA_SUPPORT
2243         if (quota_opc)
2244                 /* Trigger dqrel on the owner of child. If failed,
2245                  * the next call for lquota_chkquota will process it */
2246                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2247                               quota_opc);
2248 #endif
2249         return rc;
2250 }
2251
2252 /*
2253  * Permission check is done when open,
2254  * no need check again.
2255  */
2256 static int mdd_readpage_sanity_check(const struct lu_env *env,
2257                                      struct mdd_object *obj)
2258 {
2259         struct dt_object *next = mdd_object_child(obj);
2260         int rc;
2261         ENTRY;
2262
2263         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2264                 rc = 0;
2265         else
2266                 rc = -ENOTDIR;
2267
2268         RETURN(rc);
2269 }
2270
2271 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2272                               struct lu_dirpage *dp, int nob,
2273                               const struct dt_it_ops *iops, struct dt_it *it,
2274                               __u32 attr)
2275 {
2276         void                   *area = dp;
2277         int                     result;
2278         __u64                   hash = 0;
2279         struct lu_dirent       *ent;
2280         struct lu_dirent       *last = NULL;
2281         int                     first = 1;
2282
2283         memset(area, 0, sizeof (*dp));
2284         area += sizeof (*dp);
2285         nob  -= sizeof (*dp);
2286
2287         ent  = area;
2288         do {
2289                 int    len;
2290                 int    recsize;
2291
2292                 len  = iops->key_size(env, it);
2293
2294                 /* IAM iterator can return record with zero len. */
2295                 if (len == 0)
2296                         goto next;
2297
2298                 hash = iops->store(env, it);
2299                 if (unlikely(first)) {
2300                         first = 0;
2301                         dp->ldp_hash_start = cpu_to_le64(hash);
2302                 }
2303
2304                 /* calculate max space required for lu_dirent */
2305                 recsize = lu_dirent_calc_size(len, attr);
2306
2307                 if (nob >= recsize) {
2308                         result = iops->rec(env, it, ent, attr);
2309                         if (result == -ESTALE)
2310                                 goto next;
2311                         if (result != 0)
2312                                 goto out;
2313
2314                         /* osd might not able to pack all attributes,
2315                          * so recheck rec length */
2316                         recsize = le16_to_cpu(ent->lde_reclen);
2317                 } else {
2318                         result = (last != NULL) ? 0 :-EINVAL;
2319                         goto out;
2320                 }
2321                 last = ent;
2322                 ent = (void *)ent + recsize;
2323                 nob -= recsize;
2324
2325 next:
2326                 result = iops->next(env, it);
2327                 if (result == -ESTALE)
2328                         goto next;
2329         } while (result == 0);
2330
2331 out:
2332         dp->ldp_hash_end = cpu_to_le64(hash);
2333         if (last != NULL) {
2334                 if (last->lde_hash == dp->ldp_hash_end)
2335                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2336                 last->lde_reclen = 0; /* end mark */
2337         }
2338         return result;
2339 }
2340
2341 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2342                           const struct lu_rdpg *rdpg)
2343 {
2344         struct dt_it      *it;
2345         struct dt_object  *next = mdd_object_child(obj);
2346         const struct dt_it_ops  *iops;
2347         struct page       *pg;
2348         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2349         int i;
2350         int nlupgs = 0;
2351         int rc;
2352         int nob;
2353
2354         LASSERT(rdpg->rp_pages != NULL);
2355         LASSERT(next->do_index_ops != NULL);
2356
2357         if (rdpg->rp_count <= 0)
2358                 return -EFAULT;
2359
2360         /*
2361          * iterate through directory and fill pages from @rdpg
2362          */
2363         iops = &next->do_index_ops->dio_it;
2364         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2365         if (IS_ERR(it))
2366                 return PTR_ERR(it);
2367
2368         rc = iops->load(env, it, rdpg->rp_hash);
2369
2370         if (rc == 0) {
2371                 /*
2372                  * Iterator didn't find record with exactly the key requested.
2373                  *
2374                  * It is currently either
2375                  *
2376                  *     - positioned above record with key less than
2377                  *     requested---skip it.
2378                  *
2379                  *     - or not positioned at all (is in IAM_IT_SKEWED
2380                  *     state)---position it on the next item.
2381                  */
2382                 rc = iops->next(env, it);
2383         } else if (rc > 0)
2384                 rc = 0;
2385
2386         /*
2387          * At this point and across for-loop:
2388          *
2389          *  rc == 0 -> ok, proceed.
2390          *  rc >  0 -> end of directory.
2391          *  rc <  0 -> error.
2392          */
2393         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2394              i++, nob -= CFS_PAGE_SIZE) {
2395                 struct lu_dirpage *dp;
2396
2397                 LASSERT(i < rdpg->rp_npages);
2398                 pg = rdpg->rp_pages[i];
2399                 dp = cfs_kmap(pg);
2400 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2401 repeat:
2402 #endif
2403                 rc = mdd_dir_page_build(env, mdd, dp,
2404                                         min_t(int, nob, LU_PAGE_SIZE),
2405                                         iops, it, rdpg->rp_attrs);
2406                 if (rc > 0) {
2407                         /*
2408                          * end of directory.
2409                          */
2410                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2411                         nlupgs++;
2412                 } else if (rc < 0) {
2413                         CWARN("build page failed: %d!\n", rc);
2414                 } else {
2415                         nlupgs++;
2416 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2417                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2418                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2419                                 goto repeat;
2420 #endif
2421                 }
2422                 cfs_kunmap(pg);
2423         }
2424         if (rc >= 0) {
2425                 struct lu_dirpage *dp;
2426
2427                 dp = cfs_kmap(rdpg->rp_pages[0]);
2428                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2429                 if (nlupgs == 0) {
2430                         /*
2431                          * No pages were processed, mark this for first page
2432                          * and send back.
2433                          */
2434                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2435                         nlupgs = 1;
2436                 }
2437                 cfs_kunmap(rdpg->rp_pages[0]);
2438
2439                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2440         }
2441         iops->put(env, it);
2442         iops->fini(env, it);
2443
2444         return rc;
2445 }
2446
2447 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2448                  const struct lu_rdpg *rdpg)
2449 {
2450         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2451         int rc;
2452         ENTRY;
2453
2454         LASSERT(mdd_object_exists(mdd_obj));
2455
2456         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2457         rc = mdd_readpage_sanity_check(env, mdd_obj);
2458         if (rc)
2459                 GOTO(out_unlock, rc);
2460
2461         if (mdd_is_dead_obj(mdd_obj)) {
2462                 struct page *pg;
2463                 struct lu_dirpage *dp;
2464
2465                 /*
2466                  * According to POSIX, please do not return any entry to client:
2467                  * even dot and dotdot should not be returned.
2468                  */
2469                 CWARN("readdir from dead object: "DFID"\n",
2470                         PFID(mdd_object_fid(mdd_obj)));
2471
2472                 if (rdpg->rp_count <= 0)
2473                         GOTO(out_unlock, rc = -EFAULT);
2474                 LASSERT(rdpg->rp_pages != NULL);
2475
2476                 pg = rdpg->rp_pages[0];
2477                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2478                 memset(dp, 0 , sizeof(struct lu_dirpage));
2479                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2480                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2481                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2482                 cfs_kunmap(pg);
2483                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2484         }
2485
2486         rc = __mdd_readpage(env, mdd_obj, rdpg);
2487
2488         EXIT;
2489 out_unlock:
2490         mdd_read_unlock(env, mdd_obj);
2491         return rc;
2492 }
2493
2494 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2495 {
2496         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2497         struct dt_object *next;
2498
2499         LASSERT(mdd_object_exists(mdd_obj));
2500         next = mdd_object_child(mdd_obj);
2501         return next->do_ops->do_object_sync(env, next);
2502 }
2503
2504 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2505                                         struct md_object *obj)
2506 {
2507         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2508
2509         LASSERT(mdd_object_exists(mdd_obj));
2510         return do_version_get(env, mdd_object_child(mdd_obj));
2511 }
2512
2513 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2514                             dt_obj_version_t version)
2515 {
2516         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2517
2518         LASSERT(mdd_object_exists(mdd_obj));
2519         do_version_set(env, mdd_object_child(mdd_obj), version);
2520 }
2521
2522 const struct md_object_operations mdd_obj_ops = {
2523         .moo_permission    = mdd_permission,
2524         .moo_attr_get      = mdd_attr_get,
2525         .moo_attr_set      = mdd_attr_set,
2526         .moo_xattr_get     = mdd_xattr_get,
2527         .moo_xattr_set     = mdd_xattr_set,
2528         .moo_xattr_list    = mdd_xattr_list,
2529         .moo_xattr_del     = mdd_xattr_del,
2530         .moo_object_create = mdd_object_create,
2531         .moo_ref_add       = mdd_ref_add,
2532         .moo_ref_del       = mdd_ref_del,
2533         .moo_open          = mdd_open,
2534         .moo_close         = mdd_close,
2535         .moo_readpage      = mdd_readpage,
2536         .moo_readlink      = mdd_readlink,
2537         .moo_changelog     = mdd_changelog,
2538         .moo_capa_get      = mdd_capa_get,
2539         .moo_object_sync   = mdd_object_sync,
2540         .moo_version_get   = mdd_version_get,
2541         .moo_version_set   = mdd_version_set,
2542         .moo_path          = mdd_path,
2543         .moo_file_lock     = mdd_file_lock,
2544         .moo_file_unlock   = mdd_file_unlock,
2545 };