Whamcloud - gitweb
LU-601 mdd: Fix transaction credits
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #ifdef HAVE_EXT4_LDISKFS
53 #include <ldiskfs/ldiskfs_jbd2.h>
54 #else
55 #include <linux/jbd.h>
56 #endif
57 #include <obd.h>
58 #include <obd_class.h>
59 #include <obd_support.h>
60 #include <lprocfs_status.h>
61 /* fid_be_cpu(), fid_cpu_to_be(). */
62 #include <lustre_fid.h>
63
64 #include <lustre_param.h>
65 #ifdef HAVE_EXT4_LDISKFS
66 #include <ldiskfs/ldiskfs.h>
67 #else
68 #include <linux/ldiskfs_fs.h>
69 #endif
70 #include <lustre_mds.h>
71 #include <lustre/lustre_idl.h>
72
73 #include "mdd_internal.h"
74
75 static const struct lu_object_operations mdd_lu_obj_ops;
76
77 static int mdd_xattr_get(const struct lu_env *env,
78                          struct md_object *obj, struct lu_buf *buf,
79                          const char *name);
80
81 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
82                  void **data)
83 {
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                  PFID(mdd_object_fid(obj)));
86         mdo_data_get(env, obj, data);
87         return 0;
88 }
89
90 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
91                struct lu_attr *la, struct lustre_capa *capa)
92 {
93         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
94                  PFID(mdd_object_fid(obj)));
95         return mdo_attr_get(env, obj, la, capa);
96 }
97
98 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
99 {
100         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
101
102         if (flags & LUSTRE_APPEND_FL)
103                 obj->mod_flags |= APPEND_OBJ;
104
105         if (flags & LUSTRE_IMMUTABLE_FL)
106                 obj->mod_flags |= IMMUTE_OBJ;
107 }
108
109 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
110 {
111         struct mdd_thread_info *info;
112
113         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
114         LASSERT(info != NULL);
115         return info;
116 }
117
118 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
119 {
120         struct lu_buf *buf;
121
122         buf = &mdd_env_info(env)->mti_buf;
123         buf->lb_buf = area;
124         buf->lb_len = len;
125         return buf;
126 }
127
128 void mdd_buf_put(struct lu_buf *buf)
129 {
130         if (buf == NULL || buf->lb_buf == NULL)
131                 return;
132         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
154                 buf->lb_buf = NULL;
155         }
156         if (buf->lb_buf == NULL) {
157                 buf->lb_len = len;
158                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
159                 if (buf->lb_buf == NULL)
160                         buf->lb_len = 0;
161         }
162         return buf;
163 }
164
165 /** Increase the size of the \a mti_big_buf.
166  * preserves old data in buffer
167  * old buffer remains unchanged on error
168  * \retval 0 or -ENOMEM
169  */
170 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
171 {
172         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
173         struct lu_buf buf;
174
175         LASSERT(len >= oldbuf->lb_len);
176         OBD_ALLOC_LARGE(buf.lb_buf, len);
177
178         if (buf.lb_buf == NULL)
179                 return -ENOMEM;
180
181         buf.lb_len = len;
182         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
183
184         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
185
186         memcpy(oldbuf, &buf, sizeof(buf));
187
188         return 0;
189 }
190
191 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
192                                        struct mdd_device *mdd)
193 {
194         struct mdd_thread_info *mti = mdd_env_info(env);
195         int                     max_cookie_size;
196
197         max_cookie_size = mdd_lov_cookiesize(env, mdd);
198         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
199                 if (mti->mti_max_cookie)
200                         OBD_FREE_LARGE(mti->mti_max_cookie,
201                                        mti->mti_max_cookie_size);
202                 mti->mti_max_cookie = NULL;
203                 mti->mti_max_cookie_size = 0;
204         }
205         if (unlikely(mti->mti_max_cookie == NULL)) {
206                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
207                 if (likely(mti->mti_max_cookie != NULL))
208                         mti->mti_max_cookie_size = max_cookie_size;
209         }
210         if (likely(mti->mti_max_cookie != NULL))
211                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
212         return mti->mti_max_cookie;
213 }
214
215 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
216                                    struct mdd_device *mdd)
217 {
218         struct mdd_thread_info *mti = mdd_env_info(env);
219         int                     max_lmm_size;
220
221         max_lmm_size = mdd_lov_mdsize(env, mdd);
222         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
223                 if (mti->mti_max_lmm)
224                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
225                 mti->mti_max_lmm = NULL;
226                 mti->mti_max_lmm_size = 0;
227         }
228         if (unlikely(mti->mti_max_lmm == NULL)) {
229                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
230                 if (likely(mti->mti_max_lmm != NULL))
231                         mti->mti_max_lmm_size = max_lmm_size;
232         }
233         return mti->mti_max_lmm;
234 }
235
236 struct lu_object *mdd_object_alloc(const struct lu_env *env,
237                                    const struct lu_object_header *hdr,
238                                    struct lu_device *d)
239 {
240         struct mdd_object *mdd_obj;
241
242         OBD_ALLOC_PTR(mdd_obj);
243         if (mdd_obj != NULL) {
244                 struct lu_object *o;
245
246                 o = mdd2lu_obj(mdd_obj);
247                 lu_object_init(o, NULL, d);
248                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
249                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
250                 mdd_obj->mod_count = 0;
251                 o->lo_ops = &mdd_lu_obj_ops;
252                 return o;
253         } else {
254                 return NULL;
255         }
256 }
257
258 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
259                            const struct lu_object_conf *unused)
260 {
261         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
262         struct mdd_object *mdd_obj = lu2mdd_obj(o);
263         struct lu_object  *below;
264         struct lu_device  *under;
265         ENTRY;
266
267         mdd_obj->mod_cltime = 0;
268         under = &d->mdd_child->dd_lu_dev;
269         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
270         mdd_pdlock_init(mdd_obj);
271         if (below == NULL)
272                 RETURN(-ENOMEM);
273
274         lu_object_add(o, below);
275
276         RETURN(0);
277 }
278
279 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
280 {
281         if (lu_object_exists(o))
282                 return mdd_get_flags(env, lu2mdd_obj(o));
283         else
284                 return 0;
285 }
286
287 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
288 {
289         struct mdd_object *mdd = lu2mdd_obj(o);
290
291         lu_object_fini(o);
292         OBD_FREE_PTR(mdd);
293 }
294
295 static int mdd_object_print(const struct lu_env *env, void *cookie,
296                             lu_printer_t p, const struct lu_object *o)
297 {
298         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
299         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
300                     "valid=%x, cltime="LPU64", flags=%lx)",
301                     mdd, mdd->mod_count, mdd->mod_valid,
302                     mdd->mod_cltime, mdd->mod_flags);
303 }
304
305 static const struct lu_object_operations mdd_lu_obj_ops = {
306         .loo_object_init    = mdd_object_init,
307         .loo_object_start   = mdd_object_start,
308         .loo_object_free    = mdd_object_free,
309         .loo_object_print   = mdd_object_print,
310 };
311
312 struct mdd_object *mdd_object_find(const struct lu_env *env,
313                                    struct mdd_device *d,
314                                    const struct lu_fid *f)
315 {
316         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
317 }
318
319 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
320                         const char *path, struct lu_fid *fid)
321 {
322         struct lu_buf *buf;
323         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
324         struct mdd_object *obj;
325         struct lu_name *lname = &mdd_env_info(env)->mti_name;
326         char *name;
327         int rc = 0;
328         ENTRY;
329
330         /* temp buffer for path element */
331         buf = mdd_buf_alloc(env, PATH_MAX);
332         if (buf->lb_buf == NULL)
333                 RETURN(-ENOMEM);
334
335         lname->ln_name = name = buf->lb_buf;
336         lname->ln_namelen = 0;
337         *f = mdd->mdd_root_fid;
338
339         while(1) {
340                 while (*path == '/')
341                         path++;
342                 if (*path == '\0')
343                         break;
344                 while (*path != '/' && *path != '\0') {
345                         *name = *path;
346                         path++;
347                         name++;
348                         lname->ln_namelen++;
349                 }
350
351                 *name = '\0';
352                 /* find obj corresponding to fid */
353                 obj = mdd_object_find(env, mdd, f);
354                 if (obj == NULL)
355                         GOTO(out, rc = -EREMOTE);
356                 if (IS_ERR(obj))
357                         GOTO(out, rc = PTR_ERR(obj));
358                 /* get child fid from parent and name */
359                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
360                 mdd_object_put(env, obj);
361                 if (rc)
362                         break;
363
364                 name = buf->lb_buf;
365                 lname->ln_namelen = 0;
366         }
367
368         if (!rc)
369                 *fid = *f;
370 out:
371         RETURN(rc);
372 }
373
374 /** The maximum depth that fid2path() will search.
375  * This is limited only because we want to store the fids for
376  * historical path lookup purposes.
377  */
378 #define MAX_PATH_DEPTH 100
379
380 /** mdd_path() lookup structure. */
381 struct path_lookup_info {
382         __u64                pli_recno;        /**< history point */
383         __u64                pli_currec;       /**< current record */
384         struct lu_fid        pli_fid;
385         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
386         struct mdd_object   *pli_mdd_obj;
387         char                *pli_path;         /**< full path */
388         int                  pli_pathlen;
389         int                  pli_linkno;       /**< which hardlink to follow */
390         int                  pli_fidcount;     /**< number of \a pli_fids */
391 };
392
393 static int mdd_path_current(const struct lu_env *env,
394                             struct path_lookup_info *pli)
395 {
396         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
397         struct mdd_object *mdd_obj;
398         struct lu_buf     *buf = NULL;
399         struct link_ea_header *leh;
400         struct link_ea_entry  *lee;
401         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
402         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
403         char *ptr;
404         int reclen;
405         int rc;
406         ENTRY;
407
408         ptr = pli->pli_path + pli->pli_pathlen - 1;
409         *ptr = 0;
410         --ptr;
411         pli->pli_fidcount = 0;
412         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
413
414         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
415                 mdd_obj = mdd_object_find(env, mdd,
416                                           &pli->pli_fids[pli->pli_fidcount]);
417                 if (mdd_obj == NULL)
418                         GOTO(out, rc = -EREMOTE);
419                 if (IS_ERR(mdd_obj))
420                         GOTO(out, rc = PTR_ERR(mdd_obj));
421                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
422                 if (rc <= 0) {
423                         mdd_object_put(env, mdd_obj);
424                         if (rc == -1)
425                                 rc = -EREMOTE;
426                         else if (rc == 0)
427                                 /* Do I need to error out here? */
428                                 rc = -ENOENT;
429                         GOTO(out, rc);
430                 }
431
432                 /* Get parent fid and object name */
433                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
434                 buf = mdd_links_get(env, mdd_obj);
435                 mdd_read_unlock(env, mdd_obj);
436                 mdd_object_put(env, mdd_obj);
437                 if (IS_ERR(buf))
438                         GOTO(out, rc = PTR_ERR(buf));
439
440                 leh = buf->lb_buf;
441                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
442                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
443
444                 /* If set, use link #linkno for path lookup, otherwise use
445                    link #0.  Only do this for the final path element. */
446                 if ((pli->pli_fidcount == 0) &&
447                     (pli->pli_linkno < leh->leh_reccount)) {
448                         int count;
449                         for (count = 0; count < pli->pli_linkno; count++) {
450                                 lee = (struct link_ea_entry *)
451                                      ((char *)lee + reclen);
452                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
453                         }
454                         if (pli->pli_linkno < leh->leh_reccount - 1)
455                                 /* indicate to user there are more links */
456                                 pli->pli_linkno++;
457                 }
458
459                 /* Pack the name in the end of the buffer */
460                 ptr -= tmpname->ln_namelen;
461                 if (ptr - 1 <= pli->pli_path)
462                         GOTO(out, rc = -EOVERFLOW);
463                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
464                 *(--ptr) = '/';
465
466                 /* Store the parent fid for historic lookup */
467                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
468                         GOTO(out, rc = -EOVERFLOW);
469                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
470         }
471
472         /* Verify that our path hasn't changed since we started the lookup.
473            Record the current index, and verify the path resolves to the
474            same fid. If it does, then the path is correct as of this index. */
475         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
476         pli->pli_currec = mdd->mdd_cl.mc_index;
477         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
478         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
479         if (rc) {
480                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
481                 GOTO (out, rc = -EAGAIN);
482         }
483         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
484                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
485                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
486                        PFID(&pli->pli_fid));
487                 GOTO(out, rc = -EAGAIN);
488         }
489         ptr++; /* skip leading / */
490         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
491
492         EXIT;
493 out:
494         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
495                 /* if we vmalloced a large buffer drop it */
496                 mdd_buf_put(buf);
497
498         return rc;
499 }
500
501 static int mdd_path_historic(const struct lu_env *env,
502                              struct path_lookup_info *pli)
503 {
504         return 0;
505 }
506
507 /* Returns the full path to this fid, as of changelog record recno. */
508 static int mdd_path(const struct lu_env *env, struct md_object *obj,
509                     char *path, int pathlen, __u64 *recno, int *linkno)
510 {
511         struct path_lookup_info *pli;
512         int tries = 3;
513         int rc = -EAGAIN;
514         ENTRY;
515
516         if (pathlen < 3)
517                 RETURN(-EOVERFLOW);
518
519         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
520                 path[0] = '\0';
521                 RETURN(0);
522         }
523
524         OBD_ALLOC_PTR(pli);
525         if (pli == NULL)
526                 RETURN(-ENOMEM);
527
528         pli->pli_mdd_obj = md2mdd_obj(obj);
529         pli->pli_recno = *recno;
530         pli->pli_path = path;
531         pli->pli_pathlen = pathlen;
532         pli->pli_linkno = *linkno;
533
534         /* Retry multiple times in case file is being moved */
535         while (tries-- && rc == -EAGAIN)
536                 rc = mdd_path_current(env, pli);
537
538         /* For historical path lookup, the current links may not have existed
539          * at "recno" time.  We must switch over to earlier links/parents
540          * by using the changelog records.  If the earlier parent doesn't
541          * exist, we must search back through the changelog to reconstruct
542          * its parents, then check if it exists, etc.
543          * We may ignore this problem for the initial implementation and
544          * state that an "original" hardlink must still exist for us to find
545          * historic path name. */
546         if (pli->pli_recno != -1) {
547                 rc = mdd_path_historic(env, pli);
548         } else {
549                 *recno = pli->pli_currec;
550                 /* Return next link index to caller */
551                 *linkno = pli->pli_linkno;
552         }
553
554         OBD_FREE_PTR(pli);
555
556         RETURN (rc);
557 }
558
559 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
560 {
561         struct lu_attr *la = &mdd_env_info(env)->mti_la;
562         int rc;
563
564         ENTRY;
565         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
566         if (rc == 0) {
567                 mdd_flags_xlate(obj, la->la_flags);
568                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
569                         obj->mod_flags |= MNLINK_OBJ;
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 /* get lov EA only */
623 static int __mdd_lmm_get(const struct lu_env *env,
624                          struct mdd_object *mdd_obj, struct md_attr *ma)
625 {
626         int rc;
627         ENTRY;
628
629         if (ma->ma_valid & MA_LOV)
630                 RETURN(0);
631
632         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
633                         XATTR_NAME_LOV);
634         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
635                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
636         if (rc > 0) {
637                 ma->ma_lmm_size = rc;
638                 ma->ma_valid |= MA_LOV;
639                 rc = 0;
640         }
641         RETURN(rc);
642 }
643
644 /* get the first parent fid from link EA */
645 static int mdd_pfid_get(const struct lu_env *env,
646                         struct mdd_object *mdd_obj, struct md_attr *ma)
647 {
648         struct lu_buf *buf;
649         struct link_ea_header *leh;
650         struct link_ea_entry *lee;
651         struct lu_fid *pfid = &ma->ma_pfid;
652         ENTRY;
653
654         if (ma->ma_valid & MA_PFID)
655                 RETURN(0);
656
657         buf = mdd_links_get(env, mdd_obj);
658         if (IS_ERR(buf))
659                 RETURN(PTR_ERR(buf));
660
661         leh = buf->lb_buf;
662         lee = (struct link_ea_entry *)(leh + 1);
663         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
664         fid_be_to_cpu(pfid, pfid);
665         ma->ma_valid |= MA_PFID;
666         if (buf->lb_len > OBD_ALLOC_BIG)
667                 /* if we vmalloced a large buffer drop it */
668                 mdd_buf_put(buf);
669         RETURN(0);
670 }
671
672 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
673                        struct md_attr *ma)
674 {
675         int rc;
676         ENTRY;
677
678         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
679         rc = __mdd_lmm_get(env, mdd_obj, ma);
680         mdd_read_unlock(env, mdd_obj);
681         RETURN(rc);
682 }
683
684 /* get lmv EA only*/
685 static int __mdd_lmv_get(const struct lu_env *env,
686                          struct mdd_object *mdd_obj, struct md_attr *ma)
687 {
688         int rc;
689         ENTRY;
690
691         if (ma->ma_valid & MA_LMV)
692                 RETURN(0);
693
694         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
695                         XATTR_NAME_LMV);
696         if (rc > 0) {
697                 ma->ma_valid |= MA_LMV;
698                 rc = 0;
699         }
700         RETURN(rc);
701 }
702
703 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
704                          struct md_attr *ma)
705 {
706         struct mdd_thread_info *info = mdd_env_info(env);
707         struct lustre_mdt_attrs *lma =
708                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
709         int lma_size;
710         int rc;
711         ENTRY;
712
713         /* If all needed data are already valid, nothing to do */
714         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
715             (ma->ma_need & (MA_HSM | MA_SOM)))
716                 RETURN(0);
717
718         /* Read LMA from disk EA */
719         lma_size = sizeof(info->mti_xattr_buf);
720         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
721         if (rc <= 0)
722                 RETURN(rc);
723
724         /* Useless to check LMA incompatibility because this is already done in
725          * osd_ea_fid_get(), and this will fail long before this code is
726          * called.
727          * So, if we are here, LMA is compatible.
728          */
729
730         lustre_lma_swab(lma);
731
732         /* Swab and copy LMA */
733         if (ma->ma_need & MA_HSM) {
734                 if (lma->lma_compat & LMAC_HSM)
735                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
736                 else
737                         ma->ma_hsm.mh_flags = 0;
738                 ma->ma_valid |= MA_HSM;
739         }
740
741         /* Copy SOM */
742         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
743                 LASSERT(ma->ma_som != NULL);
744                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
745                 ma->ma_som->msd_size    = lma->lma_som_size;
746                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
747                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
748                 ma->ma_valid |= MA_SOM;
749         }
750
751         RETURN(0);
752 }
753
754 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
755                                  struct md_attr *ma)
756 {
757         int rc = 0;
758         ENTRY;
759
760         if (ma->ma_need & MA_INODE)
761                 rc = mdd_iattr_get(env, mdd_obj, ma);
762
763         if (rc == 0 && ma->ma_need & MA_LOV) {
764                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
765                     S_ISDIR(mdd_object_type(mdd_obj)))
766                         rc = __mdd_lmm_get(env, mdd_obj, ma);
767         }
768         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
769                 if (S_ISREG(mdd_object_type(mdd_obj)))
770                         rc = mdd_pfid_get(env, mdd_obj, ma);
771         }
772         if (rc == 0 && ma->ma_need & MA_LMV) {
773                 if (S_ISDIR(mdd_object_type(mdd_obj)))
774                         rc = __mdd_lmv_get(env, mdd_obj, ma);
775         }
776         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
777                 if (S_ISREG(mdd_object_type(mdd_obj)))
778                         rc = __mdd_lma_get(env, mdd_obj, ma);
779         }
780 #ifdef CONFIG_FS_POSIX_ACL
781         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
782                 if (S_ISDIR(mdd_object_type(mdd_obj)))
783                         rc = mdd_def_acl_get(env, mdd_obj, ma);
784         }
785 #endif
786         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
787                rc, ma->ma_valid, ma->ma_lmm);
788         RETURN(rc);
789 }
790
791 int mdd_attr_get_internal_locked(const struct lu_env *env,
792                                  struct mdd_object *mdd_obj, struct md_attr *ma)
793 {
794         int rc;
795         int needlock = ma->ma_need &
796                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
797
798         if (needlock)
799                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800         rc = mdd_attr_get_internal(env, mdd_obj, ma);
801         if (needlock)
802                 mdd_read_unlock(env, mdd_obj);
803         return rc;
804 }
805
806 /*
807  * No permission check is needed.
808  */
809 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
810                         struct md_attr *ma)
811 {
812         struct mdd_object *mdd_obj = md2mdd_obj(obj);
813         int                rc;
814
815         ENTRY;
816         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
817         RETURN(rc);
818 }
819
820 /*
821  * No permission check is needed.
822  */
823 static int mdd_xattr_get(const struct lu_env *env,
824                          struct md_object *obj, struct lu_buf *buf,
825                          const char *name)
826 {
827         struct mdd_object *mdd_obj = md2mdd_obj(obj);
828         int rc;
829
830         ENTRY;
831
832         LASSERT(mdd_object_exists(mdd_obj));
833
834         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
835         rc = mdo_xattr_get(env, mdd_obj, buf, name,
836                            mdd_object_capa(env, mdd_obj));
837         mdd_read_unlock(env, mdd_obj);
838
839         RETURN(rc);
840 }
841
842 /*
843  * Permission check is done when open,
844  * no need check again.
845  */
846 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
847                         struct lu_buf *buf)
848 {
849         struct mdd_object *mdd_obj = md2mdd_obj(obj);
850         struct dt_object  *next;
851         loff_t             pos = 0;
852         int                rc;
853         ENTRY;
854
855         LASSERT(mdd_object_exists(mdd_obj));
856
857         next = mdd_object_child(mdd_obj);
858         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
860                                          mdd_object_capa(env, mdd_obj));
861         mdd_read_unlock(env, mdd_obj);
862         RETURN(rc);
863 }
864
865 /*
866  * No permission check is needed.
867  */
868 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
869                           struct lu_buf *buf)
870 {
871         struct mdd_object *mdd_obj = md2mdd_obj(obj);
872         int rc;
873
874         ENTRY;
875
876         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
877         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
878         mdd_read_unlock(env, mdd_obj);
879
880         RETURN(rc);
881 }
882
883 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
884                                struct mdd_object *c, struct md_attr *ma,
885                                struct thandle *handle,
886                                const struct md_op_spec *spec)
887 {
888         struct lu_attr *attr = &ma->ma_attr;
889         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
890         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
891         const struct dt_index_features *feat = spec->sp_feat;
892         int rc;
893         ENTRY;
894
895         if (!mdd_object_exists(c)) {
896                 struct dt_object *next = mdd_object_child(c);
897                 LASSERT(next);
898
899                 if (feat != &dt_directory_features && feat != NULL)
900                         dof->dof_type = DFT_INDEX;
901                 else
902                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
903
904                 dof->u.dof_idx.di_feat = feat;
905
906                 /* @hint will be initialized by underlying device. */
907                 next->do_ops->do_ah_init(env, hint,
908                                          p ? mdd_object_child(p) : NULL,
909                                          attr->la_mode & S_IFMT);
910
911                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
912                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
913         } else
914                 rc = -EEXIST;
915
916         RETURN(rc);
917 }
918
919 /**
920  * Make sure the ctime is increased only.
921  */
922 static inline int mdd_attr_check(const struct lu_env *env,
923                                  struct mdd_object *obj,
924                                  struct lu_attr *attr)
925 {
926         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
927         int rc;
928         ENTRY;
929
930         if (attr->la_valid & LA_CTIME) {
931                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
932                 if (rc)
933                         RETURN(rc);
934
935                 if (attr->la_ctime < tmp_la->la_ctime)
936                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
937                 else if (attr->la_valid == LA_CTIME &&
938                          attr->la_ctime == tmp_la->la_ctime)
939                         attr->la_valid &= ~LA_CTIME;
940         }
941         RETURN(0);
942 }
943
944 int mdd_attr_set_internal(const struct lu_env *env,
945                           struct mdd_object *obj,
946                           struct lu_attr *attr,
947                           struct thandle *handle,
948                           int needacl)
949 {
950         int rc;
951         ENTRY;
952
953         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
954 #ifdef CONFIG_FS_POSIX_ACL
955         if (!rc && (attr->la_valid & LA_MODE) && needacl)
956                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
957 #endif
958         RETURN(rc);
959 }
960
961 int mdd_attr_check_set_internal(const struct lu_env *env,
962                                 struct mdd_object *obj,
963                                 struct lu_attr *attr,
964                                 struct thandle *handle,
965                                 int needacl)
966 {
967         int rc;
968         ENTRY;
969
970         rc = mdd_attr_check(env, obj, attr);
971         if (rc)
972                 RETURN(rc);
973
974         if (attr->la_valid)
975                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
976         RETURN(rc);
977 }
978
979 static int mdd_attr_set_internal_locked(const struct lu_env *env,
980                                         struct mdd_object *obj,
981                                         struct lu_attr *attr,
982                                         struct thandle *handle,
983                                         int needacl)
984 {
985         int rc;
986         ENTRY;
987
988         needacl = needacl && (attr->la_valid & LA_MODE);
989         if (needacl)
990                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
992         if (needacl)
993                 mdd_write_unlock(env, obj);
994         RETURN(rc);
995 }
996
997 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
998                                        struct mdd_object *obj,
999                                        struct lu_attr *attr,
1000                                        struct thandle *handle,
1001                                        int needacl)
1002 {
1003         int rc;
1004         ENTRY;
1005
1006         needacl = needacl && (attr->la_valid & LA_MODE);
1007         if (needacl)
1008                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1009         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1010         if (needacl)
1011                 mdd_write_unlock(env, obj);
1012         RETURN(rc);
1013 }
1014
1015 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1016                     const struct lu_buf *buf, const char *name,
1017                     int fl, struct thandle *handle)
1018 {
1019         struct lustre_capa *capa = mdd_object_capa(env, obj);
1020         int rc = -EINVAL;
1021         ENTRY;
1022
1023         if (buf->lb_buf && buf->lb_len > 0)
1024                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1025         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1026                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1027
1028         RETURN(rc);
1029 }
1030
1031 /*
1032  * This gives the same functionality as the code between
1033  * sys_chmod and inode_setattr
1034  * chown_common and inode_setattr
1035  * utimes and inode_setattr
1036  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1037  */
1038 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1039                         struct lu_attr *la, const struct md_attr *ma)
1040 {
1041         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1042         struct md_ucred  *uc;
1043         int               rc;
1044         ENTRY;
1045
1046         if (!la->la_valid)
1047                 RETURN(0);
1048
1049         /* Do not permit change file type */
1050         if (la->la_valid & LA_TYPE)
1051                 RETURN(-EPERM);
1052
1053         /* They should not be processed by setattr */
1054         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1055                 RETURN(-EPERM);
1056
1057         /* export destroy does not have ->le_ses, but we may want
1058          * to drop LUSTRE_SOM_FL. */
1059         if (!env->le_ses)
1060                 RETURN(0);
1061
1062         uc = md_ucred(env);
1063
1064         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1065         if (rc)
1066                 RETURN(rc);
1067
1068         if (la->la_valid == LA_CTIME) {
1069                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1070                         /* This is only for set ctime when rename's source is
1071                          * on remote MDS. */
1072                         rc = mdd_may_delete(env, NULL, obj,
1073                                             (struct md_attr *)ma, 1, 0);
1074                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1075                         la->la_valid &= ~LA_CTIME;
1076                 RETURN(rc);
1077         }
1078
1079         if (la->la_valid == LA_ATIME) {
1080                 /* This is atime only set for read atime update on close. */
1081                 if (la->la_atime >= tmp_la->la_atime &&
1082                     la->la_atime < (tmp_la->la_atime +
1083                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1084                         la->la_valid &= ~LA_ATIME;
1085                 RETURN(0);
1086         }
1087
1088         /* Check if flags change. */
1089         if (la->la_valid & LA_FLAGS) {
1090                 unsigned int oldflags = 0;
1091                 unsigned int newflags = la->la_flags &
1092                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1093
1094                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1095                     !mdd_capable(uc, CFS_CAP_FOWNER))
1096                         RETURN(-EPERM);
1097
1098                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1099                  * only be changed by the relevant capability. */
1100                 if (mdd_is_immutable(obj))
1101                         oldflags |= LUSTRE_IMMUTABLE_FL;
1102                 if (mdd_is_append(obj))
1103                         oldflags |= LUSTRE_APPEND_FL;
1104                 if ((oldflags ^ newflags) &&
1105                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1106                         RETURN(-EPERM);
1107
1108                 if (!S_ISDIR(tmp_la->la_mode))
1109                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1110         }
1111
1112         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1113             (la->la_valid & ~LA_FLAGS) &&
1114             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1115                 RETURN(-EPERM);
1116
1117         /* Check for setting the obj time. */
1118         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1119             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1120                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1121                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1122                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1123                                                             MAY_WRITE,
1124                                                             MOR_TGT_CHILD);
1125                         if (rc)
1126                                 RETURN(rc);
1127                 }
1128         }
1129
1130         if (la->la_valid & LA_KILL_SUID) {
1131                 la->la_valid &= ~LA_KILL_SUID;
1132                 if ((tmp_la->la_mode & S_ISUID) &&
1133                     !(la->la_valid & LA_MODE)) {
1134                         la->la_mode = tmp_la->la_mode;
1135                         la->la_valid |= LA_MODE;
1136                 }
1137                 la->la_mode &= ~S_ISUID;
1138         }
1139
1140         if (la->la_valid & LA_KILL_SGID) {
1141                 la->la_valid &= ~LA_KILL_SGID;
1142                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1143                                         (S_ISGID | S_IXGRP)) &&
1144                     !(la->la_valid & LA_MODE)) {
1145                         la->la_mode = tmp_la->la_mode;
1146                         la->la_valid |= LA_MODE;
1147                 }
1148                 la->la_mode &= ~S_ISGID;
1149         }
1150
1151         /* Make sure a caller can chmod. */
1152         if (la->la_valid & LA_MODE) {
1153                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1154                     (uc->mu_fsuid != tmp_la->la_uid) &&
1155                     !mdd_capable(uc, CFS_CAP_FOWNER))
1156                         RETURN(-EPERM);
1157
1158                 if (la->la_mode == (cfs_umode_t) -1)
1159                         la->la_mode = tmp_la->la_mode;
1160                 else
1161                         la->la_mode = (la->la_mode & S_IALLUGO) |
1162                                       (tmp_la->la_mode & ~S_IALLUGO);
1163
1164                 /* Also check the setgid bit! */
1165                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1166                                        la->la_gid : tmp_la->la_gid) &&
1167                     !mdd_capable(uc, CFS_CAP_FSETID))
1168                         la->la_mode &= ~S_ISGID;
1169         } else {
1170                la->la_mode = tmp_la->la_mode;
1171         }
1172
1173         /* Make sure a caller can chown. */
1174         if (la->la_valid & LA_UID) {
1175                 if (la->la_uid == (uid_t) -1)
1176                         la->la_uid = tmp_la->la_uid;
1177                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1178                     (la->la_uid != tmp_la->la_uid)) &&
1179                     !mdd_capable(uc, CFS_CAP_CHOWN))
1180                         RETURN(-EPERM);
1181
1182                 /* If the user or group of a non-directory has been
1183                  * changed by a non-root user, remove the setuid bit.
1184                  * 19981026 David C Niemi <niemi@tux.org>
1185                  *
1186                  * Changed this to apply to all users, including root,
1187                  * to avoid some races. This is the behavior we had in
1188                  * 2.0. The check for non-root was definitely wrong
1189                  * for 2.2 anyway, as it should have been using
1190                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1191                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1192                     !S_ISDIR(tmp_la->la_mode)) {
1193                         la->la_mode &= ~S_ISUID;
1194                         la->la_valid |= LA_MODE;
1195                 }
1196         }
1197
1198         /* Make sure caller can chgrp. */
1199         if (la->la_valid & LA_GID) {
1200                 if (la->la_gid == (gid_t) -1)
1201                         la->la_gid = tmp_la->la_gid;
1202                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1203                     ((la->la_gid != tmp_la->la_gid) &&
1204                     !lustre_in_group_p(uc, la->la_gid))) &&
1205                     !mdd_capable(uc, CFS_CAP_CHOWN))
1206                         RETURN(-EPERM);
1207
1208                 /* Likewise, if the user or group of a non-directory
1209                  * has been changed by a non-root user, remove the
1210                  * setgid bit UNLESS there is no group execute bit
1211                  * (this would be a file marked for mandatory
1212                  * locking).  19981026 David C Niemi <niemi@tux.org>
1213                  *
1214                  * Removed the fsuid check (see the comment above) --
1215                  * 19990830 SD. */
1216                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1217                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1218                         la->la_mode &= ~S_ISGID;
1219                         la->la_valid |= LA_MODE;
1220                 }
1221         }
1222
1223         /* For both Size-on-MDS case and truncate case,
1224          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1225          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1226          * For SOM case, it is true, the MAY_WRITE perm has been checked
1227          * when open, no need check again. For truncate case, it is false,
1228          * the MAY_WRITE perm should be checked here. */
1229         if (ma->ma_attr_flags & MDS_SOM) {
1230                 /* For the "Size-on-MDS" setattr update, merge coming
1231                  * attributes with the set in the inode. BUG 10641 */
1232                 if ((la->la_valid & LA_ATIME) &&
1233                     (la->la_atime <= tmp_la->la_atime))
1234                         la->la_valid &= ~LA_ATIME;
1235
1236                 /* OST attributes do not have a priority over MDS attributes,
1237                  * so drop times if ctime is equal. */
1238                 if ((la->la_valid & LA_CTIME) &&
1239                     (la->la_ctime <= tmp_la->la_ctime))
1240                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1241         } else {
1242                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1243                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1244                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1245                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1246                                 rc = mdd_permission_internal_locked(env, obj,
1247                                                             tmp_la, MAY_WRITE,
1248                                                             MOR_TGT_CHILD);
1249                                 if (rc)
1250                                         RETURN(rc);
1251                         }
1252                 }
1253                 if (la->la_valid & LA_CTIME) {
1254                         /* The pure setattr, it has the priority over what is
1255                          * already set, do not drop it if ctime is equal. */
1256                         if (la->la_ctime < tmp_la->la_ctime)
1257                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1258                                                   LA_CTIME);
1259                 }
1260         }
1261
1262         RETURN(0);
1263 }
1264
1265 /** Store a data change changelog record
1266  * If this fails, we must fail the whole transaction; we don't
1267  * want the change to commit without the log entry.
1268  * \param mdd_obj - mdd_object of change
1269  * \param handle - transacion handle
1270  */
1271 static int mdd_changelog_data_store(const struct lu_env     *env,
1272                                     struct mdd_device       *mdd,
1273                                     enum changelog_rec_type type,
1274                                     int                     flags,
1275                                     struct mdd_object       *mdd_obj,
1276                                     struct thandle          *handle)
1277 {
1278         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1279         struct llog_changelog_rec *rec;
1280         struct lu_buf *buf;
1281         int reclen;
1282         int rc;
1283
1284         /* Not recording */
1285         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1286                 RETURN(0);
1287         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1288                 RETURN(0);
1289
1290         LASSERT(handle != NULL);
1291         LASSERT(mdd_obj != NULL);
1292
1293         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1294             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1295                 /* Don't need multiple updates in this log */
1296                 /* Don't check under lock - no big deal if we get an extra
1297                    entry */
1298                 RETURN(0);
1299         }
1300
1301         reclen = llog_data_len(sizeof(*rec));
1302         buf = mdd_buf_alloc(env, reclen);
1303         if (buf->lb_buf == NULL)
1304                 RETURN(-ENOMEM);
1305         rec = (struct llog_changelog_rec *)buf->lb_buf;
1306
1307         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1308         rec->cr.cr_type = (__u32)type;
1309         rec->cr.cr_tfid = *tfid;
1310         rec->cr.cr_namelen = 0;
1311         mdd_obj->mod_cltime = cfs_time_current_64();
1312
1313         rc = mdd_changelog_llog_write(mdd, rec, handle);
1314         if (rc < 0) {
1315                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1316                        rc, type, PFID(tfid));
1317                 return -EFAULT;
1318         }
1319
1320         return 0;
1321 }
1322
1323 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1324                   int flags, struct md_object *obj)
1325 {
1326         struct thandle *handle;
1327         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1328         struct mdd_device *mdd = mdo2mdd(obj);
1329         int rc;
1330         ENTRY;
1331
1332         handle = mdd_trans_start(env, mdd);
1333
1334         if (IS_ERR(handle))
1335                 return(PTR_ERR(handle));
1336
1337         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1338                                       handle);
1339
1340         mdd_trans_stop(env, mdd, rc, handle);
1341
1342         RETURN(rc);
1343 }
1344
1345 /**
1346  * Should be called with write lock held.
1347  *
1348  * \see mdd_lma_set_locked().
1349  */
1350 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1351                        const struct md_attr *ma, struct thandle *handle)
1352 {
1353         struct mdd_thread_info *info = mdd_env_info(env);
1354         struct lu_buf *buf;
1355         struct lustre_mdt_attrs *lma =
1356                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1357         int lmasize = sizeof(struct lustre_mdt_attrs);
1358         int rc = 0;
1359
1360         ENTRY;
1361
1362         /* Either HSM or SOM part is not valid, we need to read it before */
1363         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1364                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1365                 if (rc <= 0)
1366                         RETURN(rc);
1367
1368                 lustre_lma_swab(lma);
1369         } else {
1370                 memset(lma, 0, lmasize);
1371         }
1372
1373         /* Copy HSM data */
1374         if (ma->ma_valid & MA_HSM) {
1375                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1376                 lma->lma_compat |= LMAC_HSM;
1377         }
1378
1379         /* Copy SOM data */
1380         if (ma->ma_valid & MA_SOM) {
1381                 LASSERT(ma->ma_som != NULL);
1382                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1383                         lma->lma_compat     &= ~LMAC_SOM;
1384                 } else {
1385                         lma->lma_compat     |= LMAC_SOM;
1386                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1387                         lma->lma_som_size    = ma->ma_som->msd_size;
1388                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1389                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1390                 }
1391         }
1392
1393         /* Copy FID */
1394         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1395
1396         lustre_lma_swab(lma);
1397         buf = mdd_buf_get(env, lma, lmasize);
1398         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1399
1400         RETURN(rc);
1401 }
1402
1403 /**
1404  * Save LMA extended attributes with data from \a ma.
1405  *
1406  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1407  * not, LMA EA will be first read from disk, modified and write back.
1408  *
1409  */
1410 static int mdd_lma_set_locked(const struct lu_env *env,
1411                               struct mdd_object *mdd_obj,
1412                               const struct md_attr *ma, struct thandle *handle)
1413 {
1414         int rc;
1415
1416         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1417         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1418         mdd_write_unlock(env, mdd_obj);
1419         return rc;
1420 }
1421
1422 /* Precedence for choosing record type when multiple
1423  * attributes change: setattr > mtime > ctime > atime
1424  * (ctime changes when mtime does, plus chmod/chown.
1425  * atime and ctime are independent.) */
1426 static int mdd_attr_set_changelog(const struct lu_env *env,
1427                                   struct md_object *obj, struct thandle *handle,
1428                                   __u64 valid)
1429 {
1430         struct mdd_device *mdd = mdo2mdd(obj);
1431         int bits, type = 0;
1432
1433         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1434         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1435         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1436         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1437         bits = bits & mdd->mdd_cl.mc_mask;
1438         if (bits == 0)
1439                 return 0;
1440
1441         /* The record type is the lowest non-masked set bit */
1442         while (bits && ((bits & 1) == 0)) {
1443                 bits = bits >> 1;
1444                 type++;
1445         }
1446
1447         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1448         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1449                                         md2mdd_obj(obj), handle);
1450 }
1451
1452 /* set attr and LOV EA at once, return updated attr */
1453 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1454                         const struct md_attr *ma)
1455 {
1456         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1457         struct mdd_device *mdd = mdo2mdd(obj);
1458         struct thandle *handle;
1459         struct lov_mds_md *lmm = NULL;
1460         struct llog_cookie *logcookies = NULL;
1461         int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
1462         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1463         struct obd_device *obd = mdd->mdd_obd_dev;
1464         struct mds_obd *mds = &obd->u.mds;
1465 #ifdef HAVE_QUOTA_SUPPORT
1466         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1467         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1468         int quota_opc = 0, block_count = 0;
1469         int inode_pending[MAXQUOTAS] = { 0, 0 };
1470         int block_pending[MAXQUOTAS] = { 0, 0 };
1471 #endif
1472         ENTRY;
1473
1474         *la_copy = ma->ma_attr;
1475         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1476         if (rc != 0)
1477                 RETURN(rc);
1478
1479         /* setattr on "close" only change atime, or do nothing */
1480         if (ma->ma_valid == MA_INODE &&
1481             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1482                 RETURN(0);
1483
1484         /*TODO: add lock here*/
1485         /* start a log jounal handle if needed */
1486         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1487             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1488                 lmm_size = mdd_lov_mdsize(env, mdd);
1489                 lmm = mdd_max_lmm_get(env, mdd);
1490                 if (lmm == NULL)
1491                         GOTO(no_trans, rc = -ENOMEM);
1492
1493                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1494                                 XATTR_NAME_LOV);
1495
1496                 if (rc < 0)
1497                         GOTO(no_trans, rc);
1498         }
1499
1500         chlog_cnt = 1;
1501         if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
1502                 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
1503                          lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
1504         }
1505         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1506                                     MDD_TXN_ATTR_SET_OP, chlog_cnt);
1507         handle = mdd_trans_start(env, mdd);
1508         if (IS_ERR(handle))
1509                 GOTO(no_trans, rc = PTR_ERR(handle));
1510
1511         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1512                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1513                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1514
1515 #ifdef HAVE_QUOTA_SUPPORT
1516         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1517                 struct obd_export *exp = md_quota(env)->mq_exp;
1518                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1519
1520                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1521                 if (!rc) {
1522                         quota_opc = FSFILT_OP_SETATTR;
1523                         mdd_quota_wrapper(la_copy, qnids);
1524                         mdd_quota_wrapper(la_tmp, qoids);
1525                         /* get file quota for new owner */
1526                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1527                                         qnids, inode_pending, 1, NULL, 0,
1528                                         NULL, 0);
1529                         block_count = (la_tmp->la_blocks + 7) >> 3;
1530                         if (block_count) {
1531                                 void *data = NULL;
1532                                 mdd_data_get(env, mdd_obj, &data);
1533                                 /* get block quota for new owner */
1534                                 lquota_chkquota(mds_quota_interface_ref, obd,
1535                                                 exp, qnids, block_pending,
1536                                                 block_count, NULL,
1537                                                 LQUOTA_FLAGS_BLK, data, 1);
1538                         }
1539                 }
1540         }
1541 #endif
1542
1543         if (la_copy->la_valid & LA_FLAGS) {
1544                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1545                                                   handle, 1);
1546                 if (rc == 0)
1547                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1548         } else if (la_copy->la_valid) {            /* setattr */
1549                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1550                                                   handle, 1);
1551                 /* journal chown/chgrp in llog, just like unlink */
1552                 if (rc == 0 && lmm_size){
1553                         cookie_size = mdd_lov_cookiesize(env, mdd);
1554                         logcookies = mdd_max_cookie_get(env, mdd);
1555                         if (logcookies == NULL)
1556                                 GOTO(cleanup, rc = -ENOMEM);
1557
1558                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1559                                             logcookies, cookie_size) <= 0)
1560                                 logcookies = NULL;
1561                 }
1562         }
1563
1564         if (rc == 0 && ma->ma_valid & MA_LOV) {
1565                 cfs_umode_t mode;
1566
1567                 mode = mdd_object_type(mdd_obj);
1568                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1569                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1570                         if (rc)
1571                                 GOTO(cleanup, rc);
1572
1573                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1574                                             ma->ma_lmm_size, handle, 1);
1575                 }
1576
1577         }
1578         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1579                 cfs_umode_t mode;
1580
1581                 mode = mdd_object_type(mdd_obj);
1582                 if (S_ISREG(mode))
1583                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1584
1585         }
1586 cleanup:
1587         if (rc == 0)
1588                 rc = mdd_attr_set_changelog(env, obj, handle,
1589                                             ma->ma_attr.la_valid);
1590         mdd_trans_stop(env, mdd, rc, handle);
1591 no_trans:
1592         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1593                 /*set obd attr, if needed*/
1594                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1595                                            logcookies);
1596         }
1597 #ifdef HAVE_QUOTA_SUPPORT
1598         if (quota_opc) {
1599                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1600                                       inode_pending, 0);
1601                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1602                                       block_pending, 1);
1603                 /* Trigger dqrel/dqacq for original owner and new owner.
1604                  * If failed, the next call for lquota_chkquota will
1605                  * process it. */
1606                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1607                               quota_opc);
1608         }
1609 #endif
1610         RETURN(rc);
1611 }
1612
1613 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1614                       const struct lu_buf *buf, const char *name, int fl,
1615                       struct thandle *handle)
1616 {
1617         int  rc;
1618         ENTRY;
1619
1620         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1621         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1622         mdd_write_unlock(env, obj);
1623
1624         RETURN(rc);
1625 }
1626
1627 static int mdd_xattr_sanity_check(const struct lu_env *env,
1628                                   struct mdd_object *obj)
1629 {
1630         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1631         struct md_ucred *uc     = md_ucred(env);
1632         int rc;
1633         ENTRY;
1634
1635         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1636                 RETURN(-EPERM);
1637
1638         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1639         if (rc)
1640                 RETURN(rc);
1641
1642         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1643             !mdd_capable(uc, CFS_CAP_FOWNER))
1644                 RETURN(-EPERM);
1645
1646         RETURN(rc);
1647 }
1648
1649 /**
1650  * The caller should guarantee to update the object ctime
1651  * after xattr_set if needed.
1652  */
1653 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1654                          const struct lu_buf *buf, const char *name,
1655                          int fl)
1656 {
1657         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1658         struct mdd_device *mdd = mdo2mdd(obj);
1659         struct thandle *handle;
1660         int  rc;
1661         ENTRY;
1662
1663         rc = mdd_xattr_sanity_check(env, mdd_obj);
1664         if (rc)
1665                 RETURN(rc);
1666
1667         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1668         /* security-replated changes may require sync */
1669         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1670             mdd->mdd_sync_permission == 1)
1671                 txn_param_sync(&mdd_env_info(env)->mti_param);
1672
1673         handle = mdd_trans_start(env, mdd);
1674         if (IS_ERR(handle))
1675                 RETURN(PTR_ERR(handle));
1676
1677         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1678
1679         /* Only record user xattr changes */
1680         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1681                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1682                                               handle);
1683         mdd_trans_stop(env, mdd, rc, handle);
1684
1685         RETURN(rc);
1686 }
1687
1688 /**
1689  * The caller should guarantee to update the object ctime
1690  * after xattr_set if needed.
1691  */
1692 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1693                   const char *name)
1694 {
1695         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1696         struct mdd_device *mdd = mdo2mdd(obj);
1697         struct thandle *handle;
1698         int  rc;
1699         ENTRY;
1700
1701         rc = mdd_xattr_sanity_check(env, mdd_obj);
1702         if (rc)
1703                 RETURN(rc);
1704
1705         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
1706         handle = mdd_trans_start(env, mdd);
1707         if (IS_ERR(handle))
1708                 RETURN(PTR_ERR(handle));
1709
1710         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1711         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1712                            mdd_object_capa(env, mdd_obj));
1713         mdd_write_unlock(env, mdd_obj);
1714
1715         /* Only record user xattr changes */
1716         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1717                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1718                                               handle);
1719
1720         mdd_trans_stop(env, mdd, rc, handle);
1721
1722         RETURN(rc);
1723 }
1724
1725 /* partial unlink */
1726 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1727                        struct md_attr *ma)
1728 {
1729         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1730         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1731         struct mdd_device *mdd = mdo2mdd(obj);
1732         struct thandle *handle;
1733 #ifdef HAVE_QUOTA_SUPPORT
1734         struct obd_device *obd = mdd->mdd_obd_dev;
1735         struct mds_obd *mds = &obd->u.mds;
1736         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1737         int quota_opc = 0;
1738 #endif
1739         int rc;
1740         ENTRY;
1741
1742         /*
1743          * Check -ENOENT early here because we need to get object type
1744          * to calculate credits before transaction start
1745          */
1746         if (!mdd_object_exists(mdd_obj))
1747                 RETURN(-ENOENT);
1748
1749         LASSERT(mdd_object_exists(mdd_obj) > 0);
1750
1751         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
1752         if (rc)
1753                 RETURN(rc);
1754
1755         handle = mdd_trans_start(env, mdd);
1756         if (IS_ERR(handle))
1757                 RETURN(-ENOMEM);
1758
1759         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1760
1761         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1762         if (rc)
1763                 GOTO(cleanup, rc);
1764
1765         __mdd_ref_del(env, mdd_obj, handle, 0);
1766
1767         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1768                 /* unlink dot */
1769                 __mdd_ref_del(env, mdd_obj, handle, 1);
1770         }
1771
1772         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1773         la_copy->la_ctime = ma->ma_attr.la_ctime;
1774
1775         la_copy->la_valid = LA_CTIME;
1776         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1777         if (rc)
1778                 GOTO(cleanup, rc);
1779
1780         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1781 #ifdef HAVE_QUOTA_SUPPORT
1782         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1783             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1784                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1785                 mdd_quota_wrapper(&ma->ma_attr, qids);
1786         }
1787 #endif
1788
1789
1790         EXIT;
1791 cleanup:
1792         mdd_write_unlock(env, mdd_obj);
1793         mdd_trans_stop(env, mdd, rc, handle);
1794 #ifdef HAVE_QUOTA_SUPPORT
1795         if (quota_opc)
1796                 /* Trigger dqrel on the owner of child. If failed,
1797                  * the next call for lquota_chkquota will process it */
1798                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1799                               quota_opc);
1800 #endif
1801         return rc;
1802 }
1803
1804 /* partial operation */
1805 static int mdd_oc_sanity_check(const struct lu_env *env,
1806                                struct mdd_object *obj,
1807                                struct md_attr *ma)
1808 {
1809         int rc;
1810         ENTRY;
1811
1812         switch (ma->ma_attr.la_mode & S_IFMT) {
1813         case S_IFREG:
1814         case S_IFDIR:
1815         case S_IFLNK:
1816         case S_IFCHR:
1817         case S_IFBLK:
1818         case S_IFIFO:
1819         case S_IFSOCK:
1820                 rc = 0;
1821                 break;
1822         default:
1823                 rc = -EINVAL;
1824                 break;
1825         }
1826         RETURN(rc);
1827 }
1828
1829 static int mdd_object_create(const struct lu_env *env,
1830                              struct md_object *obj,
1831                              const struct md_op_spec *spec,
1832                              struct md_attr *ma)
1833 {
1834
1835         struct mdd_device *mdd = mdo2mdd(obj);
1836         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1837         const struct lu_fid *pfid = spec->u.sp_pfid;
1838         struct thandle *handle;
1839 #ifdef HAVE_QUOTA_SUPPORT
1840         struct obd_device *obd = mdd->mdd_obd_dev;
1841         struct obd_export *exp = md_quota(env)->mq_exp;
1842         struct mds_obd *mds = &obd->u.mds;
1843         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1844         int quota_opc = 0, block_count = 0;
1845         int inode_pending[MAXQUOTAS] = { 0, 0 };
1846         int block_pending[MAXQUOTAS] = { 0, 0 };
1847 #endif
1848         int rc = 0;
1849         ENTRY;
1850
1851 #ifdef HAVE_QUOTA_SUPPORT
1852         if (mds->mds_quota) {
1853                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1854                 mdd_quota_wrapper(&ma->ma_attr, qids);
1855                 /* get file quota for child */
1856                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1857                                 qids, inode_pending, 1, NULL, 0,
1858                                 NULL, 0);
1859                 switch (ma->ma_attr.la_mode & S_IFMT) {
1860                 case S_IFLNK:
1861                 case S_IFDIR:
1862                         block_count = 2;
1863                         break;
1864                 case S_IFREG:
1865                         block_count = 1;
1866                         break;
1867                 }
1868                 /* get block quota for child */
1869                 if (block_count)
1870                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1871                                         qids, block_pending, block_count,
1872                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1873         }
1874 #endif
1875
1876         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
1877         handle = mdd_trans_start(env, mdd);
1878         if (IS_ERR(handle))
1879                 GOTO(out_pending, rc = PTR_ERR(handle));
1880
1881         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1882         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1883         if (rc)
1884                 GOTO(unlock, rc);
1885
1886         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1887         if (rc)
1888                 GOTO(unlock, rc);
1889
1890         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1891                 /* If creating the slave object, set slave EA here. */
1892                 int lmv_size = spec->u.sp_ea.eadatalen;
1893                 struct lmv_stripe_md *lmv;
1894
1895                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1896                 LASSERT(lmv != NULL && lmv_size > 0);
1897
1898                 rc = __mdd_xattr_set(env, mdd_obj,
1899                                      mdd_buf_get_const(env, lmv, lmv_size),
1900                                      XATTR_NAME_LMV, 0, handle);
1901                 if (rc)
1902                         GOTO(unlock, rc);
1903
1904                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1905                                            handle, 0);
1906         } else {
1907 #ifdef CONFIG_FS_POSIX_ACL
1908                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1909                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1910
1911                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1912                         buf->lb_len = spec->u.sp_ea.eadatalen;
1913                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1914                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1915                                                     &ma->ma_attr.la_mode,
1916                                                     handle);
1917                                 if (rc)
1918                                         GOTO(unlock, rc);
1919                                 else
1920                                         ma->ma_attr.la_valid |= LA_MODE;
1921                         }
1922
1923                         pfid = spec->u.sp_ea.fid;
1924                 }
1925 #endif
1926                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1927                                            spec);
1928         }
1929         EXIT;
1930 unlock:
1931         if (rc == 0)
1932                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1933         mdd_write_unlock(env, mdd_obj);
1934
1935         mdd_trans_stop(env, mdd, rc, handle);
1936 out_pending:
1937 #ifdef HAVE_QUOTA_SUPPORT
1938         if (quota_opc) {
1939                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1940                                       inode_pending, 0);
1941                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1942                                       block_pending, 1);
1943                 /* Trigger dqacq on the owner of child. If failed,
1944                  * the next call for lquota_chkquota will process it. */
1945                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1946                               quota_opc);
1947         }
1948 #endif
1949         return rc;
1950 }
1951
1952 /* partial link */
1953 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1954                        const struct md_attr *ma)
1955 {
1956         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1957         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1958         struct mdd_device *mdd = mdo2mdd(obj);
1959         struct thandle *handle;
1960         int rc;
1961         ENTRY;
1962
1963         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
1964         handle = mdd_trans_start(env, mdd);
1965         if (IS_ERR(handle))
1966                 RETURN(-ENOMEM);
1967
1968         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1969         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1970         if (rc == 0)
1971                 __mdd_ref_add(env, mdd_obj, handle);
1972         mdd_write_unlock(env, mdd_obj);
1973         if (rc == 0) {
1974                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1975                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1976
1977                 la_copy->la_valid = LA_CTIME;
1978                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1979                                                         handle, 0);
1980         }
1981         mdd_trans_stop(env, mdd, 0, handle);
1982
1983         RETURN(rc);
1984 }
1985
1986 /*
1987  * do NOT or the MAY_*'s, you'll get the weakest
1988  */
1989 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1990 {
1991         int res = 0;
1992
1993         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1994          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1995          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1996          * owner can write to a file even if it is marked readonly to hide
1997          * its brokenness. (bug 5781) */
1998         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1999                 struct md_ucred *uc = md_ucred(env);
2000
2001                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2002                     (la->la_uid == uc->mu_fsuid))
2003                         return 0;
2004         }
2005
2006         if (flags & FMODE_READ)
2007                 res |= MAY_READ;
2008         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2009                 res |= MAY_WRITE;
2010         if (flags & MDS_FMODE_EXEC)
2011                 res = MAY_EXEC;
2012         return res;
2013 }
2014
2015 static int mdd_open_sanity_check(const struct lu_env *env,
2016                                  struct mdd_object *obj, int flag)
2017 {
2018         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2019         int mode, rc;
2020         ENTRY;
2021
2022         /* EEXIST check */
2023         if (mdd_is_dead_obj(obj))
2024                 RETURN(-ENOENT);
2025
2026         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2027         if (rc)
2028                RETURN(rc);
2029
2030         if (S_ISLNK(tmp_la->la_mode))
2031                 RETURN(-ELOOP);
2032
2033         mode = accmode(env, tmp_la, flag);
2034
2035         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2036                 RETURN(-EISDIR);
2037
2038         if (!(flag & MDS_OPEN_CREATED)) {
2039                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2040                 if (rc)
2041                         RETURN(rc);
2042         }
2043
2044         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2045             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2046                 flag &= ~MDS_OPEN_TRUNC;
2047
2048         /* For writing append-only file must open it with append mode. */
2049         if (mdd_is_append(obj)) {
2050                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2051                         RETURN(-EPERM);
2052                 if (flag & MDS_OPEN_TRUNC)
2053                         RETURN(-EPERM);
2054         }
2055
2056 #if 0
2057         /*
2058          * Now, flag -- O_NOATIME does not be packed by client.
2059          */
2060         if (flag & O_NOATIME) {
2061                 struct md_ucred *uc = md_ucred(env);
2062
2063                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2064                     (uc->mu_valid == UCRED_NEW)) &&
2065                     (uc->mu_fsuid != tmp_la->la_uid) &&
2066                     !mdd_capable(uc, CFS_CAP_FOWNER))
2067                         RETURN(-EPERM);
2068         }
2069 #endif
2070
2071         RETURN(0);
2072 }
2073
2074 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2075                     int flags)
2076 {
2077         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2078         int rc = 0;
2079
2080         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2081
2082         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2083         if (rc == 0)
2084                 mdd_obj->mod_count++;
2085
2086         mdd_write_unlock(env, mdd_obj);
2087         return rc;
2088 }
2089
2090 /* return md_attr back,
2091  * if it is last unlink then return lov ea + llog cookie*/
2092 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2093                     struct md_attr *ma)
2094 {
2095         int rc = 0;
2096         ENTRY;
2097
2098         if (S_ISREG(mdd_object_type(obj))) {
2099                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2100                  * Caller must be ready for that. */
2101
2102                 rc = __mdd_lmm_get(env, obj, ma);
2103                 if ((ma->ma_valid & MA_LOV))
2104                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2105                                             obj, ma);
2106         }
2107         RETURN(rc);
2108 }
2109
2110 /*
2111  * No permission check is needed.
2112  */
2113 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2114                      struct md_attr *ma)
2115 {
2116         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2117         struct mdd_device *mdd = mdo2mdd(obj);
2118         struct thandle    *handle = NULL;
2119         int rc;
2120         int reset = 1;
2121
2122 #ifdef HAVE_QUOTA_SUPPORT
2123         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2124         struct mds_obd *mds = &obd->u.mds;
2125         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2126         int quota_opc = 0;
2127 #endif
2128         ENTRY;
2129
2130         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2131                 mdd_obj->mod_count--;
2132
2133                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2134                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2135                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2136                 RETURN(0);
2137         }
2138
2139         /* check without any lock */
2140         if (mdd_obj->mod_count == 1 &&
2141             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2142  again:
2143                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
2144                 if (rc)
2145                         RETURN(rc);
2146                 handle = mdd_trans_start(env, mdo2mdd(obj));
2147                 if (IS_ERR(handle))
2148                         RETURN(PTR_ERR(handle));
2149         }
2150
2151         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2152         if (handle == NULL && mdd_obj->mod_count == 1 &&
2153             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2154                 mdd_write_unlock(env, mdd_obj);
2155                 goto again;
2156         }
2157
2158         /* release open count */
2159         mdd_obj->mod_count --;
2160
2161         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2162                 /* remove link to object from orphan index */
2163                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2164                 if (rc == 0) {
2165                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2166                                "list, OSS objects to be destroyed.\n",
2167                                PFID(mdd_object_fid(mdd_obj)));
2168                 } else {
2169                         CERROR("Object "DFID" can not be deleted from orphan "
2170                                 "list, maybe cause OST objects can not be "
2171                                 "destroyed (err: %d).\n",
2172                                 PFID(mdd_object_fid(mdd_obj)), rc);
2173                         /* If object was not deleted from orphan list, do not
2174                          * destroy OSS objects, which will be done when next
2175                          * recovery. */
2176                         GOTO(out, rc);
2177                 }
2178         }
2179
2180         rc = mdd_iattr_get(env, mdd_obj, ma);
2181         /* Object maybe not in orphan list originally, it is rare case for
2182          * mdd_finish_unlink() failure. */
2183         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2184 #ifdef HAVE_QUOTA_SUPPORT
2185                 if (mds->mds_quota) {
2186                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2187                         mdd_quota_wrapper(&ma->ma_attr, qids);
2188                 }
2189 #endif
2190                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2191                 if (ma->ma_valid & MA_FLAGS &&
2192                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2193                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2194                 } else {
2195                         rc = mdd_object_kill(env, mdd_obj, ma);
2196                         if (rc == 0)
2197                                 reset = 0;
2198                 }
2199
2200                 if (rc != 0)
2201                         CERROR("Error when prepare to delete Object "DFID" , "
2202                                "which will cause OST objects can not be "
2203                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2204         }
2205         EXIT;
2206
2207 out:
2208         if (reset)
2209                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2210
2211         mdd_write_unlock(env, mdd_obj);
2212         if (handle != NULL)
2213                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2214 #ifdef HAVE_QUOTA_SUPPORT
2215         if (quota_opc)
2216                 /* Trigger dqrel on the owner of child. If failed,
2217                  * the next call for lquota_chkquota will process it */
2218                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2219                               quota_opc);
2220 #endif
2221         return rc;
2222 }
2223
2224 /*
2225  * Permission check is done when open,
2226  * no need check again.
2227  */
2228 static int mdd_readpage_sanity_check(const struct lu_env *env,
2229                                      struct mdd_object *obj)
2230 {
2231         struct dt_object *next = mdd_object_child(obj);
2232         int rc;
2233         ENTRY;
2234
2235         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2236                 rc = 0;
2237         else
2238                 rc = -ENOTDIR;
2239
2240         RETURN(rc);
2241 }
2242
2243 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2244                               struct lu_dirpage *dp, int nob,
2245                               const struct dt_it_ops *iops, struct dt_it *it,
2246                               __u32 attr)
2247 {
2248         void                   *area = dp;
2249         int                     result;
2250         __u64                   hash = 0;
2251         struct lu_dirent       *ent;
2252         struct lu_dirent       *last = NULL;
2253         int                     first = 1;
2254
2255         memset(area, 0, sizeof (*dp));
2256         area += sizeof (*dp);
2257         nob  -= sizeof (*dp);
2258
2259         ent  = area;
2260         do {
2261                 int    len;
2262                 int    recsize;
2263
2264                 len  = iops->key_size(env, it);
2265
2266                 /* IAM iterator can return record with zero len. */
2267                 if (len == 0)
2268                         goto next;
2269
2270                 hash = iops->store(env, it);
2271                 if (unlikely(first)) {
2272                         first = 0;
2273                         dp->ldp_hash_start = cpu_to_le64(hash);
2274                 }
2275
2276                 /* calculate max space required for lu_dirent */
2277                 recsize = lu_dirent_calc_size(len, attr);
2278
2279                 if (nob >= recsize) {
2280                         result = iops->rec(env, it, ent, attr);
2281                         if (result == -ESTALE)
2282                                 goto next;
2283                         if (result != 0)
2284                                 goto out;
2285
2286                         /* osd might not able to pack all attributes,
2287                          * so recheck rec length */
2288                         recsize = le16_to_cpu(ent->lde_reclen);
2289                 } else {
2290                         result = (last != NULL) ? 0 :-EINVAL;
2291                         goto out;
2292                 }
2293                 last = ent;
2294                 ent = (void *)ent + recsize;
2295                 nob -= recsize;
2296
2297 next:
2298                 result = iops->next(env, it);
2299                 if (result == -ESTALE)
2300                         goto next;
2301         } while (result == 0);
2302
2303 out:
2304         dp->ldp_hash_end = cpu_to_le64(hash);
2305         if (last != NULL) {
2306                 if (last->lde_hash == dp->ldp_hash_end)
2307                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2308                 last->lde_reclen = 0; /* end mark */
2309         }
2310         return result;
2311 }
2312
2313 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2314                           const struct lu_rdpg *rdpg)
2315 {
2316         struct dt_it      *it;
2317         struct dt_object  *next = mdd_object_child(obj);
2318         const struct dt_it_ops  *iops;
2319         struct page       *pg;
2320         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2321         int i;
2322         int nlupgs = 0;
2323         int rc;
2324         int nob;
2325
2326         LASSERT(rdpg->rp_pages != NULL);
2327         LASSERT(next->do_index_ops != NULL);
2328
2329         if (rdpg->rp_count <= 0)
2330                 return -EFAULT;
2331
2332         /*
2333          * iterate through directory and fill pages from @rdpg
2334          */
2335         iops = &next->do_index_ops->dio_it;
2336         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2337         if (IS_ERR(it))
2338                 return PTR_ERR(it);
2339
2340         rc = iops->load(env, it, rdpg->rp_hash);
2341
2342         if (rc == 0) {
2343                 /*
2344                  * Iterator didn't find record with exactly the key requested.
2345                  *
2346                  * It is currently either
2347                  *
2348                  *     - positioned above record with key less than
2349                  *     requested---skip it.
2350                  *
2351                  *     - or not positioned at all (is in IAM_IT_SKEWED
2352                  *     state)---position it on the next item.
2353                  */
2354                 rc = iops->next(env, it);
2355         } else if (rc > 0)
2356                 rc = 0;
2357
2358         /*
2359          * At this point and across for-loop:
2360          *
2361          *  rc == 0 -> ok, proceed.
2362          *  rc >  0 -> end of directory.
2363          *  rc <  0 -> error.
2364          */
2365         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2366              i++, nob -= CFS_PAGE_SIZE) {
2367                 struct lu_dirpage *dp;
2368
2369                 LASSERT(i < rdpg->rp_npages);
2370                 pg = rdpg->rp_pages[i];
2371                 dp = cfs_kmap(pg);
2372 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2373 repeat:
2374 #endif
2375                 rc = mdd_dir_page_build(env, mdd, dp,
2376                                         min_t(int, nob, LU_PAGE_SIZE),
2377                                         iops, it, rdpg->rp_attrs);
2378                 if (rc > 0) {
2379                         /*
2380                          * end of directory.
2381                          */
2382                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2383                         nlupgs++;
2384                 } else if (rc < 0) {
2385                         CWARN("build page failed: %d!\n", rc);
2386                 } else {
2387                         nlupgs++;
2388 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2389                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2390                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2391                                 goto repeat;
2392 #endif
2393                 }
2394                 cfs_kunmap(pg);
2395         }
2396         if (rc >= 0) {
2397                 struct lu_dirpage *dp;
2398
2399                 dp = cfs_kmap(rdpg->rp_pages[0]);
2400                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2401                 if (nlupgs == 0) {
2402                         /*
2403                          * No pages were processed, mark this for first page
2404                          * and send back.
2405                          */
2406                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2407                         nlupgs = 1;
2408                 }
2409                 cfs_kunmap(rdpg->rp_pages[0]);
2410
2411                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2412         }
2413         iops->put(env, it);
2414         iops->fini(env, it);
2415
2416         return rc;
2417 }
2418
2419 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2420                  const struct lu_rdpg *rdpg)
2421 {
2422         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2423         int rc;
2424         ENTRY;
2425
2426         LASSERT(mdd_object_exists(mdd_obj));
2427
2428         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2429         rc = mdd_readpage_sanity_check(env, mdd_obj);
2430         if (rc)
2431                 GOTO(out_unlock, rc);
2432
2433         if (mdd_is_dead_obj(mdd_obj)) {
2434                 struct page *pg;
2435                 struct lu_dirpage *dp;
2436
2437                 /*
2438                  * According to POSIX, please do not return any entry to client:
2439                  * even dot and dotdot should not be returned.
2440                  */
2441                 CWARN("readdir from dead object: "DFID"\n",
2442                         PFID(mdd_object_fid(mdd_obj)));
2443
2444                 if (rdpg->rp_count <= 0)
2445                         GOTO(out_unlock, rc = -EFAULT);
2446                 LASSERT(rdpg->rp_pages != NULL);
2447
2448                 pg = rdpg->rp_pages[0];
2449                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2450                 memset(dp, 0 , sizeof(struct lu_dirpage));
2451                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2452                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2453                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2454                 cfs_kunmap(pg);
2455                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2456         }
2457
2458         rc = __mdd_readpage(env, mdd_obj, rdpg);
2459
2460         EXIT;
2461 out_unlock:
2462         mdd_read_unlock(env, mdd_obj);
2463         return rc;
2464 }
2465
2466 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2467 {
2468         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2469         struct dt_object *next;
2470
2471         LASSERT(mdd_object_exists(mdd_obj));
2472         next = mdd_object_child(mdd_obj);
2473         return next->do_ops->do_object_sync(env, next);
2474 }
2475
2476 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2477                                         struct md_object *obj)
2478 {
2479         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2480
2481         LASSERT(mdd_object_exists(mdd_obj));
2482         return do_version_get(env, mdd_object_child(mdd_obj));
2483 }
2484
2485 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2486                             dt_obj_version_t version)
2487 {
2488         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2489
2490         LASSERT(mdd_object_exists(mdd_obj));
2491         do_version_set(env, mdd_object_child(mdd_obj), version);
2492 }
2493
2494 const struct md_object_operations mdd_obj_ops = {
2495         .moo_permission    = mdd_permission,
2496         .moo_attr_get      = mdd_attr_get,
2497         .moo_attr_set      = mdd_attr_set,
2498         .moo_xattr_get     = mdd_xattr_get,
2499         .moo_xattr_set     = mdd_xattr_set,
2500         .moo_xattr_list    = mdd_xattr_list,
2501         .moo_xattr_del     = mdd_xattr_del,
2502         .moo_object_create = mdd_object_create,
2503         .moo_ref_add       = mdd_ref_add,
2504         .moo_ref_del       = mdd_ref_del,
2505         .moo_open          = mdd_open,
2506         .moo_close         = mdd_close,
2507         .moo_readpage      = mdd_readpage,
2508         .moo_readlink      = mdd_readlink,
2509         .moo_changelog     = mdd_changelog,
2510         .moo_capa_get      = mdd_capa_get,
2511         .moo_object_sync   = mdd_object_sync,
2512         .moo_version_get   = mdd_version_get,
2513         .moo_version_set   = mdd_version_set,
2514         .moo_path          = mdd_path,
2515         .moo_file_lock     = mdd_file_lock,
2516         .moo_file_unlock   = mdd_file_unlock,
2517 };