Whamcloud - gitweb
LU-506 FC15: unplug_fn removed since 2.6.39.
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #ifdef HAVE_EXT4_LDISKFS
53 #include <ldiskfs/ldiskfs_jbd2.h>
54 #else
55 #include <linux/jbd.h>
56 #endif
57 #include <obd.h>
58 #include <obd_class.h>
59 #include <obd_support.h>
60 #include <lprocfs_status.h>
61 /* fid_be_cpu(), fid_cpu_to_be(). */
62 #include <lustre_fid.h>
63
64 #include <lustre_param.h>
65 #ifdef HAVE_EXT4_LDISKFS
66 #include <ldiskfs/ldiskfs.h>
67 #else
68 #include <linux/ldiskfs_fs.h>
69 #endif
70 #include <lustre_mds.h>
71 #include <lustre/lustre_idl.h>
72
73 #include "mdd_internal.h"
74
75 static const struct lu_object_operations mdd_lu_obj_ops;
76
77 static int mdd_xattr_get(const struct lu_env *env,
78                          struct md_object *obj, struct lu_buf *buf,
79                          const char *name);
80
81 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
82                  void **data)
83 {
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                  PFID(mdd_object_fid(obj)));
86         mdo_data_get(env, obj, data);
87         return 0;
88 }
89
90 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
91                struct lu_attr *la, struct lustre_capa *capa)
92 {
93         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
94                  PFID(mdd_object_fid(obj)));
95         return mdo_attr_get(env, obj, la, capa);
96 }
97
98 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
99 {
100         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
101
102         if (flags & LUSTRE_APPEND_FL)
103                 obj->mod_flags |= APPEND_OBJ;
104
105         if (flags & LUSTRE_IMMUTABLE_FL)
106                 obj->mod_flags |= IMMUTE_OBJ;
107 }
108
109 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
110 {
111         struct mdd_thread_info *info;
112
113         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
114         LASSERT(info != NULL);
115         return info;
116 }
117
118 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 {
120         struct lu_buf *buf;
121
122         buf = &mdd_env_info(env)->mti_buf;
123         buf->lb_buf = area;
124         buf->lb_len = len;
125         return buf;
126 }
127
128 void mdd_buf_put(struct lu_buf *buf)
129 {
130         if (buf == NULL || buf->lb_buf == NULL)
131                 return;
132         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
154                 buf->lb_buf = NULL;
155         }
156         if (buf->lb_buf == NULL) {
157                 buf->lb_len = len;
158                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
159                 if (buf->lb_buf == NULL)
160                         buf->lb_len = 0;
161         }
162         return buf;
163 }
164
165 /** Increase the size of the \a mti_big_buf.
166  * preserves old data in buffer
167  * old buffer remains unchanged on error
168  * \retval 0 or -ENOMEM
169  */
170 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
171 {
172         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
173         struct lu_buf buf;
174
175         LASSERT(len >= oldbuf->lb_len);
176         OBD_ALLOC_LARGE(buf.lb_buf, len);
177
178         if (buf.lb_buf == NULL)
179                 return -ENOMEM;
180
181         buf.lb_len = len;
182         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
183
184         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
185
186         memcpy(oldbuf, &buf, sizeof(buf));
187
188         return 0;
189 }
190
191 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
192                                        struct mdd_device *mdd)
193 {
194         struct mdd_thread_info *mti = mdd_env_info(env);
195         int                     max_cookie_size;
196
197         max_cookie_size = mdd_lov_cookiesize(env, mdd);
198         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
199                 if (mti->mti_max_cookie)
200                         OBD_FREE_LARGE(mti->mti_max_cookie,
201                                        mti->mti_max_cookie_size);
202                 mti->mti_max_cookie = NULL;
203                 mti->mti_max_cookie_size = 0;
204         }
205         if (unlikely(mti->mti_max_cookie == NULL)) {
206                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
207                 if (likely(mti->mti_max_cookie != NULL))
208                         mti->mti_max_cookie_size = max_cookie_size;
209         }
210         if (likely(mti->mti_max_cookie != NULL))
211                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212         return mti->mti_max_cookie;
213 }
214
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216                                    struct mdd_device *mdd)
217 {
218         struct mdd_thread_info *mti = mdd_env_info(env);
219         int                     max_lmm_size;
220
221         max_lmm_size = mdd_lov_mdsize(env, mdd);
222         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223                 if (mti->mti_max_lmm)
224                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225                 mti->mti_max_lmm = NULL;
226                 mti->mti_max_lmm_size = 0;
227         }
228         if (unlikely(mti->mti_max_lmm == NULL)) {
229                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
230                 if (likely(mti->mti_max_lmm != NULL))
231                         mti->mti_max_lmm_size = max_lmm_size;
232         }
233         return mti->mti_max_lmm;
234 }
235
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237                                    const struct lu_object_header *hdr,
238                                    struct lu_device *d)
239 {
240         struct mdd_object *mdd_obj;
241
242         OBD_ALLOC_PTR(mdd_obj);
243         if (mdd_obj != NULL) {
244                 struct lu_object *o;
245
246                 o = mdd2lu_obj(mdd_obj);
247                 lu_object_init(o, NULL, d);
248                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250                 mdd_obj->mod_count = 0;
251                 o->lo_ops = &mdd_lu_obj_ops;
252                 return o;
253         } else {
254                 return NULL;
255         }
256 }
257
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259                            const struct lu_object_conf *unused)
260 {
261         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262         struct mdd_object *mdd_obj = lu2mdd_obj(o);
263         struct lu_object  *below;
264         struct lu_device  *under;
265         ENTRY;
266
267         mdd_obj->mod_cltime = 0;
268         under = &d->mdd_child->dd_lu_dev;
269         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270         mdd_pdlock_init(mdd_obj);
271         if (below == NULL)
272                 RETURN(-ENOMEM);
273
274         lu_object_add(o, below);
275
276         RETURN(0);
277 }
278
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
280 {
281         if (lu_object_exists(o))
282                 return mdd_get_flags(env, lu2mdd_obj(o));
283         else
284                 return 0;
285 }
286
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
288 {
289         struct mdd_object *mdd = lu2mdd_obj(o);
290
291         lu_object_fini(o);
292         OBD_FREE_PTR(mdd);
293 }
294
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296                             lu_printer_t p, const struct lu_object *o)
297 {
298         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
299         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
300                     "valid=%x, cltime="LPU64", flags=%lx)",
301                     mdd, mdd->mod_count, mdd->mod_valid,
302                     mdd->mod_cltime, mdd->mod_flags);
303 }
304
305 static const struct lu_object_operations mdd_lu_obj_ops = {
306         .loo_object_init    = mdd_object_init,
307         .loo_object_start   = mdd_object_start,
308         .loo_object_free    = mdd_object_free,
309         .loo_object_print   = mdd_object_print,
310 };
311
312 struct mdd_object *mdd_object_find(const struct lu_env *env,
313                                    struct mdd_device *d,
314                                    const struct lu_fid *f)
315 {
316         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
317 }
318
319 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
320                         const char *path, struct lu_fid *fid)
321 {
322         struct lu_buf *buf;
323         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
324         struct mdd_object *obj;
325         struct lu_name *lname = &mdd_env_info(env)->mti_name;
326         char *name;
327         int rc = 0;
328         ENTRY;
329
330         /* temp buffer for path element */
331         buf = mdd_buf_alloc(env, PATH_MAX);
332         if (buf->lb_buf == NULL)
333                 RETURN(-ENOMEM);
334
335         lname->ln_name = name = buf->lb_buf;
336         lname->ln_namelen = 0;
337         *f = mdd->mdd_root_fid;
338
339         while(1) {
340                 while (*path == '/')
341                         path++;
342                 if (*path == '\0')
343                         break;
344                 while (*path != '/' && *path != '\0') {
345                         *name = *path;
346                         path++;
347                         name++;
348                         lname->ln_namelen++;
349                 }
350
351                 *name = '\0';
352                 /* find obj corresponding to fid */
353                 obj = mdd_object_find(env, mdd, f);
354                 if (obj == NULL)
355                         GOTO(out, rc = -EREMOTE);
356                 if (IS_ERR(obj))
357                         GOTO(out, rc = PTR_ERR(obj));
358                 /* get child fid from parent and name */
359                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
360                 mdd_object_put(env, obj);
361                 if (rc)
362                         break;
363
364                 name = buf->lb_buf;
365                 lname->ln_namelen = 0;
366         }
367
368         if (!rc)
369                 *fid = *f;
370 out:
371         RETURN(rc);
372 }
373
374 /** The maximum depth that fid2path() will search.
375  * This is limited only because we want to store the fids for
376  * historical path lookup purposes.
377  */
378 #define MAX_PATH_DEPTH 100
379
380 /** mdd_path() lookup structure. */
381 struct path_lookup_info {
382         __u64                pli_recno;        /**< history point */
383         __u64                pli_currec;       /**< current record */
384         struct lu_fid        pli_fid;
385         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
386         struct mdd_object   *pli_mdd_obj;
387         char                *pli_path;         /**< full path */
388         int                  pli_pathlen;
389         int                  pli_linkno;       /**< which hardlink to follow */
390         int                  pli_fidcount;     /**< number of \a pli_fids */
391 };
392
393 static int mdd_path_current(const struct lu_env *env,
394                             struct path_lookup_info *pli)
395 {
396         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
397         struct mdd_object *mdd_obj;
398         struct lu_buf     *buf = NULL;
399         struct link_ea_header *leh;
400         struct link_ea_entry  *lee;
401         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
402         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
403         char *ptr;
404         int reclen;
405         int rc;
406         ENTRY;
407
408         ptr = pli->pli_path + pli->pli_pathlen - 1;
409         *ptr = 0;
410         --ptr;
411         pli->pli_fidcount = 0;
412         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
413
414         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
415                 mdd_obj = mdd_object_find(env, mdd,
416                                           &pli->pli_fids[pli->pli_fidcount]);
417                 if (mdd_obj == NULL)
418                         GOTO(out, rc = -EREMOTE);
419                 if (IS_ERR(mdd_obj))
420                         GOTO(out, rc = PTR_ERR(mdd_obj));
421                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
422                 if (rc <= 0) {
423                         mdd_object_put(env, mdd_obj);
424                         if (rc == -1)
425                                 rc = -EREMOTE;
426                         else if (rc == 0)
427                                 /* Do I need to error out here? */
428                                 rc = -ENOENT;
429                         GOTO(out, rc);
430                 }
431
432                 /* Get parent fid and object name */
433                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
434                 buf = mdd_links_get(env, mdd_obj);
435                 mdd_read_unlock(env, mdd_obj);
436                 mdd_object_put(env, mdd_obj);
437                 if (IS_ERR(buf))
438                         GOTO(out, rc = PTR_ERR(buf));
439
440                 leh = buf->lb_buf;
441                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
442                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
443
444                 /* If set, use link #linkno for path lookup, otherwise use
445                    link #0.  Only do this for the final path element. */
446                 if ((pli->pli_fidcount == 0) &&
447                     (pli->pli_linkno < leh->leh_reccount)) {
448                         int count;
449                         for (count = 0; count < pli->pli_linkno; count++) {
450                                 lee = (struct link_ea_entry *)
451                                      ((char *)lee + reclen);
452                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
453                         }
454                         if (pli->pli_linkno < leh->leh_reccount - 1)
455                                 /* indicate to user there are more links */
456                                 pli->pli_linkno++;
457                 }
458
459                 /* Pack the name in the end of the buffer */
460                 ptr -= tmpname->ln_namelen;
461                 if (ptr - 1 <= pli->pli_path)
462                         GOTO(out, rc = -EOVERFLOW);
463                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
464                 *(--ptr) = '/';
465
466                 /* Store the parent fid for historic lookup */
467                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
468                         GOTO(out, rc = -EOVERFLOW);
469                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
470         }
471
472         /* Verify that our path hasn't changed since we started the lookup.
473            Record the current index, and verify the path resolves to the
474            same fid. If it does, then the path is correct as of this index. */
475         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
476         pli->pli_currec = mdd->mdd_cl.mc_index;
477         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
478         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
479         if (rc) {
480                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
481                 GOTO (out, rc = -EAGAIN);
482         }
483         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
484                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
485                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
486                        PFID(&pli->pli_fid));
487                 GOTO(out, rc = -EAGAIN);
488         }
489         ptr++; /* skip leading / */
490         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
491
492         EXIT;
493 out:
494         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
495                 /* if we vmalloced a large buffer drop it */
496                 mdd_buf_put(buf);
497
498         return rc;
499 }
500
501 static int mdd_path_historic(const struct lu_env *env,
502                              struct path_lookup_info *pli)
503 {
504         return 0;
505 }
506
507 /* Returns the full path to this fid, as of changelog record recno. */
508 static int mdd_path(const struct lu_env *env, struct md_object *obj,
509                     char *path, int pathlen, __u64 *recno, int *linkno)
510 {
511         struct path_lookup_info *pli;
512         int tries = 3;
513         int rc = -EAGAIN;
514         ENTRY;
515
516         if (pathlen < 3)
517                 RETURN(-EOVERFLOW);
518
519         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
520                 path[0] = '\0';
521                 RETURN(0);
522         }
523
524         OBD_ALLOC_PTR(pli);
525         if (pli == NULL)
526                 RETURN(-ENOMEM);
527
528         pli->pli_mdd_obj = md2mdd_obj(obj);
529         pli->pli_recno = *recno;
530         pli->pli_path = path;
531         pli->pli_pathlen = pathlen;
532         pli->pli_linkno = *linkno;
533
534         /* Retry multiple times in case file is being moved */
535         while (tries-- && rc == -EAGAIN)
536                 rc = mdd_path_current(env, pli);
537
538         /* For historical path lookup, the current links may not have existed
539          * at "recno" time.  We must switch over to earlier links/parents
540          * by using the changelog records.  If the earlier parent doesn't
541          * exist, we must search back through the changelog to reconstruct
542          * its parents, then check if it exists, etc.
543          * We may ignore this problem for the initial implementation and
544          * state that an "original" hardlink must still exist for us to find
545          * historic path name. */
546         if (pli->pli_recno != -1) {
547                 rc = mdd_path_historic(env, pli);
548         } else {
549                 *recno = pli->pli_currec;
550                 /* Return next link index to caller */
551                 *linkno = pli->pli_linkno;
552         }
553
554         OBD_FREE_PTR(pli);
555
556         RETURN (rc);
557 }
558
559 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
560 {
561         struct lu_attr *la = &mdd_env_info(env)->mti_la;
562         int rc;
563
564         ENTRY;
565         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
566         if (rc == 0) {
567                 mdd_flags_xlate(obj, la->la_flags);
568                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
569                         obj->mod_flags |= MNLINK_OBJ;
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 /* get lov EA only */
623 static int __mdd_lmm_get(const struct lu_env *env,
624                          struct mdd_object *mdd_obj, struct md_attr *ma)
625 {
626         int rc;
627         ENTRY;
628
629         if (ma->ma_valid & MA_LOV)
630                 RETURN(0);
631
632         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
633                         XATTR_NAME_LOV);
634         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
635                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
636         if (rc > 0) {
637                 ma->ma_lmm_size = rc;
638                 ma->ma_valid |= MA_LOV;
639                 rc = 0;
640         }
641         RETURN(rc);
642 }
643
644 /* get the first parent fid from link EA */
645 static int mdd_pfid_get(const struct lu_env *env,
646                         struct mdd_object *mdd_obj, struct md_attr *ma)
647 {
648         struct lu_buf *buf;
649         struct link_ea_header *leh;
650         struct link_ea_entry *lee;
651         struct lu_fid *pfid = &ma->ma_pfid;
652         ENTRY;
653
654         if (ma->ma_valid & MA_PFID)
655                 RETURN(0);
656
657         buf = mdd_links_get(env, mdd_obj);
658         if (IS_ERR(buf))
659                 RETURN(PTR_ERR(buf));
660
661         leh = buf->lb_buf;
662         lee = (struct link_ea_entry *)(leh + 1);
663         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
664         fid_be_to_cpu(pfid, pfid);
665         ma->ma_valid |= MA_PFID;
666         if (buf->lb_len > OBD_ALLOC_BIG)
667                 /* if we vmalloced a large buffer drop it */
668                 mdd_buf_put(buf);
669         RETURN(0);
670 }
671
672 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
673                        struct md_attr *ma)
674 {
675         int rc;
676         ENTRY;
677
678         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
679         rc = __mdd_lmm_get(env, mdd_obj, ma);
680         mdd_read_unlock(env, mdd_obj);
681         RETURN(rc);
682 }
683
684 /* get lmv EA only*/
685 static int __mdd_lmv_get(const struct lu_env *env,
686                          struct mdd_object *mdd_obj, struct md_attr *ma)
687 {
688         int rc;
689         ENTRY;
690
691         if (ma->ma_valid & MA_LMV)
692                 RETURN(0);
693
694         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
695                         XATTR_NAME_LMV);
696         if (rc > 0) {
697                 ma->ma_valid |= MA_LMV;
698                 rc = 0;
699         }
700         RETURN(rc);
701 }
702
703 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
704                          struct md_attr *ma)
705 {
706         struct mdd_thread_info *info = mdd_env_info(env);
707         struct lustre_mdt_attrs *lma =
708                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
709         int lma_size;
710         int rc;
711         ENTRY;
712
713         /* If all needed data are already valid, nothing to do */
714         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
715             (ma->ma_need & (MA_HSM | MA_SOM)))
716                 RETURN(0);
717
718         /* Read LMA from disk EA */
719         lma_size = sizeof(info->mti_xattr_buf);
720         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
721         if (rc <= 0)
722                 RETURN(rc);
723
724         /* Useless to check LMA incompatibility because this is already done in
725          * osd_ea_fid_get(), and this will fail long before this code is
726          * called.
727          * So, if we are here, LMA is compatible.
728          */
729
730         lustre_lma_swab(lma);
731
732         /* Swab and copy LMA */
733         if (ma->ma_need & MA_HSM) {
734                 if (lma->lma_compat & LMAC_HSM)
735                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
736                 else
737                         ma->ma_hsm.mh_flags = 0;
738                 ma->ma_valid |= MA_HSM;
739         }
740
741         /* Copy SOM */
742         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
743                 LASSERT(ma->ma_som != NULL);
744                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
745                 ma->ma_som->msd_size    = lma->lma_som_size;
746                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
747                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
748                 ma->ma_valid |= MA_SOM;
749         }
750
751         RETURN(0);
752 }
753
754 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
755                                  struct md_attr *ma)
756 {
757         int rc = 0;
758         ENTRY;
759
760         if (ma->ma_need & MA_INODE)
761                 rc = mdd_iattr_get(env, mdd_obj, ma);
762
763         if (rc == 0 && ma->ma_need & MA_LOV) {
764                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
765                     S_ISDIR(mdd_object_type(mdd_obj)))
766                         rc = __mdd_lmm_get(env, mdd_obj, ma);
767         }
768         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
769                 if (S_ISREG(mdd_object_type(mdd_obj)))
770                         rc = mdd_pfid_get(env, mdd_obj, ma);
771         }
772         if (rc == 0 && ma->ma_need & MA_LMV) {
773                 if (S_ISDIR(mdd_object_type(mdd_obj)))
774                         rc = __mdd_lmv_get(env, mdd_obj, ma);
775         }
776         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
777                 if (S_ISREG(mdd_object_type(mdd_obj)))
778                         rc = __mdd_lma_get(env, mdd_obj, ma);
779         }
780 #ifdef CONFIG_FS_POSIX_ACL
781         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
782                 if (S_ISDIR(mdd_object_type(mdd_obj)))
783                         rc = mdd_def_acl_get(env, mdd_obj, ma);
784         }
785 #endif
786         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
787                rc, ma->ma_valid, ma->ma_lmm);
788         RETURN(rc);
789 }
790
791 int mdd_attr_get_internal_locked(const struct lu_env *env,
792                                  struct mdd_object *mdd_obj, struct md_attr *ma)
793 {
794         int rc;
795         int needlock = ma->ma_need &
796                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
797
798         if (needlock)
799                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800         rc = mdd_attr_get_internal(env, mdd_obj, ma);
801         if (needlock)
802                 mdd_read_unlock(env, mdd_obj);
803         return rc;
804 }
805
806 /*
807  * No permission check is needed.
808  */
809 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
810                         struct md_attr *ma)
811 {
812         struct mdd_object *mdd_obj = md2mdd_obj(obj);
813         int                rc;
814
815         ENTRY;
816         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
817         RETURN(rc);
818 }
819
820 /*
821  * No permission check is needed.
822  */
823 static int mdd_xattr_get(const struct lu_env *env,
824                          struct md_object *obj, struct lu_buf *buf,
825                          const char *name)
826 {
827         struct mdd_object *mdd_obj = md2mdd_obj(obj);
828         int rc;
829
830         ENTRY;
831
832         LASSERT(mdd_object_exists(mdd_obj));
833
834         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835         rc = mdo_xattr_get(env, mdd_obj, buf, name,
836                            mdd_object_capa(env, mdd_obj));
837         mdd_read_unlock(env, mdd_obj);
838
839         RETURN(rc);
840 }
841
842 /*
843  * Permission check is done when open,
844  * no need check again.
845  */
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
847                         struct lu_buf *buf)
848 {
849         struct mdd_object *mdd_obj = md2mdd_obj(obj);
850         struct dt_object  *next;
851         loff_t             pos = 0;
852         int                rc;
853         ENTRY;
854
855         LASSERT(mdd_object_exists(mdd_obj));
856
857         next = mdd_object_child(mdd_obj);
858         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
860                                          mdd_object_capa(env, mdd_obj));
861         mdd_read_unlock(env, mdd_obj);
862         RETURN(rc);
863 }
864
865 /*
866  * No permission check is needed.
867  */
868 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
869                           struct lu_buf *buf)
870 {
871         struct mdd_object *mdd_obj = md2mdd_obj(obj);
872         int rc;
873
874         ENTRY;
875
876         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
877         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
878         mdd_read_unlock(env, mdd_obj);
879
880         RETURN(rc);
881 }
882
883 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
884                                struct mdd_object *c, struct md_attr *ma,
885                                struct thandle *handle,
886                                const struct md_op_spec *spec)
887 {
888         struct lu_attr *attr = &ma->ma_attr;
889         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
890         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
891         const struct dt_index_features *feat = spec->sp_feat;
892         int rc;
893         ENTRY;
894
895         if (!mdd_object_exists(c)) {
896                 struct dt_object *next = mdd_object_child(c);
897                 LASSERT(next);
898
899                 if (feat != &dt_directory_features && feat != NULL)
900                         dof->dof_type = DFT_INDEX;
901                 else
902                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
903
904                 dof->u.dof_idx.di_feat = feat;
905
906                 /* @hint will be initialized by underlying device. */
907                 next->do_ops->do_ah_init(env, hint,
908                                          p ? mdd_object_child(p) : NULL,
909                                          attr->la_mode & S_IFMT);
910
911                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
912                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
913         } else
914                 rc = -EEXIST;
915
916         RETURN(rc);
917 }
918
919 /**
920  * Make sure the ctime is increased only.
921  */
922 static inline int mdd_attr_check(const struct lu_env *env,
923                                  struct mdd_object *obj,
924                                  struct lu_attr *attr)
925 {
926         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
927         int rc;
928         ENTRY;
929
930         if (attr->la_valid & LA_CTIME) {
931                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
932                 if (rc)
933                         RETURN(rc);
934
935                 if (attr->la_ctime < tmp_la->la_ctime)
936                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
937                 else if (attr->la_valid == LA_CTIME &&
938                          attr->la_ctime == tmp_la->la_ctime)
939                         attr->la_valid &= ~LA_CTIME;
940         }
941         RETURN(0);
942 }
943
944 int mdd_attr_set_internal(const struct lu_env *env,
945                           struct mdd_object *obj,
946                           struct lu_attr *attr,
947                           struct thandle *handle,
948                           int needacl)
949 {
950         int rc;
951         ENTRY;
952
953         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
954 #ifdef CONFIG_FS_POSIX_ACL
955         if (!rc && (attr->la_valid & LA_MODE) && needacl)
956                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
957 #endif
958         RETURN(rc);
959 }
960
961 int mdd_attr_check_set_internal(const struct lu_env *env,
962                                 struct mdd_object *obj,
963                                 struct lu_attr *attr,
964                                 struct thandle *handle,
965                                 int needacl)
966 {
967         int rc;
968         ENTRY;
969
970         rc = mdd_attr_check(env, obj, attr);
971         if (rc)
972                 RETURN(rc);
973
974         if (attr->la_valid)
975                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
976         RETURN(rc);
977 }
978
979 static int mdd_attr_set_internal_locked(const struct lu_env *env,
980                                         struct mdd_object *obj,
981                                         struct lu_attr *attr,
982                                         struct thandle *handle,
983                                         int needacl)
984 {
985         int rc;
986         ENTRY;
987
988         needacl = needacl && (attr->la_valid & LA_MODE);
989         if (needacl)
990                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
992         if (needacl)
993                 mdd_write_unlock(env, obj);
994         RETURN(rc);
995 }
996
997 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
998                                        struct mdd_object *obj,
999                                        struct lu_attr *attr,
1000                                        struct thandle *handle,
1001                                        int needacl)
1002 {
1003         int rc;
1004         ENTRY;
1005
1006         needacl = needacl && (attr->la_valid & LA_MODE);
1007         if (needacl)
1008                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1009         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1010         if (needacl)
1011                 mdd_write_unlock(env, obj);
1012         RETURN(rc);
1013 }
1014
1015 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1016                     const struct lu_buf *buf, const char *name,
1017                     int fl, struct thandle *handle)
1018 {
1019         struct lustre_capa *capa = mdd_object_capa(env, obj);
1020         int rc = -EINVAL;
1021         ENTRY;
1022
1023         if (buf->lb_buf && buf->lb_len > 0)
1024                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1025         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1026                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1027
1028         RETURN(rc);
1029 }
1030
1031 /*
1032  * This gives the same functionality as the code between
1033  * sys_chmod and inode_setattr
1034  * chown_common and inode_setattr
1035  * utimes and inode_setattr
1036  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1037  */
1038 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1039                         struct lu_attr *la, const struct md_attr *ma)
1040 {
1041         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1042         struct md_ucred  *uc;
1043         int               rc;
1044         ENTRY;
1045
1046         if (!la->la_valid)
1047                 RETURN(0);
1048
1049         /* Do not permit change file type */
1050         if (la->la_valid & LA_TYPE)
1051                 RETURN(-EPERM);
1052
1053         /* They should not be processed by setattr */
1054         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1055                 RETURN(-EPERM);
1056
1057         /* export destroy does not have ->le_ses, but we may want
1058          * to drop LUSTRE_SOM_FL. */
1059         if (!env->le_ses)
1060                 RETURN(0);
1061
1062         uc = md_ucred(env);
1063
1064         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1065         if (rc)
1066                 RETURN(rc);
1067
1068         if (la->la_valid == LA_CTIME) {
1069                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1070                         /* This is only for set ctime when rename's source is
1071                          * on remote MDS. */
1072                         rc = mdd_may_delete(env, NULL, obj,
1073                                             (struct md_attr *)ma, 1, 0);
1074                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1075                         la->la_valid &= ~LA_CTIME;
1076                 RETURN(rc);
1077         }
1078
1079         if (la->la_valid == LA_ATIME) {
1080                 /* This is atime only set for read atime update on close. */
1081                 if (la->la_atime >= tmp_la->la_atime &&
1082                     la->la_atime < (tmp_la->la_atime +
1083                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1084                         la->la_valid &= ~LA_ATIME;
1085                 RETURN(0);
1086         }
1087
1088         /* Check if flags change. */
1089         if (la->la_valid & LA_FLAGS) {
1090                 unsigned int oldflags = 0;
1091                 unsigned int newflags = la->la_flags &
1092                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1093
1094                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1095                     !mdd_capable(uc, CFS_CAP_FOWNER))
1096                         RETURN(-EPERM);
1097
1098                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1099                  * only be changed by the relevant capability. */
1100                 if (mdd_is_immutable(obj))
1101                         oldflags |= LUSTRE_IMMUTABLE_FL;
1102                 if (mdd_is_append(obj))
1103                         oldflags |= LUSTRE_APPEND_FL;
1104                 if ((oldflags ^ newflags) &&
1105                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1106                         RETURN(-EPERM);
1107
1108                 if (!S_ISDIR(tmp_la->la_mode))
1109                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1110         }
1111
1112         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1113             (la->la_valid & ~LA_FLAGS) &&
1114             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1115                 RETURN(-EPERM);
1116
1117         /* Check for setting the obj time. */
1118         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1119             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1120                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1121                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1122                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1123                                                             MAY_WRITE,
1124                                                             MOR_TGT_CHILD);
1125                         if (rc)
1126                                 RETURN(rc);
1127                 }
1128         }
1129
1130         if (la->la_valid & LA_KILL_SUID) {
1131                 la->la_valid &= ~LA_KILL_SUID;
1132                 if ((tmp_la->la_mode & S_ISUID) &&
1133                     !(la->la_valid & LA_MODE)) {
1134                         la->la_mode = tmp_la->la_mode;
1135                         la->la_valid |= LA_MODE;
1136                 }
1137                 la->la_mode &= ~S_ISUID;
1138         }
1139
1140         if (la->la_valid & LA_KILL_SGID) {
1141                 la->la_valid &= ~LA_KILL_SGID;
1142                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1143                                         (S_ISGID | S_IXGRP)) &&
1144                     !(la->la_valid & LA_MODE)) {
1145                         la->la_mode = tmp_la->la_mode;
1146                         la->la_valid |= LA_MODE;
1147                 }
1148                 la->la_mode &= ~S_ISGID;
1149         }
1150
1151         /* Make sure a caller can chmod. */
1152         if (la->la_valid & LA_MODE) {
1153                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1154                     (uc->mu_fsuid != tmp_la->la_uid) &&
1155                     !mdd_capable(uc, CFS_CAP_FOWNER))
1156                         RETURN(-EPERM);
1157
1158                 if (la->la_mode == (cfs_umode_t) -1)
1159                         la->la_mode = tmp_la->la_mode;
1160                 else
1161                         la->la_mode = (la->la_mode & S_IALLUGO) |
1162                                       (tmp_la->la_mode & ~S_IALLUGO);
1163
1164                 /* Also check the setgid bit! */
1165                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1166                                        la->la_gid : tmp_la->la_gid) &&
1167                     !mdd_capable(uc, CFS_CAP_FSETID))
1168                         la->la_mode &= ~S_ISGID;
1169         } else {
1170                la->la_mode = tmp_la->la_mode;
1171         }
1172
1173         /* Make sure a caller can chown. */
1174         if (la->la_valid & LA_UID) {
1175                 if (la->la_uid == (uid_t) -1)
1176                         la->la_uid = tmp_la->la_uid;
1177                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1178                     (la->la_uid != tmp_la->la_uid)) &&
1179                     !mdd_capable(uc, CFS_CAP_CHOWN))
1180                         RETURN(-EPERM);
1181
1182                 /* If the user or group of a non-directory has been
1183                  * changed by a non-root user, remove the setuid bit.
1184                  * 19981026 David C Niemi <niemi@tux.org>
1185                  *
1186                  * Changed this to apply to all users, including root,
1187                  * to avoid some races. This is the behavior we had in
1188                  * 2.0. The check for non-root was definitely wrong
1189                  * for 2.2 anyway, as it should have been using
1190                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1191                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192                     !S_ISDIR(tmp_la->la_mode)) {
1193                         la->la_mode &= ~S_ISUID;
1194                         la->la_valid |= LA_MODE;
1195                 }
1196         }
1197
1198         /* Make sure caller can chgrp. */
1199         if (la->la_valid & LA_GID) {
1200                 if (la->la_gid == (gid_t) -1)
1201                         la->la_gid = tmp_la->la_gid;
1202                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203                     ((la->la_gid != tmp_la->la_gid) &&
1204                     !lustre_in_group_p(uc, la->la_gid))) &&
1205                     !mdd_capable(uc, CFS_CAP_CHOWN))
1206                         RETURN(-EPERM);
1207
1208                 /* Likewise, if the user or group of a non-directory
1209                  * has been changed by a non-root user, remove the
1210                  * setgid bit UNLESS there is no group execute bit
1211                  * (this would be a file marked for mandatory
1212                  * locking).  19981026 David C Niemi <niemi@tux.org>
1213                  *
1214                  * Removed the fsuid check (see the comment above) --
1215                  * 19990830 SD. */
1216                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1217                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1218                         la->la_mode &= ~S_ISGID;
1219                         la->la_valid |= LA_MODE;
1220                 }
1221         }
1222
1223         /* For both Size-on-MDS case and truncate case,
1224          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1225          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1226          * For SOM case, it is true, the MAY_WRITE perm has been checked
1227          * when open, no need check again. For truncate case, it is false,
1228          * the MAY_WRITE perm should be checked here. */
1229         if (ma->ma_attr_flags & MDS_SOM) {
1230                 /* For the "Size-on-MDS" setattr update, merge coming
1231                  * attributes with the set in the inode. BUG 10641 */
1232                 if ((la->la_valid & LA_ATIME) &&
1233                     (la->la_atime <= tmp_la->la_atime))
1234                         la->la_valid &= ~LA_ATIME;
1235
1236                 /* OST attributes do not have a priority over MDS attributes,
1237                  * so drop times if ctime is equal. */
1238                 if ((la->la_valid & LA_CTIME) &&
1239                     (la->la_ctime <= tmp_la->la_ctime))
1240                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1241         } else {
1242                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1243                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1244                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1245                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1246                                 rc = mdd_permission_internal_locked(env, obj,
1247                                                             tmp_la, MAY_WRITE,
1248                                                             MOR_TGT_CHILD);
1249                                 if (rc)
1250                                         RETURN(rc);
1251                         }
1252                 }
1253                 if (la->la_valid & LA_CTIME) {
1254                         /* The pure setattr, it has the priority over what is
1255                          * already set, do not drop it if ctime is equal. */
1256                         if (la->la_ctime < tmp_la->la_ctime)
1257                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1258                                                   LA_CTIME);
1259                 }
1260         }
1261
1262         RETURN(0);
1263 }
1264
1265 /** Store a data change changelog record
1266  * If this fails, we must fail the whole transaction; we don't
1267  * want the change to commit without the log entry.
1268  * \param mdd_obj - mdd_object of change
1269  * \param handle - transacion handle
1270  */
1271 static int mdd_changelog_data_store(const struct lu_env     *env,
1272                                     struct mdd_device       *mdd,
1273                                     enum changelog_rec_type type,
1274                                     int                     flags,
1275                                     struct mdd_object       *mdd_obj,
1276                                     struct thandle          *handle)
1277 {
1278         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1279         struct llog_changelog_rec *rec;
1280         struct lu_buf *buf;
1281         int reclen;
1282         int rc;
1283
1284         /* Not recording */
1285         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1286                 RETURN(0);
1287         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1288                 RETURN(0);
1289
1290         LASSERT(handle != NULL);
1291         LASSERT(mdd_obj != NULL);
1292
1293         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1294             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1295                 /* Don't need multiple updates in this log */
1296                 /* Don't check under lock - no big deal if we get an extra
1297                    entry */
1298                 RETURN(0);
1299         }
1300
1301         reclen = llog_data_len(sizeof(*rec));
1302         buf = mdd_buf_alloc(env, reclen);
1303         if (buf->lb_buf == NULL)
1304                 RETURN(-ENOMEM);
1305         rec = (struct llog_changelog_rec *)buf->lb_buf;
1306
1307         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1308         rec->cr.cr_type = (__u32)type;
1309         rec->cr.cr_tfid = *tfid;
1310         rec->cr.cr_namelen = 0;
1311         mdd_obj->mod_cltime = cfs_time_current_64();
1312
1313         rc = mdd_changelog_llog_write(mdd, rec, handle);
1314         if (rc < 0) {
1315                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1316                        rc, type, PFID(tfid));
1317                 return -EFAULT;
1318         }
1319
1320         return 0;
1321 }
1322
1323 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1324                   int flags, struct md_object *obj)
1325 {
1326         struct thandle *handle;
1327         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1328         struct mdd_device *mdd = mdo2mdd(obj);
1329         int rc;
1330         ENTRY;
1331
1332         handle = mdd_trans_start(env, mdd);
1333
1334         if (IS_ERR(handle))
1335                 return(PTR_ERR(handle));
1336
1337         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1338                                       handle);
1339
1340         mdd_trans_stop(env, mdd, rc, handle);
1341
1342         RETURN(rc);
1343 }
1344
1345 /**
1346  * Should be called with write lock held.
1347  *
1348  * \see mdd_lma_set_locked().
1349  */
1350 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1351                        const struct md_attr *ma, struct thandle *handle)
1352 {
1353         struct mdd_thread_info *info = mdd_env_info(env);
1354         struct lu_buf *buf;
1355         struct lustre_mdt_attrs *lma =
1356                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1357         int lmasize = sizeof(struct lustre_mdt_attrs);
1358         int rc = 0;
1359
1360         ENTRY;
1361
1362         /* Either HSM or SOM part is not valid, we need to read it before */
1363         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1364                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1365                 if (rc <= 0)
1366                         RETURN(rc);
1367
1368                 lustre_lma_swab(lma);
1369         } else {
1370                 memset(lma, 0, lmasize);
1371         }
1372
1373         /* Copy HSM data */
1374         if (ma->ma_valid & MA_HSM) {
1375                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1376                 lma->lma_compat |= LMAC_HSM;
1377         }
1378
1379         /* Copy SOM data */
1380         if (ma->ma_valid & MA_SOM) {
1381                 LASSERT(ma->ma_som != NULL);
1382                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1383                         lma->lma_compat     &= ~LMAC_SOM;
1384                 } else {
1385                         lma->lma_compat     |= LMAC_SOM;
1386                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1387                         lma->lma_som_size    = ma->ma_som->msd_size;
1388                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1389                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1390                 }
1391         }
1392
1393         /* Copy FID */
1394         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1395
1396         lustre_lma_swab(lma);
1397         buf = mdd_buf_get(env, lma, lmasize);
1398         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1399
1400         RETURN(rc);
1401 }
1402
1403 /**
1404  * Save LMA extended attributes with data from \a ma.
1405  *
1406  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1407  * not, LMA EA will be first read from disk, modified and write back.
1408  *
1409  */
1410 static int mdd_lma_set_locked(const struct lu_env *env,
1411                               struct mdd_object *mdd_obj,
1412                               const struct md_attr *ma, struct thandle *handle)
1413 {
1414         int rc;
1415
1416         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1417         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1418         mdd_write_unlock(env, mdd_obj);
1419         return rc;
1420 }
1421
1422 /* Precedence for choosing record type when multiple
1423  * attributes change: setattr > mtime > ctime > atime
1424  * (ctime changes when mtime does, plus chmod/chown.
1425  * atime and ctime are independent.) */
1426 static int mdd_attr_set_changelog(const struct lu_env *env,
1427                                   struct md_object *obj, struct thandle *handle,
1428                                   __u64 valid)
1429 {
1430         struct mdd_device *mdd = mdo2mdd(obj);
1431         int bits, type = 0;
1432
1433         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1434         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1435         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1436         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1437         bits = bits & mdd->mdd_cl.mc_mask;
1438         if (bits == 0)
1439                 return 0;
1440
1441         /* The record type is the lowest non-masked set bit */
1442         while (bits && ((bits & 1) == 0)) {
1443                 bits = bits >> 1;
1444                 type++;
1445         }
1446
1447         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1448         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1449                                         md2mdd_obj(obj), handle);
1450 }
1451
1452 /* set attr and LOV EA at once, return updated attr */
1453 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1454                         const struct md_attr *ma)
1455 {
1456         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1457         struct mdd_device *mdd = mdo2mdd(obj);
1458         struct thandle *handle;
1459         struct lov_mds_md *lmm = NULL;
1460         struct llog_cookie *logcookies = NULL;
1461         int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
1462         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1463         struct obd_device *obd = mdd->mdd_obd_dev;
1464         struct mds_obd *mds = &obd->u.mds;
1465 #ifdef HAVE_QUOTA_SUPPORT
1466         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1467         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1468         int quota_opc = 0, block_count = 0;
1469         int inode_pending[MAXQUOTAS] = { 0, 0 };
1470         int block_pending[MAXQUOTAS] = { 0, 0 };
1471 #endif
1472         ENTRY;
1473
1474         *la_copy = ma->ma_attr;
1475         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1476         if (rc != 0)
1477                 RETURN(rc);
1478
1479         /* setattr on "close" only change atime, or do nothing */
1480         if (ma->ma_valid == MA_INODE &&
1481             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1482                 RETURN(0);
1483
1484         /*TODO: add lock here*/
1485         /* start a log jounal handle if needed */
1486         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1487             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1488                 lmm_size = mdd_lov_mdsize(env, mdd);
1489                 lmm = mdd_max_lmm_get(env, mdd);
1490                 if (lmm == NULL)
1491                         GOTO(no_trans, rc = -ENOMEM);
1492
1493                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1494                                 XATTR_NAME_LOV);
1495
1496                 if (rc < 0)
1497                         GOTO(no_trans, rc);
1498         }
1499
1500         chlog_cnt = 1;
1501         if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
1502                 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
1503                          lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
1504         }
1505
1506         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1507                                     MDD_TXN_ATTR_SET_OP, chlog_cnt);
1508         handle = mdd_trans_start(env, mdd);
1509         if (IS_ERR(handle))
1510                 GOTO(no_trans, rc = PTR_ERR(handle));
1511
1512         /* permission changes may require sync operation */
1513         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1514                 handle->th_sync |= mdd->mdd_sync_permission;
1515
1516         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1517                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1518                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1519
1520 #ifdef HAVE_QUOTA_SUPPORT
1521         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1522                 struct obd_export *exp = md_quota(env)->mq_exp;
1523                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1524
1525                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1526                 if (!rc) {
1527                         quota_opc = FSFILT_OP_SETATTR;
1528                         mdd_quota_wrapper(la_copy, qnids);
1529                         mdd_quota_wrapper(la_tmp, qoids);
1530                         /* get file quota for new owner */
1531                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1532                                         qnids, inode_pending, 1, NULL, 0,
1533                                         NULL, 0);
1534                         block_count = (la_tmp->la_blocks + 7) >> 3;
1535                         if (block_count) {
1536                                 void *data = NULL;
1537                                 mdd_data_get(env, mdd_obj, &data);
1538                                 /* get block quota for new owner */
1539                                 lquota_chkquota(mds_quota_interface_ref, obd,
1540                                                 exp, qnids, block_pending,
1541                                                 block_count, NULL,
1542                                                 LQUOTA_FLAGS_BLK, data, 1);
1543                         }
1544                 }
1545         }
1546 #endif
1547
1548         if (la_copy->la_valid & LA_FLAGS) {
1549                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1550                                                   handle, 1);
1551                 if (rc == 0)
1552                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1553         } else if (la_copy->la_valid) {            /* setattr */
1554                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1555                                                   handle, 1);
1556                 /* journal chown/chgrp in llog, just like unlink */
1557                 if (rc == 0 && lmm_size){
1558                         cookie_size = mdd_lov_cookiesize(env, mdd);
1559                         logcookies = mdd_max_cookie_get(env, mdd);
1560                         if (logcookies == NULL)
1561                                 GOTO(cleanup, rc = -ENOMEM);
1562
1563                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1564                                             logcookies, cookie_size) <= 0)
1565                                 logcookies = NULL;
1566                 }
1567         }
1568
1569         if (rc == 0 && ma->ma_valid & MA_LOV) {
1570                 cfs_umode_t mode;
1571
1572                 mode = mdd_object_type(mdd_obj);
1573                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1574                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1575                         if (rc)
1576                                 GOTO(cleanup, rc);
1577
1578                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1579                                             ma->ma_lmm_size, handle, 1);
1580                 }
1581
1582         }
1583         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1584                 cfs_umode_t mode;
1585
1586                 mode = mdd_object_type(mdd_obj);
1587                 if (S_ISREG(mode))
1588                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1589
1590         }
1591 cleanup:
1592         if (rc == 0)
1593                 rc = mdd_attr_set_changelog(env, obj, handle,
1594                                             ma->ma_attr.la_valid);
1595         mdd_trans_stop(env, mdd, rc, handle);
1596 no_trans:
1597         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1598                 /*set obd attr, if needed*/
1599                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1600                                            logcookies);
1601         }
1602 #ifdef HAVE_QUOTA_SUPPORT
1603         if (quota_opc) {
1604                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1605                                       inode_pending, 0);
1606                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1607                                       block_pending, 1);
1608                 /* Trigger dqrel/dqacq for original owner and new owner.
1609                  * If failed, the next call for lquota_chkquota will
1610                  * process it. */
1611                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1612                               quota_opc);
1613         }
1614 #endif
1615         RETURN(rc);
1616 }
1617
1618 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1619                       const struct lu_buf *buf, const char *name, int fl,
1620                       struct thandle *handle)
1621 {
1622         int  rc;
1623         ENTRY;
1624
1625         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1626         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1627         mdd_write_unlock(env, obj);
1628
1629         RETURN(rc);
1630 }
1631
1632 static int mdd_xattr_sanity_check(const struct lu_env *env,
1633                                   struct mdd_object *obj)
1634 {
1635         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1636         struct md_ucred *uc     = md_ucred(env);
1637         int rc;
1638         ENTRY;
1639
1640         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1641                 RETURN(-EPERM);
1642
1643         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1644         if (rc)
1645                 RETURN(rc);
1646
1647         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1648             !mdd_capable(uc, CFS_CAP_FOWNER))
1649                 RETURN(-EPERM);
1650
1651         RETURN(rc);
1652 }
1653
1654 /**
1655  * The caller should guarantee to update the object ctime
1656  * after xattr_set if needed.
1657  */
1658 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1659                          const struct lu_buf *buf, const char *name,
1660                          int fl)
1661 {
1662         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1663         struct mdd_device *mdd = mdo2mdd(obj);
1664         struct thandle *handle;
1665         int  rc;
1666         ENTRY;
1667
1668         rc = mdd_xattr_sanity_check(env, mdd_obj);
1669         if (rc)
1670                 RETURN(rc);
1671
1672         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1673         handle = mdd_trans_start(env, mdd);
1674         if (IS_ERR(handle))
1675                 RETURN(PTR_ERR(handle));
1676
1677         /* security-replated changes may require sync */
1678         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1679                 handle->th_sync |= mdd->mdd_sync_permission;
1680
1681         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1682
1683         /* Only record user xattr changes */
1684         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1685                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1686                                               handle);
1687         mdd_trans_stop(env, mdd, rc, handle);
1688
1689         RETURN(rc);
1690 }
1691
1692 /**
1693  * The caller should guarantee to update the object ctime
1694  * after xattr_set if needed.
1695  */
1696 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1697                   const char *name)
1698 {
1699         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1700         struct mdd_device *mdd = mdo2mdd(obj);
1701         struct thandle *handle;
1702         int  rc;
1703         ENTRY;
1704
1705         rc = mdd_xattr_sanity_check(env, mdd_obj);
1706         if (rc)
1707                 RETURN(rc);
1708
1709         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1710         handle = mdd_trans_start(env, mdd);
1711         if (IS_ERR(handle))
1712                 RETURN(PTR_ERR(handle));
1713
1714         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1715         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1716                            mdd_object_capa(env, mdd_obj));
1717         mdd_write_unlock(env, mdd_obj);
1718
1719         /* Only record user xattr changes */
1720         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1721                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1722                                               handle);
1723
1724         mdd_trans_stop(env, mdd, rc, handle);
1725
1726         RETURN(rc);
1727 }
1728
1729 /* partial unlink */
1730 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1731                        struct md_attr *ma)
1732 {
1733         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1734         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1735         struct mdd_device *mdd = mdo2mdd(obj);
1736         struct thandle *handle;
1737 #ifdef HAVE_QUOTA_SUPPORT
1738         struct obd_device *obd = mdd->mdd_obd_dev;
1739         struct mds_obd *mds = &obd->u.mds;
1740         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1741         int quota_opc = 0;
1742 #endif
1743         int rc;
1744         ENTRY;
1745
1746         /*
1747          * Check -ENOENT early here because we need to get object type
1748          * to calculate credits before transaction start
1749          */
1750         if (!mdd_object_exists(mdd_obj))
1751                 RETURN(-ENOENT);
1752
1753         LASSERT(mdd_object_exists(mdd_obj) > 0);
1754
1755         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
1756         if (rc)
1757                 RETURN(rc);
1758
1759         handle = mdd_trans_start(env, mdd);
1760         if (IS_ERR(handle))
1761                 RETURN(-ENOMEM);
1762
1763         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1764
1765         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1766         if (rc)
1767                 GOTO(cleanup, rc);
1768
1769         __mdd_ref_del(env, mdd_obj, handle, 0);
1770
1771         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1772                 /* unlink dot */
1773                 __mdd_ref_del(env, mdd_obj, handle, 1);
1774         }
1775
1776         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1777         la_copy->la_ctime = ma->ma_attr.la_ctime;
1778
1779         la_copy->la_valid = LA_CTIME;
1780         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1781         if (rc)
1782                 GOTO(cleanup, rc);
1783
1784         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1785 #ifdef HAVE_QUOTA_SUPPORT
1786         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1787             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1788                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1789                 mdd_quota_wrapper(&ma->ma_attr, qids);
1790         }
1791 #endif
1792
1793
1794         EXIT;
1795 cleanup:
1796         mdd_write_unlock(env, mdd_obj);
1797         mdd_trans_stop(env, mdd, rc, handle);
1798 #ifdef HAVE_QUOTA_SUPPORT
1799         if (quota_opc)
1800                 /* Trigger dqrel on the owner of child. If failed,
1801                  * the next call for lquota_chkquota will process it */
1802                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1803                               quota_opc);
1804 #endif
1805         return rc;
1806 }
1807
1808 /* partial operation */
1809 static int mdd_oc_sanity_check(const struct lu_env *env,
1810                                struct mdd_object *obj,
1811                                struct md_attr *ma)
1812 {
1813         int rc;
1814         ENTRY;
1815
1816         switch (ma->ma_attr.la_mode & S_IFMT) {
1817         case S_IFREG:
1818         case S_IFDIR:
1819         case S_IFLNK:
1820         case S_IFCHR:
1821         case S_IFBLK:
1822         case S_IFIFO:
1823         case S_IFSOCK:
1824                 rc = 0;
1825                 break;
1826         default:
1827                 rc = -EINVAL;
1828                 break;
1829         }
1830         RETURN(rc);
1831 }
1832
1833 static int mdd_object_create(const struct lu_env *env,
1834                              struct md_object *obj,
1835                              const struct md_op_spec *spec,
1836                              struct md_attr *ma)
1837 {
1838
1839         struct mdd_device *mdd = mdo2mdd(obj);
1840         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1841         const struct lu_fid *pfid = spec->u.sp_pfid;
1842         struct thandle *handle;
1843 #ifdef HAVE_QUOTA_SUPPORT
1844         struct obd_device *obd = mdd->mdd_obd_dev;
1845         struct obd_export *exp = md_quota(env)->mq_exp;
1846         struct mds_obd *mds = &obd->u.mds;
1847         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1848         int quota_opc = 0, block_count = 0;
1849         int inode_pending[MAXQUOTAS] = { 0, 0 };
1850         int block_pending[MAXQUOTAS] = { 0, 0 };
1851 #endif
1852         int rc = 0;
1853         ENTRY;
1854
1855 #ifdef HAVE_QUOTA_SUPPORT
1856         if (mds->mds_quota) {
1857                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1858                 mdd_quota_wrapper(&ma->ma_attr, qids);
1859                 /* get file quota for child */
1860                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1861                                 qids, inode_pending, 1, NULL, 0,
1862                                 NULL, 0);
1863                 switch (ma->ma_attr.la_mode & S_IFMT) {
1864                 case S_IFLNK:
1865                 case S_IFDIR:
1866                         block_count = 2;
1867                         break;
1868                 case S_IFREG:
1869                         block_count = 1;
1870                         break;
1871                 }
1872                 /* get block quota for child */
1873                 if (block_count)
1874                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1875                                         qids, block_pending, block_count,
1876                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1877         }
1878 #endif
1879
1880         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
1881         handle = mdd_trans_start(env, mdd);
1882         if (IS_ERR(handle))
1883                 GOTO(out_pending, rc = PTR_ERR(handle));
1884
1885         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1886         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1887         if (rc)
1888                 GOTO(unlock, rc);
1889
1890         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1891         if (rc)
1892                 GOTO(unlock, rc);
1893
1894         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1895                 /* If creating the slave object, set slave EA here. */
1896                 int lmv_size = spec->u.sp_ea.eadatalen;
1897                 struct lmv_stripe_md *lmv;
1898
1899                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1900                 LASSERT(lmv != NULL && lmv_size > 0);
1901
1902                 rc = __mdd_xattr_set(env, mdd_obj,
1903                                      mdd_buf_get_const(env, lmv, lmv_size),
1904                                      XATTR_NAME_LMV, 0, handle);
1905                 if (rc)
1906                         GOTO(unlock, rc);
1907
1908                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1909                                            handle, 0);
1910         } else {
1911 #ifdef CONFIG_FS_POSIX_ACL
1912                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1913                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1914
1915                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1916                         buf->lb_len = spec->u.sp_ea.eadatalen;
1917                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1918                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1919                                                     &ma->ma_attr.la_mode,
1920                                                     handle);
1921                                 if (rc)
1922                                         GOTO(unlock, rc);
1923                                 else
1924                                         ma->ma_attr.la_valid |= LA_MODE;
1925                         }
1926
1927                         pfid = spec->u.sp_ea.fid;
1928                 }
1929 #endif
1930                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1931                                            spec);
1932         }
1933         EXIT;
1934 unlock:
1935         if (rc == 0)
1936                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1937         mdd_write_unlock(env, mdd_obj);
1938
1939         mdd_trans_stop(env, mdd, rc, handle);
1940 out_pending:
1941 #ifdef HAVE_QUOTA_SUPPORT
1942         if (quota_opc) {
1943                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1944                                       inode_pending, 0);
1945                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1946                                       block_pending, 1);
1947                 /* Trigger dqacq on the owner of child. If failed,
1948                  * the next call for lquota_chkquota will process it. */
1949                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1950                               quota_opc);
1951         }
1952 #endif
1953         return rc;
1954 }
1955
1956 /* partial link */
1957 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1958                        const struct md_attr *ma)
1959 {
1960         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1961         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1962         struct mdd_device *mdd = mdo2mdd(obj);
1963         struct thandle *handle;
1964         int rc;
1965         ENTRY;
1966
1967         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
1968         handle = mdd_trans_start(env, mdd);
1969         if (IS_ERR(handle))
1970                 RETURN(-ENOMEM);
1971
1972         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1973         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1974         if (rc == 0)
1975                 __mdd_ref_add(env, mdd_obj, handle);
1976         mdd_write_unlock(env, mdd_obj);
1977         if (rc == 0) {
1978                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1979                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1980
1981                 la_copy->la_valid = LA_CTIME;
1982                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1983                                                         handle, 0);
1984         }
1985         mdd_trans_stop(env, mdd, 0, handle);
1986
1987         RETURN(rc);
1988 }
1989
1990 /*
1991  * do NOT or the MAY_*'s, you'll get the weakest
1992  */
1993 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1994 {
1995         int res = 0;
1996
1997         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1998          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1999          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2000          * owner can write to a file even if it is marked readonly to hide
2001          * its brokenness. (bug 5781) */
2002         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2003                 struct md_ucred *uc = md_ucred(env);
2004
2005                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2006                     (la->la_uid == uc->mu_fsuid))
2007                         return 0;
2008         }
2009
2010         if (flags & FMODE_READ)
2011                 res |= MAY_READ;
2012         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2013                 res |= MAY_WRITE;
2014         if (flags & MDS_FMODE_EXEC)
2015                 res = MAY_EXEC;
2016         return res;
2017 }
2018
2019 static int mdd_open_sanity_check(const struct lu_env *env,
2020                                  struct mdd_object *obj, int flag)
2021 {
2022         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2023         int mode, rc;
2024         ENTRY;
2025
2026         /* EEXIST check */
2027         if (mdd_is_dead_obj(obj))
2028                 RETURN(-ENOENT);
2029
2030         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2031         if (rc)
2032                RETURN(rc);
2033
2034         if (S_ISLNK(tmp_la->la_mode))
2035                 RETURN(-ELOOP);
2036
2037         mode = accmode(env, tmp_la, flag);
2038
2039         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2040                 RETURN(-EISDIR);
2041
2042         if (!(flag & MDS_OPEN_CREATED)) {
2043                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2044                 if (rc)
2045                         RETURN(rc);
2046         }
2047
2048         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2049             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2050                 flag &= ~MDS_OPEN_TRUNC;
2051
2052         /* For writing append-only file must open it with append mode. */
2053         if (mdd_is_append(obj)) {
2054                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2055                         RETURN(-EPERM);
2056                 if (flag & MDS_OPEN_TRUNC)
2057                         RETURN(-EPERM);
2058         }
2059
2060 #if 0
2061         /*
2062          * Now, flag -- O_NOATIME does not be packed by client.
2063          */
2064         if (flag & O_NOATIME) {
2065                 struct md_ucred *uc = md_ucred(env);
2066
2067                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2068                     (uc->mu_valid == UCRED_NEW)) &&
2069                     (uc->mu_fsuid != tmp_la->la_uid) &&
2070                     !mdd_capable(uc, CFS_CAP_FOWNER))
2071                         RETURN(-EPERM);
2072         }
2073 #endif
2074
2075         RETURN(0);
2076 }
2077
2078 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2079                     int flags)
2080 {
2081         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2082         int rc = 0;
2083
2084         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2085
2086         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2087         if (rc == 0)
2088                 mdd_obj->mod_count++;
2089
2090         mdd_write_unlock(env, mdd_obj);
2091         return rc;
2092 }
2093
2094 /* return md_attr back,
2095  * if it is last unlink then return lov ea + llog cookie*/
2096 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2097                     struct md_attr *ma)
2098 {
2099         int rc = 0;
2100         ENTRY;
2101
2102         if (S_ISREG(mdd_object_type(obj))) {
2103                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2104                  * Caller must be ready for that. */
2105
2106                 rc = __mdd_lmm_get(env, obj, ma);
2107                 if ((ma->ma_valid & MA_LOV))
2108                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2109                                             obj, ma);
2110         }
2111         RETURN(rc);
2112 }
2113
2114 /*
2115  * No permission check is needed.
2116  */
2117 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2118                      struct md_attr *ma)
2119 {
2120         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2121         struct mdd_device *mdd = mdo2mdd(obj);
2122         struct thandle    *handle = NULL;
2123         int rc;
2124         int reset = 1;
2125
2126 #ifdef HAVE_QUOTA_SUPPORT
2127         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2128         struct mds_obd *mds = &obd->u.mds;
2129         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2130         int quota_opc = 0;
2131 #endif
2132         ENTRY;
2133
2134         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2135                 mdd_obj->mod_count--;
2136
2137                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2138                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2139                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2140                 RETURN(0);
2141         }
2142
2143         /* check without any lock */
2144         if (mdd_obj->mod_count == 1 &&
2145             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2146  again:
2147                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
2148                 if (rc)
2149                         RETURN(rc);
2150                 handle = mdd_trans_start(env, mdo2mdd(obj));
2151                 if (IS_ERR(handle))
2152                         RETURN(PTR_ERR(handle));
2153         }
2154
2155         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2156         if (handle == NULL && mdd_obj->mod_count == 1 &&
2157             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2158                 mdd_write_unlock(env, mdd_obj);
2159                 goto again;
2160         }
2161
2162         /* release open count */
2163         mdd_obj->mod_count --;
2164
2165         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2166                 /* remove link to object from orphan index */
2167                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2168                 if (rc == 0) {
2169                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2170                                "list, OSS objects to be destroyed.\n",
2171                                PFID(mdd_object_fid(mdd_obj)));
2172                 } else {
2173                         CERROR("Object "DFID" can not be deleted from orphan "
2174                                 "list, maybe cause OST objects can not be "
2175                                 "destroyed (err: %d).\n",
2176                                 PFID(mdd_object_fid(mdd_obj)), rc);
2177                         /* If object was not deleted from orphan list, do not
2178                          * destroy OSS objects, which will be done when next
2179                          * recovery. */
2180                         GOTO(out, rc);
2181                 }
2182         }
2183
2184         rc = mdd_iattr_get(env, mdd_obj, ma);
2185         /* Object maybe not in orphan list originally, it is rare case for
2186          * mdd_finish_unlink() failure. */
2187         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2188 #ifdef HAVE_QUOTA_SUPPORT
2189                 if (mds->mds_quota) {
2190                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2191                         mdd_quota_wrapper(&ma->ma_attr, qids);
2192                 }
2193 #endif
2194                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2195                 if (ma->ma_valid & MA_FLAGS &&
2196                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2197                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2198                 } else {
2199                         rc = mdd_object_kill(env, mdd_obj, ma);
2200                         if (rc == 0)
2201                                 reset = 0;
2202                 }
2203
2204                 if (rc != 0)
2205                         CERROR("Error when prepare to delete Object "DFID" , "
2206                                "which will cause OST objects can not be "
2207                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2208         }
2209         EXIT;
2210
2211 out:
2212         if (reset)
2213                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2214
2215         mdd_write_unlock(env, mdd_obj);
2216         if (handle != NULL)
2217                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2218 #ifdef HAVE_QUOTA_SUPPORT
2219         if (quota_opc)
2220                 /* Trigger dqrel on the owner of child. If failed,
2221                  * the next call for lquota_chkquota will process it */
2222                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2223                               quota_opc);
2224 #endif
2225         return rc;
2226 }
2227
2228 /*
2229  * Permission check is done when open,
2230  * no need check again.
2231  */
2232 static int mdd_readpage_sanity_check(const struct lu_env *env,
2233                                      struct mdd_object *obj)
2234 {
2235         struct dt_object *next = mdd_object_child(obj);
2236         int rc;
2237         ENTRY;
2238
2239         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2240                 rc = 0;
2241         else
2242                 rc = -ENOTDIR;
2243
2244         RETURN(rc);
2245 }
2246
2247 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2248                               struct lu_dirpage *dp, int nob,
2249                               const struct dt_it_ops *iops, struct dt_it *it,
2250                               __u32 attr)
2251 {
2252         void                   *area = dp;
2253         int                     result;
2254         __u64                   hash = 0;
2255         struct lu_dirent       *ent;
2256         struct lu_dirent       *last = NULL;
2257         int                     first = 1;
2258
2259         memset(area, 0, sizeof (*dp));
2260         area += sizeof (*dp);
2261         nob  -= sizeof (*dp);
2262
2263         ent  = area;
2264         do {
2265                 int    len;
2266                 int    recsize;
2267
2268                 len  = iops->key_size(env, it);
2269
2270                 /* IAM iterator can return record with zero len. */
2271                 if (len == 0)
2272                         goto next;
2273
2274                 hash = iops->store(env, it);
2275                 if (unlikely(first)) {
2276                         first = 0;
2277                         dp->ldp_hash_start = cpu_to_le64(hash);
2278                 }
2279
2280                 /* calculate max space required for lu_dirent */
2281                 recsize = lu_dirent_calc_size(len, attr);
2282
2283                 if (nob >= recsize) {
2284                         result = iops->rec(env, it, ent, attr);
2285                         if (result == -ESTALE)
2286                                 goto next;
2287                         if (result != 0)
2288                                 goto out;
2289
2290                         /* osd might not able to pack all attributes,
2291                          * so recheck rec length */
2292                         recsize = le16_to_cpu(ent->lde_reclen);
2293                 } else {
2294                         result = (last != NULL) ? 0 :-EINVAL;
2295                         goto out;
2296                 }
2297                 last = ent;
2298                 ent = (void *)ent + recsize;
2299                 nob -= recsize;
2300
2301 next:
2302                 result = iops->next(env, it);
2303                 if (result == -ESTALE)
2304                         goto next;
2305         } while (result == 0);
2306
2307 out:
2308         dp->ldp_hash_end = cpu_to_le64(hash);
2309         if (last != NULL) {
2310                 if (last->lde_hash == dp->ldp_hash_end)
2311                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2312                 last->lde_reclen = 0; /* end mark */
2313         }
2314         return result;
2315 }
2316
2317 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2318                           const struct lu_rdpg *rdpg)
2319 {
2320         struct dt_it      *it;
2321         struct dt_object  *next = mdd_object_child(obj);
2322         const struct dt_it_ops  *iops;
2323         struct page       *pg;
2324         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2325         int i;
2326         int nlupgs = 0;
2327         int rc;
2328         int nob;
2329
2330         LASSERT(rdpg->rp_pages != NULL);
2331         LASSERT(next->do_index_ops != NULL);
2332
2333         if (rdpg->rp_count <= 0)
2334                 return -EFAULT;
2335
2336         /*
2337          * iterate through directory and fill pages from @rdpg
2338          */
2339         iops = &next->do_index_ops->dio_it;
2340         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2341         if (IS_ERR(it))
2342                 return PTR_ERR(it);
2343
2344         rc = iops->load(env, it, rdpg->rp_hash);
2345
2346         if (rc == 0) {
2347                 /*
2348                  * Iterator didn't find record with exactly the key requested.
2349                  *
2350                  * It is currently either
2351                  *
2352                  *     - positioned above record with key less than
2353                  *     requested---skip it.
2354                  *
2355                  *     - or not positioned at all (is in IAM_IT_SKEWED
2356                  *     state)---position it on the next item.
2357                  */
2358                 rc = iops->next(env, it);
2359         } else if (rc > 0)
2360                 rc = 0;
2361
2362         /*
2363          * At this point and across for-loop:
2364          *
2365          *  rc == 0 -> ok, proceed.
2366          *  rc >  0 -> end of directory.
2367          *  rc <  0 -> error.
2368          */
2369         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2370              i++, nob -= CFS_PAGE_SIZE) {
2371                 struct lu_dirpage *dp;
2372
2373                 LASSERT(i < rdpg->rp_npages);
2374                 pg = rdpg->rp_pages[i];
2375                 dp = cfs_kmap(pg);
2376 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2377 repeat:
2378 #endif
2379                 rc = mdd_dir_page_build(env, mdd, dp,
2380                                         min_t(int, nob, LU_PAGE_SIZE),
2381                                         iops, it, rdpg->rp_attrs);
2382                 if (rc > 0) {
2383                         /*
2384                          * end of directory.
2385                          */
2386                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2387                         nlupgs++;
2388                 } else if (rc < 0) {
2389                         CWARN("build page failed: %d!\n", rc);
2390                 } else {
2391                         nlupgs++;
2392 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2393                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2394                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2395                                 goto repeat;
2396 #endif
2397                 }
2398                 cfs_kunmap(pg);
2399         }
2400         if (rc >= 0) {
2401                 struct lu_dirpage *dp;
2402
2403                 dp = cfs_kmap(rdpg->rp_pages[0]);
2404                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2405                 if (nlupgs == 0) {
2406                         /*
2407                          * No pages were processed, mark this for first page
2408                          * and send back.
2409                          */
2410                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2411                         nlupgs = 1;
2412                 }
2413                 cfs_kunmap(rdpg->rp_pages[0]);
2414
2415                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2416         }
2417         iops->put(env, it);
2418         iops->fini(env, it);
2419
2420         return rc;
2421 }
2422
2423 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2424                  const struct lu_rdpg *rdpg)
2425 {
2426         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2427         int rc;
2428         ENTRY;
2429
2430         LASSERT(mdd_object_exists(mdd_obj));
2431
2432         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2433         rc = mdd_readpage_sanity_check(env, mdd_obj);
2434         if (rc)
2435                 GOTO(out_unlock, rc);
2436
2437         if (mdd_is_dead_obj(mdd_obj)) {
2438                 struct page *pg;
2439                 struct lu_dirpage *dp;
2440
2441                 /*
2442                  * According to POSIX, please do not return any entry to client:
2443                  * even dot and dotdot should not be returned.
2444                  */
2445                 CWARN("readdir from dead object: "DFID"\n",
2446                         PFID(mdd_object_fid(mdd_obj)));
2447
2448                 if (rdpg->rp_count <= 0)
2449                         GOTO(out_unlock, rc = -EFAULT);
2450                 LASSERT(rdpg->rp_pages != NULL);
2451
2452                 pg = rdpg->rp_pages[0];
2453                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2454                 memset(dp, 0 , sizeof(struct lu_dirpage));
2455                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2456                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2457                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2458                 cfs_kunmap(pg);
2459                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2460         }
2461
2462         rc = __mdd_readpage(env, mdd_obj, rdpg);
2463
2464         EXIT;
2465 out_unlock:
2466         mdd_read_unlock(env, mdd_obj);
2467         return rc;
2468 }
2469
2470 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2471 {
2472         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2473         struct dt_object *next;
2474
2475         LASSERT(mdd_object_exists(mdd_obj));
2476         next = mdd_object_child(mdd_obj);
2477         return next->do_ops->do_object_sync(env, next);
2478 }
2479
2480 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2481                                         struct md_object *obj)
2482 {
2483         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2484
2485         LASSERT(mdd_object_exists(mdd_obj));
2486         return do_version_get(env, mdd_object_child(mdd_obj));
2487 }
2488
2489 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2490                             dt_obj_version_t version)
2491 {
2492         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2493
2494         LASSERT(mdd_object_exists(mdd_obj));
2495         do_version_set(env, mdd_object_child(mdd_obj), version);
2496 }
2497
2498 const struct md_object_operations mdd_obj_ops = {
2499         .moo_permission    = mdd_permission,
2500         .moo_attr_get      = mdd_attr_get,
2501         .moo_attr_set      = mdd_attr_set,
2502         .moo_xattr_get     = mdd_xattr_get,
2503         .moo_xattr_set     = mdd_xattr_set,
2504         .moo_xattr_list    = mdd_xattr_list,
2505         .moo_xattr_del     = mdd_xattr_del,
2506         .moo_object_create = mdd_object_create,
2507         .moo_ref_add       = mdd_ref_add,
2508         .moo_ref_del       = mdd_ref_del,
2509         .moo_open          = mdd_open,
2510         .moo_close         = mdd_close,
2511         .moo_readpage      = mdd_readpage,
2512         .moo_readlink      = mdd_readlink,
2513         .moo_changelog     = mdd_changelog,
2514         .moo_capa_get      = mdd_capa_get,
2515         .moo_object_sync   = mdd_object_sync,
2516         .moo_version_get   = mdd_version_get,
2517         .moo_version_set   = mdd_version_set,
2518         .moo_path          = mdd_path,
2519         .moo_file_lock     = mdd_file_lock,
2520         .moo_file_unlock   = mdd_file_unlock,
2521 };