Whamcloud - gitweb
2ae3f5347e1badb48a7669c4475b43f2ac3425ff
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #ifdef HAVE_EXT4_LDISKFS
53 #include <ldiskfs/ldiskfs_jbd2.h>
54 #else
55 #include <linux/jbd.h>
56 #endif
57 #include <obd.h>
58 #include <obd_class.h>
59 #include <obd_support.h>
60 #include <lprocfs_status.h>
61 /* fid_be_cpu(), fid_cpu_to_be(). */
62 #include <lustre_fid.h>
63
64 #include <lustre_param.h>
65 #ifdef HAVE_EXT4_LDISKFS
66 #include <ldiskfs/ldiskfs.h>
67 #else
68 #include <linux/ldiskfs_fs.h>
69 #endif
70 #include <lustre_mds.h>
71 #include <lustre/lustre_idl.h>
72
73 #include "mdd_internal.h"
74
75 static const struct lu_object_operations mdd_lu_obj_ops;
76
77 static int mdd_xattr_get(const struct lu_env *env,
78                          struct md_object *obj, struct lu_buf *buf,
79                          const char *name);
80
81 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
82                  void **data)
83 {
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                  PFID(mdd_object_fid(obj)));
86         mdo_data_get(env, obj, data);
87         return 0;
88 }
89
90 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
91                struct lu_attr *la, struct lustre_capa *capa)
92 {
93         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
94                  PFID(mdd_object_fid(obj)));
95         return mdo_attr_get(env, obj, la, capa);
96 }
97
98 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
99 {
100         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
101
102         if (flags & LUSTRE_APPEND_FL)
103                 obj->mod_flags |= APPEND_OBJ;
104
105         if (flags & LUSTRE_IMMUTABLE_FL)
106                 obj->mod_flags |= IMMUTE_OBJ;
107 }
108
109 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
110 {
111         struct mdd_thread_info *info;
112
113         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
114         LASSERT(info != NULL);
115         return info;
116 }
117
118 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 {
120         struct lu_buf *buf;
121
122         buf = &mdd_env_info(env)->mti_buf;
123         buf->lb_buf = area;
124         buf->lb_len = len;
125         return buf;
126 }
127
128 void mdd_buf_put(struct lu_buf *buf)
129 {
130         if (buf == NULL || buf->lb_buf == NULL)
131                 return;
132         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
154                 buf->lb_buf = NULL;
155         }
156         if (buf->lb_buf == NULL) {
157                 buf->lb_len = len;
158                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
159                 if (buf->lb_buf == NULL)
160                         buf->lb_len = 0;
161         }
162         return buf;
163 }
164
165 /** Increase the size of the \a mti_big_buf.
166  * preserves old data in buffer
167  * old buffer remains unchanged on error
168  * \retval 0 or -ENOMEM
169  */
170 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
171 {
172         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
173         struct lu_buf buf;
174
175         LASSERT(len >= oldbuf->lb_len);
176         OBD_ALLOC_LARGE(buf.lb_buf, len);
177
178         if (buf.lb_buf == NULL)
179                 return -ENOMEM;
180
181         buf.lb_len = len;
182         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
183
184         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
185
186         memcpy(oldbuf, &buf, sizeof(buf));
187
188         return 0;
189 }
190
191 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
192                                        struct mdd_device *mdd)
193 {
194         struct mdd_thread_info *mti = mdd_env_info(env);
195         int                     max_cookie_size;
196
197         max_cookie_size = mdd_lov_cookiesize(env, mdd);
198         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
199                 if (mti->mti_max_cookie)
200                         OBD_FREE_LARGE(mti->mti_max_cookie,
201                                        mti->mti_max_cookie_size);
202                 mti->mti_max_cookie = NULL;
203                 mti->mti_max_cookie_size = 0;
204         }
205         if (unlikely(mti->mti_max_cookie == NULL)) {
206                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
207                 if (likely(mti->mti_max_cookie != NULL))
208                         mti->mti_max_cookie_size = max_cookie_size;
209         }
210         if (likely(mti->mti_max_cookie != NULL))
211                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212         return mti->mti_max_cookie;
213 }
214
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216                                    struct mdd_device *mdd)
217 {
218         struct mdd_thread_info *mti = mdd_env_info(env);
219         int                     max_lmm_size;
220
221         max_lmm_size = mdd_lov_mdsize(env, mdd);
222         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223                 if (mti->mti_max_lmm)
224                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225                 mti->mti_max_lmm = NULL;
226                 mti->mti_max_lmm_size = 0;
227         }
228         if (unlikely(mti->mti_max_lmm == NULL)) {
229                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
230                 if (likely(mti->mti_max_lmm != NULL))
231                         mti->mti_max_lmm_size = max_lmm_size;
232         }
233         return mti->mti_max_lmm;
234 }
235
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237                                    const struct lu_object_header *hdr,
238                                    struct lu_device *d)
239 {
240         struct mdd_object *mdd_obj;
241
242         OBD_ALLOC_PTR(mdd_obj);
243         if (mdd_obj != NULL) {
244                 struct lu_object *o;
245
246                 o = mdd2lu_obj(mdd_obj);
247                 lu_object_init(o, NULL, d);
248                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250                 mdd_obj->mod_count = 0;
251                 o->lo_ops = &mdd_lu_obj_ops;
252                 return o;
253         } else {
254                 return NULL;
255         }
256 }
257
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259                            const struct lu_object_conf *unused)
260 {
261         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262         struct mdd_object *mdd_obj = lu2mdd_obj(o);
263         struct lu_object  *below;
264         struct lu_device  *under;
265         ENTRY;
266
267         mdd_obj->mod_cltime = 0;
268         under = &d->mdd_child->dd_lu_dev;
269         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270         mdd_pdlock_init(mdd_obj);
271         if (below == NULL)
272                 RETURN(-ENOMEM);
273
274         lu_object_add(o, below);
275
276         RETURN(0);
277 }
278
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
280 {
281         if (lu_object_exists(o))
282                 return mdd_get_flags(env, lu2mdd_obj(o));
283         else
284                 return 0;
285 }
286
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
288 {
289         struct mdd_object *mdd = lu2mdd_obj(o);
290
291         lu_object_fini(o);
292         OBD_FREE_PTR(mdd);
293 }
294
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296                             lu_printer_t p, const struct lu_object *o)
297 {
298         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
299         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
300                     "valid=%x, cltime="LPU64", flags=%lx)",
301                     mdd, mdd->mod_count, mdd->mod_valid,
302                     mdd->mod_cltime, mdd->mod_flags);
303 }
304
305 static const struct lu_object_operations mdd_lu_obj_ops = {
306         .loo_object_init    = mdd_object_init,
307         .loo_object_start   = mdd_object_start,
308         .loo_object_free    = mdd_object_free,
309         .loo_object_print   = mdd_object_print,
310 };
311
312 struct mdd_object *mdd_object_find(const struct lu_env *env,
313                                    struct mdd_device *d,
314                                    const struct lu_fid *f)
315 {
316         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
317 }
318
319 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
320                         const char *path, struct lu_fid *fid)
321 {
322         struct lu_buf *buf;
323         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
324         struct mdd_object *obj;
325         struct lu_name *lname = &mdd_env_info(env)->mti_name;
326         char *name;
327         int rc = 0;
328         ENTRY;
329
330         /* temp buffer for path element */
331         buf = mdd_buf_alloc(env, PATH_MAX);
332         if (buf->lb_buf == NULL)
333                 RETURN(-ENOMEM);
334
335         lname->ln_name = name = buf->lb_buf;
336         lname->ln_namelen = 0;
337         *f = mdd->mdd_root_fid;
338
339         while(1) {
340                 while (*path == '/')
341                         path++;
342                 if (*path == '\0')
343                         break;
344                 while (*path != '/' && *path != '\0') {
345                         *name = *path;
346                         path++;
347                         name++;
348                         lname->ln_namelen++;
349                 }
350
351                 *name = '\0';
352                 /* find obj corresponding to fid */
353                 obj = mdd_object_find(env, mdd, f);
354                 if (obj == NULL)
355                         GOTO(out, rc = -EREMOTE);
356                 if (IS_ERR(obj))
357                         GOTO(out, rc = PTR_ERR(obj));
358                 /* get child fid from parent and name */
359                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
360                 mdd_object_put(env, obj);
361                 if (rc)
362                         break;
363
364                 name = buf->lb_buf;
365                 lname->ln_namelen = 0;
366         }
367
368         if (!rc)
369                 *fid = *f;
370 out:
371         RETURN(rc);
372 }
373
374 /** The maximum depth that fid2path() will search.
375  * This is limited only because we want to store the fids for
376  * historical path lookup purposes.
377  */
378 #define MAX_PATH_DEPTH 100
379
380 /** mdd_path() lookup structure. */
381 struct path_lookup_info {
382         __u64                pli_recno;        /**< history point */
383         __u64                pli_currec;       /**< current record */
384         struct lu_fid        pli_fid;
385         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
386         struct mdd_object   *pli_mdd_obj;
387         char                *pli_path;         /**< full path */
388         int                  pli_pathlen;
389         int                  pli_linkno;       /**< which hardlink to follow */
390         int                  pli_fidcount;     /**< number of \a pli_fids */
391 };
392
393 static int mdd_path_current(const struct lu_env *env,
394                             struct path_lookup_info *pli)
395 {
396         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
397         struct mdd_object *mdd_obj;
398         struct lu_buf     *buf = NULL;
399         struct link_ea_header *leh;
400         struct link_ea_entry  *lee;
401         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
402         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
403         char *ptr;
404         int reclen;
405         int rc;
406         ENTRY;
407
408         ptr = pli->pli_path + pli->pli_pathlen - 1;
409         *ptr = 0;
410         --ptr;
411         pli->pli_fidcount = 0;
412         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
413
414         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
415                 mdd_obj = mdd_object_find(env, mdd,
416                                           &pli->pli_fids[pli->pli_fidcount]);
417                 if (mdd_obj == NULL)
418                         GOTO(out, rc = -EREMOTE);
419                 if (IS_ERR(mdd_obj))
420                         GOTO(out, rc = PTR_ERR(mdd_obj));
421                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
422                 if (rc <= 0) {
423                         mdd_object_put(env, mdd_obj);
424                         if (rc == -1)
425                                 rc = -EREMOTE;
426                         else if (rc == 0)
427                                 /* Do I need to error out here? */
428                                 rc = -ENOENT;
429                         GOTO(out, rc);
430                 }
431
432                 /* Get parent fid and object name */
433                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
434                 buf = mdd_links_get(env, mdd_obj);
435                 mdd_read_unlock(env, mdd_obj);
436                 mdd_object_put(env, mdd_obj);
437                 if (IS_ERR(buf))
438                         GOTO(out, rc = PTR_ERR(buf));
439
440                 leh = buf->lb_buf;
441                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
442                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
443
444                 /* If set, use link #linkno for path lookup, otherwise use
445                    link #0.  Only do this for the final path element. */
446                 if ((pli->pli_fidcount == 0) &&
447                     (pli->pli_linkno < leh->leh_reccount)) {
448                         int count;
449                         for (count = 0; count < pli->pli_linkno; count++) {
450                                 lee = (struct link_ea_entry *)
451                                      ((char *)lee + reclen);
452                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
453                         }
454                         if (pli->pli_linkno < leh->leh_reccount - 1)
455                                 /* indicate to user there are more links */
456                                 pli->pli_linkno++;
457                 }
458
459                 /* Pack the name in the end of the buffer */
460                 ptr -= tmpname->ln_namelen;
461                 if (ptr - 1 <= pli->pli_path)
462                         GOTO(out, rc = -EOVERFLOW);
463                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
464                 *(--ptr) = '/';
465
466                 /* Store the parent fid for historic lookup */
467                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
468                         GOTO(out, rc = -EOVERFLOW);
469                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
470         }
471
472         /* Verify that our path hasn't changed since we started the lookup.
473            Record the current index, and verify the path resolves to the
474            same fid. If it does, then the path is correct as of this index. */
475         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
476         pli->pli_currec = mdd->mdd_cl.mc_index;
477         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
478         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
479         if (rc) {
480                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
481                 GOTO (out, rc = -EAGAIN);
482         }
483         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
484                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
485                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
486                        PFID(&pli->pli_fid));
487                 GOTO(out, rc = -EAGAIN);
488         }
489         ptr++; /* skip leading / */
490         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
491
492         EXIT;
493 out:
494         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
495                 /* if we vmalloced a large buffer drop it */
496                 mdd_buf_put(buf);
497
498         return rc;
499 }
500
501 static int mdd_path_historic(const struct lu_env *env,
502                              struct path_lookup_info *pli)
503 {
504         return 0;
505 }
506
507 /* Returns the full path to this fid, as of changelog record recno. */
508 static int mdd_path(const struct lu_env *env, struct md_object *obj,
509                     char *path, int pathlen, __u64 *recno, int *linkno)
510 {
511         struct path_lookup_info *pli;
512         int tries = 3;
513         int rc = -EAGAIN;
514         ENTRY;
515
516         if (pathlen < 3)
517                 RETURN(-EOVERFLOW);
518
519         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
520                 path[0] = '\0';
521                 RETURN(0);
522         }
523
524         OBD_ALLOC_PTR(pli);
525         if (pli == NULL)
526                 RETURN(-ENOMEM);
527
528         pli->pli_mdd_obj = md2mdd_obj(obj);
529         pli->pli_recno = *recno;
530         pli->pli_path = path;
531         pli->pli_pathlen = pathlen;
532         pli->pli_linkno = *linkno;
533
534         /* Retry multiple times in case file is being moved */
535         while (tries-- && rc == -EAGAIN)
536                 rc = mdd_path_current(env, pli);
537
538         /* For historical path lookup, the current links may not have existed
539          * at "recno" time.  We must switch over to earlier links/parents
540          * by using the changelog records.  If the earlier parent doesn't
541          * exist, we must search back through the changelog to reconstruct
542          * its parents, then check if it exists, etc.
543          * We may ignore this problem for the initial implementation and
544          * state that an "original" hardlink must still exist for us to find
545          * historic path name. */
546         if (pli->pli_recno != -1) {
547                 rc = mdd_path_historic(env, pli);
548         } else {
549                 *recno = pli->pli_currec;
550                 /* Return next link index to caller */
551                 *linkno = pli->pli_linkno;
552         }
553
554         OBD_FREE_PTR(pli);
555
556         RETURN (rc);
557 }
558
559 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
560 {
561         struct lu_attr *la = &mdd_env_info(env)->mti_la;
562         int rc;
563
564         ENTRY;
565         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
566         if (rc == 0) {
567                 mdd_flags_xlate(obj, la->la_flags);
568                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
569                         obj->mod_flags |= MNLINK_OBJ;
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 /* get lov EA only */
623 static int __mdd_lmm_get(const struct lu_env *env,
624                          struct mdd_object *mdd_obj, struct md_attr *ma)
625 {
626         int rc;
627         ENTRY;
628
629         if (ma->ma_valid & MA_LOV)
630                 RETURN(0);
631
632         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
633                         XATTR_NAME_LOV);
634         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
635                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
636         if (rc > 0) {
637                 ma->ma_lmm_size = rc;
638                 ma->ma_valid |= MA_LOV;
639                 rc = 0;
640         }
641         RETURN(rc);
642 }
643
644 /* get the first parent fid from link EA */
645 static int mdd_pfid_get(const struct lu_env *env,
646                         struct mdd_object *mdd_obj, struct md_attr *ma)
647 {
648         struct lu_buf *buf;
649         struct link_ea_header *leh;
650         struct link_ea_entry *lee;
651         struct lu_fid *pfid = &ma->ma_pfid;
652         ENTRY;
653
654         if (ma->ma_valid & MA_PFID)
655                 RETURN(0);
656
657         buf = mdd_links_get(env, mdd_obj);
658         if (IS_ERR(buf))
659                 RETURN(PTR_ERR(buf));
660
661         leh = buf->lb_buf;
662         lee = (struct link_ea_entry *)(leh + 1);
663         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
664         fid_be_to_cpu(pfid, pfid);
665         ma->ma_valid |= MA_PFID;
666         if (buf->lb_len > OBD_ALLOC_BIG)
667                 /* if we vmalloced a large buffer drop it */
668                 mdd_buf_put(buf);
669         RETURN(0);
670 }
671
672 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
673                        struct md_attr *ma)
674 {
675         int rc;
676         ENTRY;
677
678         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
679         rc = __mdd_lmm_get(env, mdd_obj, ma);
680         mdd_read_unlock(env, mdd_obj);
681         RETURN(rc);
682 }
683
684 /* get lmv EA only*/
685 static int __mdd_lmv_get(const struct lu_env *env,
686                          struct mdd_object *mdd_obj, struct md_attr *ma)
687 {
688         int rc;
689         ENTRY;
690
691         if (ma->ma_valid & MA_LMV)
692                 RETURN(0);
693
694         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
695                         XATTR_NAME_LMV);
696         if (rc > 0) {
697                 ma->ma_valid |= MA_LMV;
698                 rc = 0;
699         }
700         RETURN(rc);
701 }
702
703 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
704                          struct md_attr *ma)
705 {
706         struct mdd_thread_info *info = mdd_env_info(env);
707         struct lustre_mdt_attrs *lma =
708                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
709         int lma_size;
710         int rc;
711         ENTRY;
712
713         /* If all needed data are already valid, nothing to do */
714         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
715             (ma->ma_need & (MA_HSM | MA_SOM)))
716                 RETURN(0);
717
718         /* Read LMA from disk EA */
719         lma_size = sizeof(info->mti_xattr_buf);
720         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
721         if (rc <= 0)
722                 RETURN(rc);
723
724         /* Useless to check LMA incompatibility because this is already done in
725          * osd_ea_fid_get(), and this will fail long before this code is
726          * called.
727          * So, if we are here, LMA is compatible.
728          */
729
730         lustre_lma_swab(lma);
731
732         /* Swab and copy LMA */
733         if (ma->ma_need & MA_HSM) {
734                 if (lma->lma_compat & LMAC_HSM)
735                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
736                 else
737                         ma->ma_hsm.mh_flags = 0;
738                 ma->ma_valid |= MA_HSM;
739         }
740
741         /* Copy SOM */
742         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
743                 LASSERT(ma->ma_som != NULL);
744                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
745                 ma->ma_som->msd_size    = lma->lma_som_size;
746                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
747                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
748                 ma->ma_valid |= MA_SOM;
749         }
750
751         RETURN(0);
752 }
753
754 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
755                                  struct md_attr *ma)
756 {
757         int rc = 0;
758         ENTRY;
759
760         if (ma->ma_need & MA_INODE)
761                 rc = mdd_iattr_get(env, mdd_obj, ma);
762
763         if (rc == 0 && ma->ma_need & MA_LOV) {
764                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
765                     S_ISDIR(mdd_object_type(mdd_obj)))
766                         rc = __mdd_lmm_get(env, mdd_obj, ma);
767         }
768         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
769                 if (S_ISREG(mdd_object_type(mdd_obj)))
770                         rc = mdd_pfid_get(env, mdd_obj, ma);
771         }
772         if (rc == 0 && ma->ma_need & MA_LMV) {
773                 if (S_ISDIR(mdd_object_type(mdd_obj)))
774                         rc = __mdd_lmv_get(env, mdd_obj, ma);
775         }
776         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
777                 if (S_ISREG(mdd_object_type(mdd_obj)))
778                         rc = __mdd_lma_get(env, mdd_obj, ma);
779         }
780 #ifdef CONFIG_FS_POSIX_ACL
781         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
782                 if (S_ISDIR(mdd_object_type(mdd_obj)))
783                         rc = mdd_def_acl_get(env, mdd_obj, ma);
784         }
785 #endif
786         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
787                rc, ma->ma_valid, ma->ma_lmm);
788         RETURN(rc);
789 }
790
791 int mdd_attr_get_internal_locked(const struct lu_env *env,
792                                  struct mdd_object *mdd_obj, struct md_attr *ma)
793 {
794         int rc;
795         int needlock = ma->ma_need &
796                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
797
798         if (needlock)
799                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800         rc = mdd_attr_get_internal(env, mdd_obj, ma);
801         if (needlock)
802                 mdd_read_unlock(env, mdd_obj);
803         return rc;
804 }
805
806 /*
807  * No permission check is needed.
808  */
809 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
810                         struct md_attr *ma)
811 {
812         struct mdd_object *mdd_obj = md2mdd_obj(obj);
813         int                rc;
814
815         ENTRY;
816         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
817         RETURN(rc);
818 }
819
820 /*
821  * No permission check is needed.
822  */
823 static int mdd_xattr_get(const struct lu_env *env,
824                          struct md_object *obj, struct lu_buf *buf,
825                          const char *name)
826 {
827         struct mdd_object *mdd_obj = md2mdd_obj(obj);
828         int rc;
829
830         ENTRY;
831
832         LASSERT(mdd_object_exists(mdd_obj));
833
834         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835         rc = mdo_xattr_get(env, mdd_obj, buf, name,
836                            mdd_object_capa(env, mdd_obj));
837         mdd_read_unlock(env, mdd_obj);
838
839         RETURN(rc);
840 }
841
842 /*
843  * Permission check is done when open,
844  * no need check again.
845  */
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
847                         struct lu_buf *buf)
848 {
849         struct mdd_object *mdd_obj = md2mdd_obj(obj);
850         struct dt_object  *next;
851         loff_t             pos = 0;
852         int                rc;
853         ENTRY;
854
855         LASSERT(mdd_object_exists(mdd_obj));
856
857         next = mdd_object_child(mdd_obj);
858         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
860                                          mdd_object_capa(env, mdd_obj));
861         mdd_read_unlock(env, mdd_obj);
862         RETURN(rc);
863 }
864
865 /*
866  * No permission check is needed.
867  */
868 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
869                           struct lu_buf *buf)
870 {
871         struct mdd_object *mdd_obj = md2mdd_obj(obj);
872         int rc;
873
874         ENTRY;
875
876         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
877         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
878         mdd_read_unlock(env, mdd_obj);
879
880         RETURN(rc);
881 }
882
883 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
884                                struct mdd_object *c, struct md_attr *ma,
885                                struct thandle *handle,
886                                const struct md_op_spec *spec)
887 {
888         struct lu_attr *attr = &ma->ma_attr;
889         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
890         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
891         const struct dt_index_features *feat = spec->sp_feat;
892         int rc;
893         ENTRY;
894
895         if (!mdd_object_exists(c)) {
896                 struct dt_object *next = mdd_object_child(c);
897                 LASSERT(next);
898
899                 if (feat != &dt_directory_features && feat != NULL)
900                         dof->dof_type = DFT_INDEX;
901                 else
902                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
903
904                 dof->u.dof_idx.di_feat = feat;
905
906                 /* @hint will be initialized by underlying device. */
907                 next->do_ops->do_ah_init(env, hint,
908                                          p ? mdd_object_child(p) : NULL,
909                                          attr->la_mode & S_IFMT);
910
911                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
912                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
913         } else
914                 rc = -EEXIST;
915
916         RETURN(rc);
917 }
918
919 /**
920  * Make sure the ctime is increased only.
921  */
922 static inline int mdd_attr_check(const struct lu_env *env,
923                                  struct mdd_object *obj,
924                                  struct lu_attr *attr)
925 {
926         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
927         int rc;
928         ENTRY;
929
930         if (attr->la_valid & LA_CTIME) {
931                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
932                 if (rc)
933                         RETURN(rc);
934
935                 if (attr->la_ctime < tmp_la->la_ctime)
936                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
937                 else if (attr->la_valid == LA_CTIME &&
938                          attr->la_ctime == tmp_la->la_ctime)
939                         attr->la_valid &= ~LA_CTIME;
940         }
941         RETURN(0);
942 }
943
944 int mdd_attr_set_internal(const struct lu_env *env,
945                           struct mdd_object *obj,
946                           struct lu_attr *attr,
947                           struct thandle *handle,
948                           int needacl)
949 {
950         int rc;
951         ENTRY;
952
953         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
954 #ifdef CONFIG_FS_POSIX_ACL
955         if (!rc && (attr->la_valid & LA_MODE) && needacl)
956                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
957 #endif
958         RETURN(rc);
959 }
960
961 int mdd_attr_check_set_internal(const struct lu_env *env,
962                                 struct mdd_object *obj,
963                                 struct lu_attr *attr,
964                                 struct thandle *handle,
965                                 int needacl)
966 {
967         int rc;
968         ENTRY;
969
970         rc = mdd_attr_check(env, obj, attr);
971         if (rc)
972                 RETURN(rc);
973
974         if (attr->la_valid)
975                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
976         RETURN(rc);
977 }
978
979 static int mdd_attr_set_internal_locked(const struct lu_env *env,
980                                         struct mdd_object *obj,
981                                         struct lu_attr *attr,
982                                         struct thandle *handle,
983                                         int needacl)
984 {
985         int rc;
986         ENTRY;
987
988         needacl = needacl && (attr->la_valid & LA_MODE);
989         if (needacl)
990                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
992         if (needacl)
993                 mdd_write_unlock(env, obj);
994         RETURN(rc);
995 }
996
997 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
998                                        struct mdd_object *obj,
999                                        struct lu_attr *attr,
1000                                        struct thandle *handle,
1001                                        int needacl)
1002 {
1003         int rc;
1004         ENTRY;
1005
1006         needacl = needacl && (attr->la_valid & LA_MODE);
1007         if (needacl)
1008                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1009         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1010         if (needacl)
1011                 mdd_write_unlock(env, obj);
1012         RETURN(rc);
1013 }
1014
1015 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1016                     const struct lu_buf *buf, const char *name,
1017                     int fl, struct thandle *handle)
1018 {
1019         struct lustre_capa *capa = mdd_object_capa(env, obj);
1020         int rc = -EINVAL;
1021         ENTRY;
1022
1023         if (buf->lb_buf && buf->lb_len > 0)
1024                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1025         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1026                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1027
1028         RETURN(rc);
1029 }
1030
1031 /*
1032  * This gives the same functionality as the code between
1033  * sys_chmod and inode_setattr
1034  * chown_common and inode_setattr
1035  * utimes and inode_setattr
1036  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1037  */
1038 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1039                         struct lu_attr *la, const struct md_attr *ma)
1040 {
1041         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1042         struct md_ucred  *uc;
1043         int               rc;
1044         ENTRY;
1045
1046         if (!la->la_valid)
1047                 RETURN(0);
1048
1049         /* Do not permit change file type */
1050         if (la->la_valid & LA_TYPE)
1051                 RETURN(-EPERM);
1052
1053         /* They should not be processed by setattr */
1054         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1055                 RETURN(-EPERM);
1056
1057         /* export destroy does not have ->le_ses, but we may want
1058          * to drop LUSTRE_SOM_FL. */
1059         if (!env->le_ses)
1060                 RETURN(0);
1061
1062         uc = md_ucred(env);
1063
1064         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1065         if (rc)
1066                 RETURN(rc);
1067
1068         if (la->la_valid == LA_CTIME) {
1069                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1070                         /* This is only for set ctime when rename's source is
1071                          * on remote MDS. */
1072                         rc = mdd_may_delete(env, NULL, obj,
1073                                             (struct md_attr *)ma, 1, 0);
1074                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1075                         la->la_valid &= ~LA_CTIME;
1076                 RETURN(rc);
1077         }
1078
1079         if (la->la_valid == LA_ATIME) {
1080                 /* This is atime only set for read atime update on close. */
1081                 if (la->la_atime >= tmp_la->la_atime &&
1082                     la->la_atime < (tmp_la->la_atime +
1083                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1084                         la->la_valid &= ~LA_ATIME;
1085                 RETURN(0);
1086         }
1087
1088         /* Check if flags change. */
1089         if (la->la_valid & LA_FLAGS) {
1090                 unsigned int oldflags = 0;
1091                 unsigned int newflags = la->la_flags &
1092                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1093
1094                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1095                     !mdd_capable(uc, CFS_CAP_FOWNER))
1096                         RETURN(-EPERM);
1097
1098                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1099                  * only be changed by the relevant capability. */
1100                 if (mdd_is_immutable(obj))
1101                         oldflags |= LUSTRE_IMMUTABLE_FL;
1102                 if (mdd_is_append(obj))
1103                         oldflags |= LUSTRE_APPEND_FL;
1104                 if ((oldflags ^ newflags) &&
1105                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1106                         RETURN(-EPERM);
1107
1108                 if (!S_ISDIR(tmp_la->la_mode))
1109                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1110         }
1111
1112         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1113             (la->la_valid & ~LA_FLAGS) &&
1114             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1115                 RETURN(-EPERM);
1116
1117         /* Check for setting the obj time. */
1118         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1119             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1120                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1121                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1122                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1123                                                             MAY_WRITE,
1124                                                             MOR_TGT_CHILD);
1125                         if (rc)
1126                                 RETURN(rc);
1127                 }
1128         }
1129
1130         if (la->la_valid & LA_KILL_SUID) {
1131                 la->la_valid &= ~LA_KILL_SUID;
1132                 if ((tmp_la->la_mode & S_ISUID) &&
1133                     !(la->la_valid & LA_MODE)) {
1134                         la->la_mode = tmp_la->la_mode;
1135                         la->la_valid |= LA_MODE;
1136                 }
1137                 la->la_mode &= ~S_ISUID;
1138         }
1139
1140         if (la->la_valid & LA_KILL_SGID) {
1141                 la->la_valid &= ~LA_KILL_SGID;
1142                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1143                                         (S_ISGID | S_IXGRP)) &&
1144                     !(la->la_valid & LA_MODE)) {
1145                         la->la_mode = tmp_la->la_mode;
1146                         la->la_valid |= LA_MODE;
1147                 }
1148                 la->la_mode &= ~S_ISGID;
1149         }
1150
1151         /* Make sure a caller can chmod. */
1152         if (la->la_valid & LA_MODE) {
1153                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1154                     (uc->mu_fsuid != tmp_la->la_uid) &&
1155                     !mdd_capable(uc, CFS_CAP_FOWNER))
1156                         RETURN(-EPERM);
1157
1158                 if (la->la_mode == (cfs_umode_t) -1)
1159                         la->la_mode = tmp_la->la_mode;
1160                 else
1161                         la->la_mode = (la->la_mode & S_IALLUGO) |
1162                                       (tmp_la->la_mode & ~S_IALLUGO);
1163
1164                 /* Also check the setgid bit! */
1165                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1166                                        la->la_gid : tmp_la->la_gid) &&
1167                     !mdd_capable(uc, CFS_CAP_FSETID))
1168                         la->la_mode &= ~S_ISGID;
1169         } else {
1170                la->la_mode = tmp_la->la_mode;
1171         }
1172
1173         /* Make sure a caller can chown. */
1174         if (la->la_valid & LA_UID) {
1175                 if (la->la_uid == (uid_t) -1)
1176                         la->la_uid = tmp_la->la_uid;
1177                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1178                     (la->la_uid != tmp_la->la_uid)) &&
1179                     !mdd_capable(uc, CFS_CAP_CHOWN))
1180                         RETURN(-EPERM);
1181
1182                 /* If the user or group of a non-directory has been
1183                  * changed by a non-root user, remove the setuid bit.
1184                  * 19981026 David C Niemi <niemi@tux.org>
1185                  *
1186                  * Changed this to apply to all users, including root,
1187                  * to avoid some races. This is the behavior we had in
1188                  * 2.0. The check for non-root was definitely wrong
1189                  * for 2.2 anyway, as it should have been using
1190                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1191                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192                     !S_ISDIR(tmp_la->la_mode)) {
1193                         la->la_mode &= ~S_ISUID;
1194                         la->la_valid |= LA_MODE;
1195                 }
1196         }
1197
1198         /* Make sure caller can chgrp. */
1199         if (la->la_valid & LA_GID) {
1200                 if (la->la_gid == (gid_t) -1)
1201                         la->la_gid = tmp_la->la_gid;
1202                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203                     ((la->la_gid != tmp_la->la_gid) &&
1204                     !lustre_in_group_p(uc, la->la_gid))) &&
1205                     !mdd_capable(uc, CFS_CAP_CHOWN))
1206                         RETURN(-EPERM);
1207
1208                 /* Likewise, if the user or group of a non-directory
1209                  * has been changed by a non-root user, remove the
1210                  * setgid bit UNLESS there is no group execute bit
1211                  * (this would be a file marked for mandatory
1212                  * locking).  19981026 David C Niemi <niemi@tux.org>
1213                  *
1214                  * Removed the fsuid check (see the comment above) --
1215                  * 19990830 SD. */
1216                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1217                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1218                         la->la_mode &= ~S_ISGID;
1219                         la->la_valid |= LA_MODE;
1220                 }
1221         }
1222
1223         /* For both Size-on-MDS case and truncate case,
1224          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1225          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1226          * For SOM case, it is true, the MAY_WRITE perm has been checked
1227          * when open, no need check again. For truncate case, it is false,
1228          * the MAY_WRITE perm should be checked here. */
1229         if (ma->ma_attr_flags & MDS_SOM) {
1230                 /* For the "Size-on-MDS" setattr update, merge coming
1231                  * attributes with the set in the inode. BUG 10641 */
1232                 if ((la->la_valid & LA_ATIME) &&
1233                     (la->la_atime <= tmp_la->la_atime))
1234                         la->la_valid &= ~LA_ATIME;
1235
1236                 /* OST attributes do not have a priority over MDS attributes,
1237                  * so drop times if ctime is equal. */
1238                 if ((la->la_valid & LA_CTIME) &&
1239                     (la->la_ctime <= tmp_la->la_ctime))
1240                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1241         } else {
1242                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1243                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1244                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1245                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1246                                 rc = mdd_permission_internal_locked(env, obj,
1247                                                             tmp_la, MAY_WRITE,
1248                                                             MOR_TGT_CHILD);
1249                                 if (rc)
1250                                         RETURN(rc);
1251                         }
1252                 }
1253                 if (la->la_valid & LA_CTIME) {
1254                         /* The pure setattr, it has the priority over what is
1255                          * already set, do not drop it if ctime is equal. */
1256                         if (la->la_ctime < tmp_la->la_ctime)
1257                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1258                                                   LA_CTIME);
1259                 }
1260         }
1261
1262         RETURN(0);
1263 }
1264
1265 /** Store a data change changelog record
1266  * If this fails, we must fail the whole transaction; we don't
1267  * want the change to commit without the log entry.
1268  * \param mdd_obj - mdd_object of change
1269  * \param handle - transacion handle
1270  */
1271 static int mdd_changelog_data_store(const struct lu_env     *env,
1272                                     struct mdd_device       *mdd,
1273                                     enum changelog_rec_type type,
1274                                     int                     flags,
1275                                     struct mdd_object       *mdd_obj,
1276                                     struct thandle          *handle)
1277 {
1278         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1279         struct llog_changelog_rec *rec;
1280         struct lu_buf *buf;
1281         int reclen;
1282         int rc;
1283
1284         /* Not recording */
1285         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1286                 RETURN(0);
1287         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1288                 RETURN(0);
1289
1290         LASSERT(handle != NULL);
1291         LASSERT(mdd_obj != NULL);
1292
1293         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1294             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1295                 /* Don't need multiple updates in this log */
1296                 /* Don't check under lock - no big deal if we get an extra
1297                    entry */
1298                 RETURN(0);
1299         }
1300
1301         reclen = llog_data_len(sizeof(*rec));
1302         buf = mdd_buf_alloc(env, reclen);
1303         if (buf->lb_buf == NULL)
1304                 RETURN(-ENOMEM);
1305         rec = (struct llog_changelog_rec *)buf->lb_buf;
1306
1307         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1308         rec->cr.cr_type = (__u32)type;
1309         rec->cr.cr_tfid = *tfid;
1310         rec->cr.cr_namelen = 0;
1311         mdd_obj->mod_cltime = cfs_time_current_64();
1312
1313         rc = mdd_changelog_llog_write(mdd, rec, handle);
1314         if (rc < 0) {
1315                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1316                        rc, type, PFID(tfid));
1317                 return -EFAULT;
1318         }
1319
1320         return 0;
1321 }
1322
1323 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1324                   int flags, struct md_object *obj)
1325 {
1326         struct thandle *handle;
1327         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1328         struct mdd_device *mdd = mdo2mdd(obj);
1329         int rc;
1330         ENTRY;
1331
1332         handle = mdd_trans_start(env, mdd);
1333
1334         if (IS_ERR(handle))
1335                 return(PTR_ERR(handle));
1336
1337         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1338                                       handle);
1339
1340         mdd_trans_stop(env, mdd, rc, handle);
1341
1342         RETURN(rc);
1343 }
1344
1345 /**
1346  * Should be called with write lock held.
1347  *
1348  * \see mdd_lma_set_locked().
1349  */
1350 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1351                        const struct md_attr *ma, struct thandle *handle)
1352 {
1353         struct mdd_thread_info *info = mdd_env_info(env);
1354         struct lu_buf *buf;
1355         struct lustre_mdt_attrs *lma =
1356                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1357         int lmasize = sizeof(struct lustre_mdt_attrs);
1358         int rc = 0;
1359
1360         ENTRY;
1361
1362         /* Either HSM or SOM part is not valid, we need to read it before */
1363         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1364                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1365                 if (rc <= 0)
1366                         RETURN(rc);
1367
1368                 lustre_lma_swab(lma);
1369         } else {
1370                 memset(lma, 0, lmasize);
1371         }
1372
1373         /* Copy HSM data */
1374         if (ma->ma_valid & MA_HSM) {
1375                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1376                 lma->lma_compat |= LMAC_HSM;
1377         }
1378
1379         /* Copy SOM data */
1380         if (ma->ma_valid & MA_SOM) {
1381                 LASSERT(ma->ma_som != NULL);
1382                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1383                         lma->lma_compat     &= ~LMAC_SOM;
1384                 } else {
1385                         lma->lma_compat     |= LMAC_SOM;
1386                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1387                         lma->lma_som_size    = ma->ma_som->msd_size;
1388                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1389                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1390                 }
1391         }
1392
1393         /* Copy FID */
1394         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1395
1396         lustre_lma_swab(lma);
1397         buf = mdd_buf_get(env, lma, lmasize);
1398         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1399
1400         RETURN(rc);
1401 }
1402
1403 /**
1404  * Save LMA extended attributes with data from \a ma.
1405  *
1406  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1407  * not, LMA EA will be first read from disk, modified and write back.
1408  *
1409  */
1410 static int mdd_lma_set_locked(const struct lu_env *env,
1411                               struct mdd_object *mdd_obj,
1412                               const struct md_attr *ma, struct thandle *handle)
1413 {
1414         int rc;
1415
1416         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1417         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1418         mdd_write_unlock(env, mdd_obj);
1419         return rc;
1420 }
1421
1422 /* Precedence for choosing record type when multiple
1423  * attributes change: setattr > mtime > ctime > atime
1424  * (ctime changes when mtime does, plus chmod/chown.
1425  * atime and ctime are independent.) */
1426 static int mdd_attr_set_changelog(const struct lu_env *env,
1427                                   struct md_object *obj, struct thandle *handle,
1428                                   __u64 valid)
1429 {
1430         struct mdd_device *mdd = mdo2mdd(obj);
1431         int bits, type = 0;
1432
1433         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1434         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1435         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1436         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1437         bits = bits & mdd->mdd_cl.mc_mask;
1438         if (bits == 0)
1439                 return 0;
1440
1441         /* The record type is the lowest non-masked set bit */
1442         while (bits && ((bits & 1) == 0)) {
1443                 bits = bits >> 1;
1444                 type++;
1445         }
1446
1447         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1448         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1449                                         md2mdd_obj(obj), handle);
1450 }
1451
1452 /* set attr and LOV EA at once, return updated attr */
1453 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1454                         const struct md_attr *ma)
1455 {
1456         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1457         struct mdd_device *mdd = mdo2mdd(obj);
1458         struct thandle *handle;
1459         struct lov_mds_md *lmm = NULL;
1460         struct llog_cookie *logcookies = NULL;
1461         int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
1462         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1463         struct obd_device *obd = mdd->mdd_obd_dev;
1464         struct mds_obd *mds = &obd->u.mds;
1465 #ifdef HAVE_QUOTA_SUPPORT
1466         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1467         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1468         int quota_opc = 0, block_count = 0;
1469         int inode_pending[MAXQUOTAS] = { 0, 0 };
1470         int block_pending[MAXQUOTAS] = { 0, 0 };
1471 #endif
1472         ENTRY;
1473
1474         *la_copy = ma->ma_attr;
1475         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1476         if (rc != 0)
1477                 RETURN(rc);
1478
1479         /* setattr on "close" only change atime, or do nothing */
1480         if (ma->ma_valid == MA_INODE &&
1481             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1482                 RETURN(0);
1483
1484         /*TODO: add lock here*/
1485         /* start a log jounal handle if needed */
1486         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1487             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1488                 lmm_size = mdd_lov_mdsize(env, mdd);
1489                 lmm = mdd_max_lmm_get(env, mdd);
1490                 if (lmm == NULL)
1491                         GOTO(no_trans, rc = -ENOMEM);
1492
1493                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1494                                 XATTR_NAME_LOV);
1495
1496                 if (rc < 0)
1497                         GOTO(no_trans, rc);
1498         }
1499
1500         chlog_cnt = 1;
1501         if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
1502                 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
1503                          lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
1504         }
1505
1506         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1507                                     MDD_TXN_ATTR_SET_OP, chlog_cnt);
1508         handle = mdd_trans_start(env, mdd);
1509         if (IS_ERR(handle))
1510                 GOTO(no_trans, rc = PTR_ERR(handle));
1511
1512         /* permission changes may require sync operation */
1513         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1514                 handle->th_sync |= mdd->mdd_sync_permission;
1515
1516         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1517                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1518                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1519
1520 #ifdef HAVE_QUOTA_SUPPORT
1521         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1522                 struct obd_export *exp = md_quota(env)->mq_exp;
1523                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1524
1525                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1526                 if (!rc) {
1527                         quota_opc = FSFILT_OP_SETATTR;
1528                         mdd_quota_wrapper(la_copy, qnids);
1529                         mdd_quota_wrapper(la_tmp, qoids);
1530                         /* get file quota for new owner */
1531                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1532                                         qnids, inode_pending, 1, NULL, 0,
1533                                         NULL, 0);
1534                         block_count = (la_tmp->la_blocks + 7) >> 3;
1535                         if (block_count) {
1536                                 void *data = NULL;
1537                                 mdd_data_get(env, mdd_obj, &data);
1538                                 /* get block quota for new owner */
1539                                 lquota_chkquota(mds_quota_interface_ref, obd,
1540                                                 exp, qnids, block_pending,
1541                                                 block_count, NULL,
1542                                                 LQUOTA_FLAGS_BLK, data, 1);
1543                         }
1544                 }
1545         }
1546 #endif
1547
1548         if (la_copy->la_valid & LA_FLAGS) {
1549                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1550                                                   handle, 1);
1551                 if (rc == 0)
1552                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1553         } else if (la_copy->la_valid) {            /* setattr */
1554                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1555                                                   handle, 1);
1556                 /* journal chown/chgrp in llog, just like unlink */
1557                 if (rc == 0 && lmm_size){
1558                         cookie_size = mdd_lov_cookiesize(env, mdd);
1559                         logcookies = mdd_max_cookie_get(env, mdd);
1560                         if (logcookies == NULL)
1561                                 GOTO(cleanup, rc = -ENOMEM);
1562
1563                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1564                                             logcookies, cookie_size) <= 0)
1565                                 logcookies = NULL;
1566                 }
1567         }
1568
1569         if (rc == 0 && ma->ma_valid & MA_LOV) {
1570                 cfs_umode_t mode;
1571
1572                 mode = mdd_object_type(mdd_obj);
1573                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1574                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1575                         if (rc)
1576                                 GOTO(cleanup, rc);
1577
1578                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1579                                             ma->ma_lmm_size, handle, 1);
1580                 }
1581
1582         }
1583         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1584                 cfs_umode_t mode;
1585
1586                 mode = mdd_object_type(mdd_obj);
1587                 if (S_ISREG(mode))
1588                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1589
1590         }
1591 cleanup:
1592         if (rc == 0)
1593                 rc = mdd_attr_set_changelog(env, obj, handle,
1594                                             ma->ma_attr.la_valid);
1595         mdd_trans_stop(env, mdd, rc, handle);
1596 no_trans:
1597         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1598                 /*set obd attr, if needed*/
1599                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1600                                            logcookies);
1601         }
1602 #ifdef HAVE_QUOTA_SUPPORT
1603         if (quota_opc) {
1604                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1605                                       inode_pending, 0);
1606                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1607                                       block_pending, 1);
1608                 /* Trigger dqrel/dqacq for original owner and new owner.
1609                  * If failed, the next call for lquota_chkquota will
1610                  * process it. */
1611                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1612                               quota_opc);
1613         }
1614 #endif
1615         RETURN(rc);
1616 }
1617
1618 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1619                       const struct lu_buf *buf, const char *name, int fl,
1620                       struct thandle *handle)
1621 {
1622         int  rc;
1623         ENTRY;
1624
1625         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1626         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1627         mdd_write_unlock(env, obj);
1628
1629         RETURN(rc);
1630 }
1631
1632 static int mdd_xattr_sanity_check(const struct lu_env *env,
1633                                   struct mdd_object *obj)
1634 {
1635         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1636         struct md_ucred *uc     = md_ucred(env);
1637         int rc;
1638         ENTRY;
1639
1640         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1641                 RETURN(-EPERM);
1642
1643         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1644         if (rc)
1645                 RETURN(rc);
1646
1647         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1648             !mdd_capable(uc, CFS_CAP_FOWNER))
1649                 RETURN(-EPERM);
1650
1651         RETURN(rc);
1652 }
1653
1654 /**
1655  * The caller should guarantee to update the object ctime
1656  * after xattr_set if needed.
1657  */
1658 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1659                          const struct lu_buf *buf, const char *name,
1660                          int fl)
1661 {
1662         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1663         struct mdd_device *mdd = mdo2mdd(obj);
1664         struct thandle *handle;
1665         int  rc;
1666         ENTRY;
1667
1668         rc = mdd_xattr_sanity_check(env, mdd_obj);
1669         if (rc)
1670                 RETURN(rc);
1671
1672         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1673         handle = mdd_trans_start(env, mdd);
1674         if (IS_ERR(handle))
1675                 RETURN(PTR_ERR(handle));
1676
1677         /* security-replated changes may require sync */
1678         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1679                 handle->th_sync |= mdd->mdd_sync_permission;
1680
1681         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1682
1683         /* Only record system & user xattr changes */
1684         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1685                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1686                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1687                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1688                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1689                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1690                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1691                                               handle);
1692         mdd_trans_stop(env, mdd, rc, handle);
1693
1694         RETURN(rc);
1695 }
1696
1697 /**
1698  * The caller should guarantee to update the object ctime
1699  * after xattr_set if needed.
1700  */
1701 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1702                   const char *name)
1703 {
1704         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1705         struct mdd_device *mdd = mdo2mdd(obj);
1706         struct thandle *handle;
1707         int  rc;
1708         ENTRY;
1709
1710         rc = mdd_xattr_sanity_check(env, mdd_obj);
1711         if (rc)
1712                 RETURN(rc);
1713
1714         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1715         handle = mdd_trans_start(env, mdd);
1716         if (IS_ERR(handle))
1717                 RETURN(PTR_ERR(handle));
1718
1719         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1720         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1721                            mdd_object_capa(env, mdd_obj));
1722         mdd_write_unlock(env, mdd_obj);
1723
1724         /* Only record system & user xattr changes */
1725         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1726                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1727                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1728                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1729                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1730                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1731                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1732                                               handle);
1733
1734         mdd_trans_stop(env, mdd, rc, handle);
1735
1736         RETURN(rc);
1737 }
1738
1739 /* partial unlink */
1740 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1741                        struct md_attr *ma)
1742 {
1743         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1744         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1745         struct mdd_device *mdd = mdo2mdd(obj);
1746         struct thandle *handle;
1747 #ifdef HAVE_QUOTA_SUPPORT
1748         struct obd_device *obd = mdd->mdd_obd_dev;
1749         struct mds_obd *mds = &obd->u.mds;
1750         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1751         int quota_opc = 0;
1752 #endif
1753         int rc;
1754         ENTRY;
1755
1756         /*
1757          * Check -ENOENT early here because we need to get object type
1758          * to calculate credits before transaction start
1759          */
1760         if (!mdd_object_exists(mdd_obj))
1761                 RETURN(-ENOENT);
1762
1763         LASSERT(mdd_object_exists(mdd_obj) > 0);
1764
1765         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
1766         if (rc)
1767                 RETURN(rc);
1768
1769         handle = mdd_trans_start(env, mdd);
1770         if (IS_ERR(handle))
1771                 RETURN(-ENOMEM);
1772
1773         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1774
1775         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1776         if (rc)
1777                 GOTO(cleanup, rc);
1778
1779         __mdd_ref_del(env, mdd_obj, handle, 0);
1780
1781         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1782                 /* unlink dot */
1783                 __mdd_ref_del(env, mdd_obj, handle, 1);
1784         }
1785
1786         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1787         la_copy->la_ctime = ma->ma_attr.la_ctime;
1788
1789         la_copy->la_valid = LA_CTIME;
1790         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1791         if (rc)
1792                 GOTO(cleanup, rc);
1793
1794         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1795 #ifdef HAVE_QUOTA_SUPPORT
1796         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1797             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1798                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1799                 mdd_quota_wrapper(&ma->ma_attr, qids);
1800         }
1801 #endif
1802
1803
1804         EXIT;
1805 cleanup:
1806         mdd_write_unlock(env, mdd_obj);
1807         mdd_trans_stop(env, mdd, rc, handle);
1808 #ifdef HAVE_QUOTA_SUPPORT
1809         if (quota_opc)
1810                 /* Trigger dqrel on the owner of child. If failed,
1811                  * the next call for lquota_chkquota will process it */
1812                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1813                               quota_opc);
1814 #endif
1815         return rc;
1816 }
1817
1818 /* partial operation */
1819 static int mdd_oc_sanity_check(const struct lu_env *env,
1820                                struct mdd_object *obj,
1821                                struct md_attr *ma)
1822 {
1823         int rc;
1824         ENTRY;
1825
1826         switch (ma->ma_attr.la_mode & S_IFMT) {
1827         case S_IFREG:
1828         case S_IFDIR:
1829         case S_IFLNK:
1830         case S_IFCHR:
1831         case S_IFBLK:
1832         case S_IFIFO:
1833         case S_IFSOCK:
1834                 rc = 0;
1835                 break;
1836         default:
1837                 rc = -EINVAL;
1838                 break;
1839         }
1840         RETURN(rc);
1841 }
1842
1843 static int mdd_object_create(const struct lu_env *env,
1844                              struct md_object *obj,
1845                              const struct md_op_spec *spec,
1846                              struct md_attr *ma)
1847 {
1848
1849         struct mdd_device *mdd = mdo2mdd(obj);
1850         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1851         const struct lu_fid *pfid = spec->u.sp_pfid;
1852         struct thandle *handle;
1853 #ifdef HAVE_QUOTA_SUPPORT
1854         struct obd_device *obd = mdd->mdd_obd_dev;
1855         struct obd_export *exp = md_quota(env)->mq_exp;
1856         struct mds_obd *mds = &obd->u.mds;
1857         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1858         int quota_opc = 0, block_count = 0;
1859         int inode_pending[MAXQUOTAS] = { 0, 0 };
1860         int block_pending[MAXQUOTAS] = { 0, 0 };
1861 #endif
1862         int rc = 0;
1863         ENTRY;
1864
1865 #ifdef HAVE_QUOTA_SUPPORT
1866         if (mds->mds_quota) {
1867                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1868                 mdd_quota_wrapper(&ma->ma_attr, qids);
1869                 /* get file quota for child */
1870                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1871                                 qids, inode_pending, 1, NULL, 0,
1872                                 NULL, 0);
1873                 switch (ma->ma_attr.la_mode & S_IFMT) {
1874                 case S_IFLNK:
1875                 case S_IFDIR:
1876                         block_count = 2;
1877                         break;
1878                 case S_IFREG:
1879                         block_count = 1;
1880                         break;
1881                 }
1882                 /* get block quota for child */
1883                 if (block_count)
1884                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1885                                         qids, block_pending, block_count,
1886                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1887         }
1888 #endif
1889
1890         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
1891         handle = mdd_trans_start(env, mdd);
1892         if (IS_ERR(handle))
1893                 GOTO(out_pending, rc = PTR_ERR(handle));
1894
1895         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1896         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1897         if (rc)
1898                 GOTO(unlock, rc);
1899
1900         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1901         if (rc)
1902                 GOTO(unlock, rc);
1903
1904         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1905                 /* If creating the slave object, set slave EA here. */
1906                 int lmv_size = spec->u.sp_ea.eadatalen;
1907                 struct lmv_stripe_md *lmv;
1908
1909                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1910                 LASSERT(lmv != NULL && lmv_size > 0);
1911
1912                 rc = __mdd_xattr_set(env, mdd_obj,
1913                                      mdd_buf_get_const(env, lmv, lmv_size),
1914                                      XATTR_NAME_LMV, 0, handle);
1915                 if (rc)
1916                         GOTO(unlock, rc);
1917
1918                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1919                                            handle, 0);
1920         } else {
1921 #ifdef CONFIG_FS_POSIX_ACL
1922                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1923                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1924
1925                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1926                         buf->lb_len = spec->u.sp_ea.eadatalen;
1927                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1928                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1929                                                     &ma->ma_attr.la_mode,
1930                                                     handle);
1931                                 if (rc)
1932                                         GOTO(unlock, rc);
1933                                 else
1934                                         ma->ma_attr.la_valid |= LA_MODE;
1935                         }
1936
1937                         pfid = spec->u.sp_ea.fid;
1938                 }
1939 #endif
1940                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1941                                            spec);
1942         }
1943         EXIT;
1944 unlock:
1945         if (rc == 0)
1946                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1947         mdd_write_unlock(env, mdd_obj);
1948
1949         mdd_trans_stop(env, mdd, rc, handle);
1950 out_pending:
1951 #ifdef HAVE_QUOTA_SUPPORT
1952         if (quota_opc) {
1953                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1954                                       inode_pending, 0);
1955                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1956                                       block_pending, 1);
1957                 /* Trigger dqacq on the owner of child. If failed,
1958                  * the next call for lquota_chkquota will process it. */
1959                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1960                               quota_opc);
1961         }
1962 #endif
1963         return rc;
1964 }
1965
1966 /* partial link */
1967 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1968                        const struct md_attr *ma)
1969 {
1970         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1971         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1972         struct mdd_device *mdd = mdo2mdd(obj);
1973         struct thandle *handle;
1974         int rc;
1975         ENTRY;
1976
1977         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
1978         handle = mdd_trans_start(env, mdd);
1979         if (IS_ERR(handle))
1980                 RETURN(-ENOMEM);
1981
1982         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1983         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1984         if (rc == 0)
1985                 __mdd_ref_add(env, mdd_obj, handle);
1986         mdd_write_unlock(env, mdd_obj);
1987         if (rc == 0) {
1988                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1989                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1990
1991                 la_copy->la_valid = LA_CTIME;
1992                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1993                                                         handle, 0);
1994         }
1995         mdd_trans_stop(env, mdd, 0, handle);
1996
1997         RETURN(rc);
1998 }
1999
2000 /*
2001  * do NOT or the MAY_*'s, you'll get the weakest
2002  */
2003 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2004 {
2005         int res = 0;
2006
2007         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2008          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2009          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2010          * owner can write to a file even if it is marked readonly to hide
2011          * its brokenness. (bug 5781) */
2012         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2013                 struct md_ucred *uc = md_ucred(env);
2014
2015                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2016                     (la->la_uid == uc->mu_fsuid))
2017                         return 0;
2018         }
2019
2020         if (flags & FMODE_READ)
2021                 res |= MAY_READ;
2022         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2023                 res |= MAY_WRITE;
2024         if (flags & MDS_FMODE_EXEC)
2025                 res = MAY_EXEC;
2026         return res;
2027 }
2028
2029 static int mdd_open_sanity_check(const struct lu_env *env,
2030                                  struct mdd_object *obj, int flag)
2031 {
2032         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2033         int mode, rc;
2034         ENTRY;
2035
2036         /* EEXIST check */
2037         if (mdd_is_dead_obj(obj))
2038                 RETURN(-ENOENT);
2039
2040         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2041         if (rc)
2042                RETURN(rc);
2043
2044         if (S_ISLNK(tmp_la->la_mode))
2045                 RETURN(-ELOOP);
2046
2047         mode = accmode(env, tmp_la, flag);
2048
2049         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2050                 RETURN(-EISDIR);
2051
2052         if (!(flag & MDS_OPEN_CREATED)) {
2053                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2054                 if (rc)
2055                         RETURN(rc);
2056         }
2057
2058         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2059             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2060                 flag &= ~MDS_OPEN_TRUNC;
2061
2062         /* For writing append-only file must open it with append mode. */
2063         if (mdd_is_append(obj)) {
2064                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2065                         RETURN(-EPERM);
2066                 if (flag & MDS_OPEN_TRUNC)
2067                         RETURN(-EPERM);
2068         }
2069
2070 #if 0
2071         /*
2072          * Now, flag -- O_NOATIME does not be packed by client.
2073          */
2074         if (flag & O_NOATIME) {
2075                 struct md_ucred *uc = md_ucred(env);
2076
2077                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2078                     (uc->mu_valid == UCRED_NEW)) &&
2079                     (uc->mu_fsuid != tmp_la->la_uid) &&
2080                     !mdd_capable(uc, CFS_CAP_FOWNER))
2081                         RETURN(-EPERM);
2082         }
2083 #endif
2084
2085         RETURN(0);
2086 }
2087
2088 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2089                     int flags)
2090 {
2091         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2092         int rc = 0;
2093
2094         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2095
2096         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2097         if (rc == 0)
2098                 mdd_obj->mod_count++;
2099
2100         mdd_write_unlock(env, mdd_obj);
2101         return rc;
2102 }
2103
2104 /* return md_attr back,
2105  * if it is last unlink then return lov ea + llog cookie*/
2106 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2107                     struct md_attr *ma)
2108 {
2109         int rc = 0;
2110         ENTRY;
2111
2112         if (S_ISREG(mdd_object_type(obj))) {
2113                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2114                  * Caller must be ready for that. */
2115
2116                 rc = __mdd_lmm_get(env, obj, ma);
2117                 if ((ma->ma_valid & MA_LOV))
2118                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2119                                             obj, ma);
2120         }
2121         RETURN(rc);
2122 }
2123
2124 /*
2125  * No permission check is needed.
2126  */
2127 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2128                      struct md_attr *ma)
2129 {
2130         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2131         struct mdd_device *mdd = mdo2mdd(obj);
2132         struct thandle    *handle = NULL;
2133         int rc;
2134         int reset = 1;
2135
2136 #ifdef HAVE_QUOTA_SUPPORT
2137         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2138         struct mds_obd *mds = &obd->u.mds;
2139         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2140         int quota_opc = 0;
2141 #endif
2142         ENTRY;
2143
2144         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2145                 mdd_obj->mod_count--;
2146
2147                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2148                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2149                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2150                 RETURN(0);
2151         }
2152
2153         /* check without any lock */
2154         if (mdd_obj->mod_count == 1 &&
2155             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2156  again:
2157                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
2158                 if (rc)
2159                         RETURN(rc);
2160                 handle = mdd_trans_start(env, mdo2mdd(obj));
2161                 if (IS_ERR(handle))
2162                         RETURN(PTR_ERR(handle));
2163         }
2164
2165         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2166         if (handle == NULL && mdd_obj->mod_count == 1 &&
2167             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2168                 mdd_write_unlock(env, mdd_obj);
2169                 goto again;
2170         }
2171
2172         /* release open count */
2173         mdd_obj->mod_count --;
2174
2175         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2176                 /* remove link to object from orphan index */
2177                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2178                 if (rc == 0) {
2179                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2180                                "list, OSS objects to be destroyed.\n",
2181                                PFID(mdd_object_fid(mdd_obj)));
2182                 } else {
2183                         CERROR("Object "DFID" can not be deleted from orphan "
2184                                 "list, maybe cause OST objects can not be "
2185                                 "destroyed (err: %d).\n",
2186                                 PFID(mdd_object_fid(mdd_obj)), rc);
2187                         /* If object was not deleted from orphan list, do not
2188                          * destroy OSS objects, which will be done when next
2189                          * recovery. */
2190                         GOTO(out, rc);
2191                 }
2192         }
2193
2194         rc = mdd_iattr_get(env, mdd_obj, ma);
2195         /* Object maybe not in orphan list originally, it is rare case for
2196          * mdd_finish_unlink() failure. */
2197         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2198 #ifdef HAVE_QUOTA_SUPPORT
2199                 if (mds->mds_quota) {
2200                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2201                         mdd_quota_wrapper(&ma->ma_attr, qids);
2202                 }
2203 #endif
2204                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2205                 if (ma->ma_valid & MA_FLAGS &&
2206                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2207                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2208                 } else {
2209                         rc = mdd_object_kill(env, mdd_obj, ma);
2210                         if (rc == 0)
2211                                 reset = 0;
2212                 }
2213
2214                 if (rc != 0)
2215                         CERROR("Error when prepare to delete Object "DFID" , "
2216                                "which will cause OST objects can not be "
2217                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2218         }
2219         EXIT;
2220
2221 out:
2222         if (reset)
2223                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2224
2225         mdd_write_unlock(env, mdd_obj);
2226         if (handle != NULL)
2227                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2228 #ifdef HAVE_QUOTA_SUPPORT
2229         if (quota_opc)
2230                 /* Trigger dqrel on the owner of child. If failed,
2231                  * the next call for lquota_chkquota will process it */
2232                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2233                               quota_opc);
2234 #endif
2235         return rc;
2236 }
2237
2238 /*
2239  * Permission check is done when open,
2240  * no need check again.
2241  */
2242 static int mdd_readpage_sanity_check(const struct lu_env *env,
2243                                      struct mdd_object *obj)
2244 {
2245         struct dt_object *next = mdd_object_child(obj);
2246         int rc;
2247         ENTRY;
2248
2249         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2250                 rc = 0;
2251         else
2252                 rc = -ENOTDIR;
2253
2254         RETURN(rc);
2255 }
2256
2257 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2258                               struct lu_dirpage *dp, int nob,
2259                               const struct dt_it_ops *iops, struct dt_it *it,
2260                               __u32 attr)
2261 {
2262         void                   *area = dp;
2263         int                     result;
2264         __u64                   hash = 0;
2265         struct lu_dirent       *ent;
2266         struct lu_dirent       *last = NULL;
2267         int                     first = 1;
2268
2269         memset(area, 0, sizeof (*dp));
2270         area += sizeof (*dp);
2271         nob  -= sizeof (*dp);
2272
2273         ent  = area;
2274         do {
2275                 int    len;
2276                 int    recsize;
2277
2278                 len  = iops->key_size(env, it);
2279
2280                 /* IAM iterator can return record with zero len. */
2281                 if (len == 0)
2282                         goto next;
2283
2284                 hash = iops->store(env, it);
2285                 if (unlikely(first)) {
2286                         first = 0;
2287                         dp->ldp_hash_start = cpu_to_le64(hash);
2288                 }
2289
2290                 /* calculate max space required for lu_dirent */
2291                 recsize = lu_dirent_calc_size(len, attr);
2292
2293                 if (nob >= recsize) {
2294                         result = iops->rec(env, it, ent, attr);
2295                         if (result == -ESTALE)
2296                                 goto next;
2297                         if (result != 0)
2298                                 goto out;
2299
2300                         /* osd might not able to pack all attributes,
2301                          * so recheck rec length */
2302                         recsize = le16_to_cpu(ent->lde_reclen);
2303                 } else {
2304                         result = (last != NULL) ? 0 :-EINVAL;
2305                         goto out;
2306                 }
2307                 last = ent;
2308                 ent = (void *)ent + recsize;
2309                 nob -= recsize;
2310
2311 next:
2312                 result = iops->next(env, it);
2313                 if (result == -ESTALE)
2314                         goto next;
2315         } while (result == 0);
2316
2317 out:
2318         dp->ldp_hash_end = cpu_to_le64(hash);
2319         if (last != NULL) {
2320                 if (last->lde_hash == dp->ldp_hash_end)
2321                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2322                 last->lde_reclen = 0; /* end mark */
2323         }
2324         return result;
2325 }
2326
2327 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2328                           const struct lu_rdpg *rdpg)
2329 {
2330         struct dt_it      *it;
2331         struct dt_object  *next = mdd_object_child(obj);
2332         const struct dt_it_ops  *iops;
2333         struct page       *pg;
2334         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2335         int i;
2336         int nlupgs = 0;
2337         int rc;
2338         int nob;
2339
2340         LASSERT(rdpg->rp_pages != NULL);
2341         LASSERT(next->do_index_ops != NULL);
2342
2343         if (rdpg->rp_count <= 0)
2344                 return -EFAULT;
2345
2346         /*
2347          * iterate through directory and fill pages from @rdpg
2348          */
2349         iops = &next->do_index_ops->dio_it;
2350         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2351         if (IS_ERR(it))
2352                 return PTR_ERR(it);
2353
2354         rc = iops->load(env, it, rdpg->rp_hash);
2355
2356         if (rc == 0) {
2357                 /*
2358                  * Iterator didn't find record with exactly the key requested.
2359                  *
2360                  * It is currently either
2361                  *
2362                  *     - positioned above record with key less than
2363                  *     requested---skip it.
2364                  *
2365                  *     - or not positioned at all (is in IAM_IT_SKEWED
2366                  *     state)---position it on the next item.
2367                  */
2368                 rc = iops->next(env, it);
2369         } else if (rc > 0)
2370                 rc = 0;
2371
2372         /*
2373          * At this point and across for-loop:
2374          *
2375          *  rc == 0 -> ok, proceed.
2376          *  rc >  0 -> end of directory.
2377          *  rc <  0 -> error.
2378          */
2379         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2380              i++, nob -= CFS_PAGE_SIZE) {
2381                 struct lu_dirpage *dp;
2382
2383                 LASSERT(i < rdpg->rp_npages);
2384                 pg = rdpg->rp_pages[i];
2385                 dp = cfs_kmap(pg);
2386 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2387 repeat:
2388 #endif
2389                 rc = mdd_dir_page_build(env, mdd, dp,
2390                                         min_t(int, nob, LU_PAGE_SIZE),
2391                                         iops, it, rdpg->rp_attrs);
2392                 if (rc > 0) {
2393                         /*
2394                          * end of directory.
2395                          */
2396                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2397                         nlupgs++;
2398                 } else if (rc < 0) {
2399                         CWARN("build page failed: %d!\n", rc);
2400                 } else {
2401                         nlupgs++;
2402 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2403                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2404                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2405                                 goto repeat;
2406 #endif
2407                 }
2408                 cfs_kunmap(pg);
2409         }
2410         if (rc >= 0) {
2411                 struct lu_dirpage *dp;
2412
2413                 dp = cfs_kmap(rdpg->rp_pages[0]);
2414                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2415                 if (nlupgs == 0) {
2416                         /*
2417                          * No pages were processed, mark this for first page
2418                          * and send back.
2419                          */
2420                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2421                         nlupgs = 1;
2422                 }
2423                 cfs_kunmap(rdpg->rp_pages[0]);
2424
2425                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2426         }
2427         iops->put(env, it);
2428         iops->fini(env, it);
2429
2430         return rc;
2431 }
2432
2433 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2434                  const struct lu_rdpg *rdpg)
2435 {
2436         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2437         int rc;
2438         ENTRY;
2439
2440         LASSERT(mdd_object_exists(mdd_obj));
2441
2442         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2443         rc = mdd_readpage_sanity_check(env, mdd_obj);
2444         if (rc)
2445                 GOTO(out_unlock, rc);
2446
2447         if (mdd_is_dead_obj(mdd_obj)) {
2448                 struct page *pg;
2449                 struct lu_dirpage *dp;
2450
2451                 /*
2452                  * According to POSIX, please do not return any entry to client:
2453                  * even dot and dotdot should not be returned.
2454                  */
2455                 CWARN("readdir from dead object: "DFID"\n",
2456                         PFID(mdd_object_fid(mdd_obj)));
2457
2458                 if (rdpg->rp_count <= 0)
2459                         GOTO(out_unlock, rc = -EFAULT);
2460                 LASSERT(rdpg->rp_pages != NULL);
2461
2462                 pg = rdpg->rp_pages[0];
2463                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2464                 memset(dp, 0 , sizeof(struct lu_dirpage));
2465                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2466                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2467                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2468                 cfs_kunmap(pg);
2469                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2470         }
2471
2472         rc = __mdd_readpage(env, mdd_obj, rdpg);
2473
2474         EXIT;
2475 out_unlock:
2476         mdd_read_unlock(env, mdd_obj);
2477         return rc;
2478 }
2479
2480 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2481 {
2482         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2483         struct dt_object *next;
2484
2485         LASSERT(mdd_object_exists(mdd_obj));
2486         next = mdd_object_child(mdd_obj);
2487         return next->do_ops->do_object_sync(env, next);
2488 }
2489
2490 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2491                                         struct md_object *obj)
2492 {
2493         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2494
2495         LASSERT(mdd_object_exists(mdd_obj));
2496         return do_version_get(env, mdd_object_child(mdd_obj));
2497 }
2498
2499 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2500                             dt_obj_version_t version)
2501 {
2502         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2503
2504         LASSERT(mdd_object_exists(mdd_obj));
2505         do_version_set(env, mdd_object_child(mdd_obj), version);
2506 }
2507
2508 const struct md_object_operations mdd_obj_ops = {
2509         .moo_permission    = mdd_permission,
2510         .moo_attr_get      = mdd_attr_get,
2511         .moo_attr_set      = mdd_attr_set,
2512         .moo_xattr_get     = mdd_xattr_get,
2513         .moo_xattr_set     = mdd_xattr_set,
2514         .moo_xattr_list    = mdd_xattr_list,
2515         .moo_xattr_del     = mdd_xattr_del,
2516         .moo_object_create = mdd_object_create,
2517         .moo_ref_add       = mdd_ref_add,
2518         .moo_ref_del       = mdd_ref_del,
2519         .moo_open          = mdd_open,
2520         .moo_close         = mdd_close,
2521         .moo_readpage      = mdd_readpage,
2522         .moo_readlink      = mdd_readlink,
2523         .moo_changelog     = mdd_changelog,
2524         .moo_capa_get      = mdd_capa_get,
2525         .moo_object_sync   = mdd_object_sync,
2526         .moo_version_get   = mdd_version_get,
2527         .moo_version_set   = mdd_version_set,
2528         .moo_path          = mdd_path,
2529         .moo_file_lock     = mdd_file_lock,
2530         .moo_file_unlock   = mdd_file_unlock,
2531 };