Whamcloud - gitweb
8f8a42489b29826afcdb9235d520d12165f34ed8
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
130         buf->lb_buf = NULL;
131         buf->lb_len = 0;
132 }
133
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135                                        const void *area, ssize_t len)
136 {
137         struct lu_buf *buf;
138
139         buf = &mdd_env_info(env)->mti_buf;
140         buf->lb_buf = (void *)area;
141         buf->lb_len = len;
142         return buf;
143 }
144
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
146 {
147         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
148
149         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
151                 buf->lb_buf = NULL;
152         }
153         if (buf->lb_buf == NULL) {
154                 buf->lb_len = len;
155                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156                 if (buf->lb_buf == NULL)
157                         buf->lb_len = 0;
158         }
159         return buf;
160 }
161
162 /** Increase the size of the \a mti_big_buf.
163  * preserves old data in buffer
164  * old buffer remains unchanged on error
165  * \retval 0 or -ENOMEM
166  */
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
168 {
169         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
170         struct lu_buf buf;
171
172         LASSERT(len >= oldbuf->lb_len);
173         OBD_ALLOC_LARGE(buf.lb_buf, len);
174
175         if (buf.lb_buf == NULL)
176                 return -ENOMEM;
177
178         buf.lb_len = len;
179         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
180
181         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
182
183         memcpy(oldbuf, &buf, sizeof(buf));
184
185         return 0;
186 }
187
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189                                        struct mdd_device *mdd)
190 {
191         struct mdd_thread_info *mti = mdd_env_info(env);
192         int                     max_cookie_size;
193
194         max_cookie_size = mdd_lov_cookiesize(env, mdd);
195         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196                 if (mti->mti_max_cookie)
197                         OBD_FREE_LARGE(mti->mti_max_cookie,
198                                        mti->mti_max_cookie_size);
199                 mti->mti_max_cookie = NULL;
200                 mti->mti_max_cookie_size = 0;
201         }
202         if (unlikely(mti->mti_max_cookie == NULL)) {
203                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204                 if (likely(mti->mti_max_cookie != NULL))
205                         mti->mti_max_cookie_size = max_cookie_size;
206         }
207         if (likely(mti->mti_max_cookie != NULL))
208                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209         return mti->mti_max_cookie;
210 }
211
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213                                    struct mdd_device *mdd)
214 {
215         struct mdd_thread_info *mti = mdd_env_info(env);
216         int                     max_lmm_size;
217
218         max_lmm_size = mdd_lov_mdsize(env, mdd);
219         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220                 if (mti->mti_max_lmm)
221                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222                 mti->mti_max_lmm = NULL;
223                 mti->mti_max_lmm_size = 0;
224         }
225         if (unlikely(mti->mti_max_lmm == NULL)) {
226                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227                 if (likely(mti->mti_max_lmm != NULL))
228                         mti->mti_max_lmm_size = max_lmm_size;
229         }
230         return mti->mti_max_lmm;
231 }
232
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234                                    const struct lu_object_header *hdr,
235                                    struct lu_device *d)
236 {
237         struct mdd_object *mdd_obj;
238
239         OBD_ALLOC_PTR(mdd_obj);
240         if (mdd_obj != NULL) {
241                 struct lu_object *o;
242
243                 o = mdd2lu_obj(mdd_obj);
244                 lu_object_init(o, NULL, d);
245                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247                 mdd_obj->mod_count = 0;
248                 o->lo_ops = &mdd_lu_obj_ops;
249                 return o;
250         } else {
251                 return NULL;
252         }
253 }
254
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256                            const struct lu_object_conf *unused)
257 {
258         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259         struct mdd_object *mdd_obj = lu2mdd_obj(o);
260         struct lu_object  *below;
261         struct lu_device  *under;
262         ENTRY;
263
264         mdd_obj->mod_cltime = 0;
265         under = &d->mdd_child->dd_lu_dev;
266         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267         mdd_pdlock_init(mdd_obj);
268         if (below == NULL)
269                 RETURN(-ENOMEM);
270
271         lu_object_add(o, below);
272
273         RETURN(0);
274 }
275
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
277 {
278         if (lu_object_exists(o))
279                 return mdd_get_flags(env, lu2mdd_obj(o));
280         else
281                 return 0;
282 }
283
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
285 {
286         struct mdd_object *mdd = lu2mdd_obj(o);
287
288         lu_object_fini(o);
289         OBD_FREE_PTR(mdd);
290 }
291
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293                             lu_printer_t p, const struct lu_object *o)
294 {
295         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297                     "valid=%x, cltime="LPU64", flags=%lx)",
298                     mdd, mdd->mod_count, mdd->mod_valid,
299                     mdd->mod_cltime, mdd->mod_flags);
300 }
301
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303         .loo_object_init    = mdd_object_init,
304         .loo_object_start   = mdd_object_start,
305         .loo_object_free    = mdd_object_free,
306         .loo_object_print   = mdd_object_print,
307 };
308
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310                                    struct mdd_device *d,
311                                    const struct lu_fid *f)
312 {
313         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
314 }
315
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317                         const char *path, struct lu_fid *fid)
318 {
319         struct lu_buf *buf;
320         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321         struct mdd_object *obj;
322         struct lu_name *lname = &mdd_env_info(env)->mti_name;
323         char *name;
324         int rc = 0;
325         ENTRY;
326
327         /* temp buffer for path element */
328         buf = mdd_buf_alloc(env, PATH_MAX);
329         if (buf->lb_buf == NULL)
330                 RETURN(-ENOMEM);
331
332         lname->ln_name = name = buf->lb_buf;
333         lname->ln_namelen = 0;
334         *f = mdd->mdd_root_fid;
335
336         while(1) {
337                 while (*path == '/')
338                         path++;
339                 if (*path == '\0')
340                         break;
341                 while (*path != '/' && *path != '\0') {
342                         *name = *path;
343                         path++;
344                         name++;
345                         lname->ln_namelen++;
346                 }
347
348                 *name = '\0';
349                 /* find obj corresponding to fid */
350                 obj = mdd_object_find(env, mdd, f);
351                 if (obj == NULL)
352                         GOTO(out, rc = -EREMOTE);
353                 if (IS_ERR(obj))
354                         GOTO(out, rc = PTR_ERR(obj));
355                 /* get child fid from parent and name */
356                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357                 mdd_object_put(env, obj);
358                 if (rc)
359                         break;
360
361                 name = buf->lb_buf;
362                 lname->ln_namelen = 0;
363         }
364
365         if (!rc)
366                 *fid = *f;
367 out:
368         RETURN(rc);
369 }
370
371 /** The maximum depth that fid2path() will search.
372  * This is limited only because we want to store the fids for
373  * historical path lookup purposes.
374  */
375 #define MAX_PATH_DEPTH 100
376
377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379         __u64                pli_recno;        /**< history point */
380         __u64                pli_currec;       /**< current record */
381         struct lu_fid        pli_fid;
382         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383         struct mdd_object   *pli_mdd_obj;
384         char                *pli_path;         /**< full path */
385         int                  pli_pathlen;
386         int                  pli_linkno;       /**< which hardlink to follow */
387         int                  pli_fidcount;     /**< number of \a pli_fids */
388 };
389
390 static int mdd_path_current(const struct lu_env *env,
391                             struct path_lookup_info *pli)
392 {
393         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394         struct mdd_object *mdd_obj;
395         struct lu_buf     *buf = NULL;
396         struct link_ea_header *leh;
397         struct link_ea_entry  *lee;
398         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
400         char *ptr;
401         int reclen;
402         int rc;
403         ENTRY;
404
405         ptr = pli->pli_path + pli->pli_pathlen - 1;
406         *ptr = 0;
407         --ptr;
408         pli->pli_fidcount = 0;
409         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
410
411         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412                 mdd_obj = mdd_object_find(env, mdd,
413                                           &pli->pli_fids[pli->pli_fidcount]);
414                 if (mdd_obj == NULL)
415                         GOTO(out, rc = -EREMOTE);
416                 if (IS_ERR(mdd_obj))
417                         GOTO(out, rc = PTR_ERR(mdd_obj));
418                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
419                 if (rc <= 0) {
420                         mdd_object_put(env, mdd_obj);
421                         if (rc == -1)
422                                 rc = -EREMOTE;
423                         else if (rc == 0)
424                                 /* Do I need to error out here? */
425                                 rc = -ENOENT;
426                         GOTO(out, rc);
427                 }
428
429                 /* Get parent fid and object name */
430                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431                 buf = mdd_links_get(env, mdd_obj);
432                 mdd_read_unlock(env, mdd_obj);
433                 mdd_object_put(env, mdd_obj);
434                 if (IS_ERR(buf))
435                         GOTO(out, rc = PTR_ERR(buf));
436
437                 leh = buf->lb_buf;
438                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
440
441                 /* If set, use link #linkno for path lookup, otherwise use
442                    link #0.  Only do this for the final path element. */
443                 if ((pli->pli_fidcount == 0) &&
444                     (pli->pli_linkno < leh->leh_reccount)) {
445                         int count;
446                         for (count = 0; count < pli->pli_linkno; count++) {
447                                 lee = (struct link_ea_entry *)
448                                      ((char *)lee + reclen);
449                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
450                         }
451                         if (pli->pli_linkno < leh->leh_reccount - 1)
452                                 /* indicate to user there are more links */
453                                 pli->pli_linkno++;
454                 }
455
456                 /* Pack the name in the end of the buffer */
457                 ptr -= tmpname->ln_namelen;
458                 if (ptr - 1 <= pli->pli_path)
459                         GOTO(out, rc = -EOVERFLOW);
460                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
461                 *(--ptr) = '/';
462
463                 /* Store the parent fid for historic lookup */
464                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465                         GOTO(out, rc = -EOVERFLOW);
466                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
467         }
468
469         /* Verify that our path hasn't changed since we started the lookup.
470            Record the current index, and verify the path resolves to the
471            same fid. If it does, then the path is correct as of this index. */
472         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473         pli->pli_currec = mdd->mdd_cl.mc_index;
474         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
476         if (rc) {
477                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478                 GOTO (out, rc = -EAGAIN);
479         }
480         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483                        PFID(&pli->pli_fid));
484                 GOTO(out, rc = -EAGAIN);
485         }
486         ptr++; /* skip leading / */
487         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
488
489         EXIT;
490 out:
491         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492                 /* if we vmalloced a large buffer drop it */
493                 mdd_buf_put(buf);
494
495         return rc;
496 }
497
498 static int mdd_path_historic(const struct lu_env *env,
499                              struct path_lookup_info *pli)
500 {
501         return 0;
502 }
503
504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506                     char *path, int pathlen, __u64 *recno, int *linkno)
507 {
508         struct path_lookup_info *pli;
509         int tries = 3;
510         int rc = -EAGAIN;
511         ENTRY;
512
513         if (pathlen < 3)
514                 RETURN(-EOVERFLOW);
515
516         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
517                 path[0] = '\0';
518                 RETURN(0);
519         }
520
521         OBD_ALLOC_PTR(pli);
522         if (pli == NULL)
523                 RETURN(-ENOMEM);
524
525         pli->pli_mdd_obj = md2mdd_obj(obj);
526         pli->pli_recno = *recno;
527         pli->pli_path = path;
528         pli->pli_pathlen = pathlen;
529         pli->pli_linkno = *linkno;
530
531         /* Retry multiple times in case file is being moved */
532         while (tries-- && rc == -EAGAIN)
533                 rc = mdd_path_current(env, pli);
534
535         /* For historical path lookup, the current links may not have existed
536          * at "recno" time.  We must switch over to earlier links/parents
537          * by using the changelog records.  If the earlier parent doesn't
538          * exist, we must search back through the changelog to reconstruct
539          * its parents, then check if it exists, etc.
540          * We may ignore this problem for the initial implementation and
541          * state that an "original" hardlink must still exist for us to find
542          * historic path name. */
543         if (pli->pli_recno != -1) {
544                 rc = mdd_path_historic(env, pli);
545         } else {
546                 *recno = pli->pli_currec;
547                 /* Return next link index to caller */
548                 *linkno = pli->pli_linkno;
549         }
550
551         OBD_FREE_PTR(pli);
552
553         RETURN (rc);
554 }
555
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
557 {
558         struct lu_attr *la = &mdd_env_info(env)->mti_la;
559         int rc;
560
561         ENTRY;
562         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
563         if (rc == 0) {
564                 mdd_flags_xlate(obj, la->la_flags);
565                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
566                         obj->mod_flags |= MNLINK_OBJ;
567         }
568         RETURN(rc);
569 }
570
571 /* get only inode attributes */
572 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
573                   struct md_attr *ma)
574 {
575         int rc = 0;
576         ENTRY;
577
578         if (ma->ma_valid & MA_INODE)
579                 RETURN(0);
580
581         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
582                           mdd_object_capa(env, mdd_obj));
583         if (rc == 0)
584                 ma->ma_valid |= MA_INODE;
585         RETURN(rc);
586 }
587
588 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
589 {
590         struct lov_desc *ldesc;
591         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
592         struct lov_user_md *lum = (struct lov_user_md*)lmm;
593         ENTRY;
594
595         if (!lum)
596                 RETURN(0);
597
598         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
599         LASSERT(ldesc != NULL);
600
601         lum->lmm_magic = LOV_MAGIC_V1;
602         lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
603         lum->lmm_pattern = ldesc->ld_pattern;
604         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
605         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
606         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
607
608         RETURN(sizeof(*lum));
609 }
610
611 /* get lov EA only */
612 static int __mdd_lmm_get(const struct lu_env *env,
613                          struct mdd_object *mdd_obj, struct md_attr *ma)
614 {
615         int rc;
616         ENTRY;
617
618         if (ma->ma_valid & MA_LOV)
619                 RETURN(0);
620
621         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
622                         XATTR_NAME_LOV);
623         if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
624                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
625         if (rc > 0) {
626                 ma->ma_lmm_size = rc;
627                 ma->ma_valid |= MA_LOV;
628                 rc = 0;
629         }
630         RETURN(rc);
631 }
632
633 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
634                        struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
640         rc = __mdd_lmm_get(env, mdd_obj, ma);
641         mdd_read_unlock(env, mdd_obj);
642         RETURN(rc);
643 }
644
645 /* get lmv EA only*/
646 static int __mdd_lmv_get(const struct lu_env *env,
647                          struct mdd_object *mdd_obj, struct md_attr *ma)
648 {
649         int rc;
650         ENTRY;
651
652         if (ma->ma_valid & MA_LMV)
653                 RETURN(0);
654
655         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
656                         XATTR_NAME_LMV);
657         if (rc > 0) {
658                 ma->ma_valid |= MA_LMV;
659                 rc = 0;
660         }
661         RETURN(rc);
662 }
663
664 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
665                          struct md_attr *ma)
666 {
667         struct mdd_thread_info *info = mdd_env_info(env);
668         struct lustre_mdt_attrs *lma =
669                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
670         int lma_size;
671         int rc;
672         ENTRY;
673
674         /* If all needed data are already valid, nothing to do */
675         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
676             (ma->ma_need & (MA_HSM | MA_SOM)))
677                 RETURN(0);
678
679         /* Read LMA from disk EA */
680         lma_size = sizeof(info->mti_xattr_buf);
681         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
682         if (rc <= 0)
683                 RETURN(rc);
684
685         /* Useless to check LMA incompatibility because this is already done in
686          * osd_ea_fid_get(), and this will fail long before this code is
687          * called.
688          * So, if we are here, LMA is compatible.
689          */
690
691         lustre_lma_swab(lma);
692
693         /* Swab and copy LMA */
694         if (ma->ma_need & MA_HSM) {
695                 if (lma->lma_compat & LMAC_HSM)
696                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
697                 else
698                         ma->ma_hsm.mh_flags = 0;
699                 ma->ma_valid |= MA_HSM;
700         }
701
702         /* Copy SOM */
703         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
704                 LASSERT(ma->ma_som != NULL);
705                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
706                 ma->ma_som->msd_size    = lma->lma_som_size;
707                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
708                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
709                 ma->ma_valid |= MA_SOM;
710         }
711
712         RETURN(0);
713 }
714
715 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
716                                  struct md_attr *ma)
717 {
718         int rc = 0;
719         ENTRY;
720
721         if (ma->ma_need & MA_INODE)
722                 rc = mdd_iattr_get(env, mdd_obj, ma);
723
724         if (rc == 0 && ma->ma_need & MA_LOV) {
725                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
726                     S_ISDIR(mdd_object_type(mdd_obj)))
727                         rc = __mdd_lmm_get(env, mdd_obj, ma);
728         }
729         if (rc == 0 && ma->ma_need & MA_LMV) {
730                 if (S_ISDIR(mdd_object_type(mdd_obj)))
731                         rc = __mdd_lmv_get(env, mdd_obj, ma);
732         }
733         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
734                 if (S_ISREG(mdd_object_type(mdd_obj)))
735                         rc = __mdd_lma_get(env, mdd_obj, ma);
736         }
737 #ifdef CONFIG_FS_POSIX_ACL
738         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
739                 if (S_ISDIR(mdd_object_type(mdd_obj)))
740                         rc = mdd_def_acl_get(env, mdd_obj, ma);
741         }
742 #endif
743         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
744                rc, ma->ma_valid, ma->ma_lmm);
745         RETURN(rc);
746 }
747
748 int mdd_attr_get_internal_locked(const struct lu_env *env,
749                                  struct mdd_object *mdd_obj, struct md_attr *ma)
750 {
751         int rc;
752         int needlock = ma->ma_need &
753                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
754
755         if (needlock)
756                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
757         rc = mdd_attr_get_internal(env, mdd_obj, ma);
758         if (needlock)
759                 mdd_read_unlock(env, mdd_obj);
760         return rc;
761 }
762
763 /*
764  * No permission check is needed.
765  */
766 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
767                         struct md_attr *ma)
768 {
769         struct mdd_object *mdd_obj = md2mdd_obj(obj);
770         int                rc;
771
772         ENTRY;
773         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
774         RETURN(rc);
775 }
776
777 /*
778  * No permission check is needed.
779  */
780 static int mdd_xattr_get(const struct lu_env *env,
781                          struct md_object *obj, struct lu_buf *buf,
782                          const char *name)
783 {
784         struct mdd_object *mdd_obj = md2mdd_obj(obj);
785         int rc;
786
787         ENTRY;
788
789         LASSERT(mdd_object_exists(mdd_obj));
790
791         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
792         rc = mdo_xattr_get(env, mdd_obj, buf, name,
793                            mdd_object_capa(env, mdd_obj));
794         mdd_read_unlock(env, mdd_obj);
795
796         RETURN(rc);
797 }
798
799 /*
800  * Permission check is done when open,
801  * no need check again.
802  */
803 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
804                         struct lu_buf *buf)
805 {
806         struct mdd_object *mdd_obj = md2mdd_obj(obj);
807         struct dt_object  *next;
808         loff_t             pos = 0;
809         int                rc;
810         ENTRY;
811
812         LASSERT(mdd_object_exists(mdd_obj));
813
814         next = mdd_object_child(mdd_obj);
815         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
816         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
817                                          mdd_object_capa(env, mdd_obj));
818         mdd_read_unlock(env, mdd_obj);
819         RETURN(rc);
820 }
821
822 /*
823  * No permission check is needed.
824  */
825 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
826                           struct lu_buf *buf)
827 {
828         struct mdd_object *mdd_obj = md2mdd_obj(obj);
829         int rc;
830
831         ENTRY;
832
833         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
834         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
835         mdd_read_unlock(env, mdd_obj);
836
837         RETURN(rc);
838 }
839
840 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
841                                struct mdd_object *c, struct md_attr *ma,
842                                struct thandle *handle,
843                                const struct md_op_spec *spec)
844 {
845         struct lu_attr *attr = &ma->ma_attr;
846         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
847         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
848         const struct dt_index_features *feat = spec->sp_feat;
849         int rc;
850         ENTRY;
851
852         if (!mdd_object_exists(c)) {
853                 struct dt_object *next = mdd_object_child(c);
854                 LASSERT(next);
855
856                 if (feat != &dt_directory_features && feat != NULL)
857                         dof->dof_type = DFT_INDEX;
858                 else
859                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
860
861                 dof->u.dof_idx.di_feat = feat;
862
863                 /* @hint will be initialized by underlying device. */
864                 next->do_ops->do_ah_init(env, hint,
865                                          p ? mdd_object_child(p) : NULL,
866                                          attr->la_mode & S_IFMT);
867
868                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
869                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
870         } else
871                 rc = -EEXIST;
872
873         RETURN(rc);
874 }
875
876 /**
877  * Make sure the ctime is increased only.
878  */
879 static inline int mdd_attr_check(const struct lu_env *env,
880                                  struct mdd_object *obj,
881                                  struct lu_attr *attr)
882 {
883         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
884         int rc;
885         ENTRY;
886
887         if (attr->la_valid & LA_CTIME) {
888                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
889                 if (rc)
890                         RETURN(rc);
891
892                 if (attr->la_ctime < tmp_la->la_ctime)
893                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
894                 else if (attr->la_valid == LA_CTIME &&
895                          attr->la_ctime == tmp_la->la_ctime)
896                         attr->la_valid &= ~LA_CTIME;
897         }
898         RETURN(0);
899 }
900
901 int mdd_attr_set_internal(const struct lu_env *env,
902                           struct mdd_object *obj,
903                           struct lu_attr *attr,
904                           struct thandle *handle,
905                           int needacl)
906 {
907         int rc;
908         ENTRY;
909
910         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
911 #ifdef CONFIG_FS_POSIX_ACL
912         if (!rc && (attr->la_valid & LA_MODE) && needacl)
913                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
914 #endif
915         RETURN(rc);
916 }
917
918 int mdd_attr_check_set_internal(const struct lu_env *env,
919                                 struct mdd_object *obj,
920                                 struct lu_attr *attr,
921                                 struct thandle *handle,
922                                 int needacl)
923 {
924         int rc;
925         ENTRY;
926
927         rc = mdd_attr_check(env, obj, attr);
928         if (rc)
929                 RETURN(rc);
930
931         if (attr->la_valid)
932                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
933         RETURN(rc);
934 }
935
936 static int mdd_attr_set_internal_locked(const struct lu_env *env,
937                                         struct mdd_object *obj,
938                                         struct lu_attr *attr,
939                                         struct thandle *handle,
940                                         int needacl)
941 {
942         int rc;
943         ENTRY;
944
945         needacl = needacl && (attr->la_valid & LA_MODE);
946         if (needacl)
947                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
948         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
949         if (needacl)
950                 mdd_write_unlock(env, obj);
951         RETURN(rc);
952 }
953
954 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
955                                        struct mdd_object *obj,
956                                        struct lu_attr *attr,
957                                        struct thandle *handle,
958                                        int needacl)
959 {
960         int rc;
961         ENTRY;
962
963         needacl = needacl && (attr->la_valid & LA_MODE);
964         if (needacl)
965                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
966         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
967         if (needacl)
968                 mdd_write_unlock(env, obj);
969         RETURN(rc);
970 }
971
972 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
973                     const struct lu_buf *buf, const char *name,
974                     int fl, struct thandle *handle)
975 {
976         struct lustre_capa *capa = mdd_object_capa(env, obj);
977         int rc = -EINVAL;
978         ENTRY;
979
980         if (buf->lb_buf && buf->lb_len > 0)
981                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
982         else if (buf->lb_buf == NULL && buf->lb_len == 0)
983                 rc = mdo_xattr_del(env, obj, name, handle, capa);
984
985         RETURN(rc);
986 }
987
988 /*
989  * This gives the same functionality as the code between
990  * sys_chmod and inode_setattr
991  * chown_common and inode_setattr
992  * utimes and inode_setattr
993  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
994  */
995 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
996                         struct lu_attr *la, const struct md_attr *ma)
997 {
998         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
999         struct md_ucred  *uc;
1000         int               rc;
1001         ENTRY;
1002
1003         if (!la->la_valid)
1004                 RETURN(0);
1005
1006         /* Do not permit change file type */
1007         if (la->la_valid & LA_TYPE)
1008                 RETURN(-EPERM);
1009
1010         /* They should not be processed by setattr */
1011         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1012                 RETURN(-EPERM);
1013
1014         /* export destroy does not have ->le_ses, but we may want
1015          * to drop LUSTRE_SOM_FL. */
1016         if (!env->le_ses)
1017                 RETURN(0);
1018
1019         uc = md_ucred(env);
1020
1021         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1022         if (rc)
1023                 RETURN(rc);
1024
1025         if (la->la_valid == LA_CTIME) {
1026                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1027                         /* This is only for set ctime when rename's source is
1028                          * on remote MDS. */
1029                         rc = mdd_may_delete(env, NULL, obj,
1030                                             (struct md_attr *)ma, 1, 0);
1031                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1032                         la->la_valid &= ~LA_CTIME;
1033                 RETURN(rc);
1034         }
1035
1036         if (la->la_valid == LA_ATIME) {
1037                 /* This is atime only set for read atime update on close. */
1038                 if (la->la_atime >= tmp_la->la_atime &&
1039                     la->la_atime < (tmp_la->la_atime +
1040                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1041                         la->la_valid &= ~LA_ATIME;
1042                 RETURN(0);
1043         }
1044
1045         /* Check if flags change. */
1046         if (la->la_valid & LA_FLAGS) {
1047                 unsigned int oldflags = 0;
1048                 unsigned int newflags = la->la_flags &
1049                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1050
1051                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1052                     !mdd_capable(uc, CFS_CAP_FOWNER))
1053                         RETURN(-EPERM);
1054
1055                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1056                  * only be changed by the relevant capability. */
1057                 if (mdd_is_immutable(obj))
1058                         oldflags |= LUSTRE_IMMUTABLE_FL;
1059                 if (mdd_is_append(obj))
1060                         oldflags |= LUSTRE_APPEND_FL;
1061                 if ((oldflags ^ newflags) &&
1062                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1063                         RETURN(-EPERM);
1064
1065                 if (!S_ISDIR(tmp_la->la_mode))
1066                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1067         }
1068
1069         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1070             (la->la_valid & ~LA_FLAGS) &&
1071             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1072                 RETURN(-EPERM);
1073
1074         /* Check for setting the obj time. */
1075         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1076             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1077                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1078                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1079                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1080                                                             MAY_WRITE,
1081                                                             MOR_TGT_CHILD);
1082                         if (rc)
1083                                 RETURN(rc);
1084                 }
1085         }
1086
1087         if (la->la_valid & LA_KILL_SUID) {
1088                 la->la_valid &= ~LA_KILL_SUID;
1089                 if ((tmp_la->la_mode & S_ISUID) &&
1090                     !(la->la_valid & LA_MODE)) {
1091                         la->la_mode = tmp_la->la_mode;
1092                         la->la_valid |= LA_MODE;
1093                 }
1094                 la->la_mode &= ~S_ISUID;
1095         }
1096
1097         if (la->la_valid & LA_KILL_SGID) {
1098                 la->la_valid &= ~LA_KILL_SGID;
1099                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1100                                         (S_ISGID | S_IXGRP)) &&
1101                     !(la->la_valid & LA_MODE)) {
1102                         la->la_mode = tmp_la->la_mode;
1103                         la->la_valid |= LA_MODE;
1104                 }
1105                 la->la_mode &= ~S_ISGID;
1106         }
1107
1108         /* Make sure a caller can chmod. */
1109         if (la->la_valid & LA_MODE) {
1110                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1111                     (uc->mu_fsuid != tmp_la->la_uid) &&
1112                     !mdd_capable(uc, CFS_CAP_FOWNER))
1113                         RETURN(-EPERM);
1114
1115                 if (la->la_mode == (cfs_umode_t) -1)
1116                         la->la_mode = tmp_la->la_mode;
1117                 else
1118                         la->la_mode = (la->la_mode & S_IALLUGO) |
1119                                       (tmp_la->la_mode & ~S_IALLUGO);
1120
1121                 /* Also check the setgid bit! */
1122                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1123                                        la->la_gid : tmp_la->la_gid) &&
1124                     !mdd_capable(uc, CFS_CAP_FSETID))
1125                         la->la_mode &= ~S_ISGID;
1126         } else {
1127                la->la_mode = tmp_la->la_mode;
1128         }
1129
1130         /* Make sure a caller can chown. */
1131         if (la->la_valid & LA_UID) {
1132                 if (la->la_uid == (uid_t) -1)
1133                         la->la_uid = tmp_la->la_uid;
1134                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1135                     (la->la_uid != tmp_la->la_uid)) &&
1136                     !mdd_capable(uc, CFS_CAP_CHOWN))
1137                         RETURN(-EPERM);
1138
1139                 /* If the user or group of a non-directory has been
1140                  * changed by a non-root user, remove the setuid bit.
1141                  * 19981026 David C Niemi <niemi@tux.org>
1142                  *
1143                  * Changed this to apply to all users, including root,
1144                  * to avoid some races. This is the behavior we had in
1145                  * 2.0. The check for non-root was definitely wrong
1146                  * for 2.2 anyway, as it should have been using
1147                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1148                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1149                     !S_ISDIR(tmp_la->la_mode)) {
1150                         la->la_mode &= ~S_ISUID;
1151                         la->la_valid |= LA_MODE;
1152                 }
1153         }
1154
1155         /* Make sure caller can chgrp. */
1156         if (la->la_valid & LA_GID) {
1157                 if (la->la_gid == (gid_t) -1)
1158                         la->la_gid = tmp_la->la_gid;
1159                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1160                     ((la->la_gid != tmp_la->la_gid) &&
1161                     !lustre_in_group_p(uc, la->la_gid))) &&
1162                     !mdd_capable(uc, CFS_CAP_CHOWN))
1163                         RETURN(-EPERM);
1164
1165                 /* Likewise, if the user or group of a non-directory
1166                  * has been changed by a non-root user, remove the
1167                  * setgid bit UNLESS there is no group execute bit
1168                  * (this would be a file marked for mandatory
1169                  * locking).  19981026 David C Niemi <niemi@tux.org>
1170                  *
1171                  * Removed the fsuid check (see the comment above) --
1172                  * 19990830 SD. */
1173                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1174                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1175                         la->la_mode &= ~S_ISGID;
1176                         la->la_valid |= LA_MODE;
1177                 }
1178         }
1179
1180         /* For both Size-on-MDS case and truncate case,
1181          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1182          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1183          * For SOM case, it is true, the MAY_WRITE perm has been checked
1184          * when open, no need check again. For truncate case, it is false,
1185          * the MAY_WRITE perm should be checked here. */
1186         if (ma->ma_attr_flags & MDS_SOM) {
1187                 /* For the "Size-on-MDS" setattr update, merge coming
1188                  * attributes with the set in the inode. BUG 10641 */
1189                 if ((la->la_valid & LA_ATIME) &&
1190                     (la->la_atime <= tmp_la->la_atime))
1191                         la->la_valid &= ~LA_ATIME;
1192
1193                 /* OST attributes do not have a priority over MDS attributes,
1194                  * so drop times if ctime is equal. */
1195                 if ((la->la_valid & LA_CTIME) &&
1196                     (la->la_ctime <= tmp_la->la_ctime))
1197                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1198         } else {
1199                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1200                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1201                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1202                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1203                                 rc = mdd_permission_internal_locked(env, obj,
1204                                                             tmp_la, MAY_WRITE,
1205                                                             MOR_TGT_CHILD);
1206                                 if (rc)
1207                                         RETURN(rc);
1208                         }
1209                 }
1210                 if (la->la_valid & LA_CTIME) {
1211                         /* The pure setattr, it has the priority over what is
1212                          * already set, do not drop it if ctime is equal. */
1213                         if (la->la_ctime < tmp_la->la_ctime)
1214                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1215                                                   LA_CTIME);
1216                 }
1217         }
1218
1219         RETURN(0);
1220 }
1221
1222 /** Store a data change changelog record
1223  * If this fails, we must fail the whole transaction; we don't
1224  * want the change to commit without the log entry.
1225  * \param mdd_obj - mdd_object of change
1226  * \param handle - transacion handle
1227  */
1228 static int mdd_changelog_data_store(const struct lu_env     *env,
1229                                     struct mdd_device       *mdd,
1230                                     enum changelog_rec_type type,
1231                                     int                     flags,
1232                                     struct mdd_object       *mdd_obj,
1233                                     struct thandle          *handle)
1234 {
1235         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1236         struct llog_changelog_rec *rec;
1237         struct lu_buf *buf;
1238         int reclen;
1239         int rc;
1240
1241         /* Not recording */
1242         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1243                 RETURN(0);
1244         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1245                 RETURN(0);
1246
1247         LASSERT(handle != NULL);
1248         LASSERT(mdd_obj != NULL);
1249
1250         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1251             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1252                 /* Don't need multiple updates in this log */
1253                 /* Don't check under lock - no big deal if we get an extra
1254                    entry */
1255                 RETURN(0);
1256         }
1257
1258         reclen = llog_data_len(sizeof(*rec));
1259         buf = mdd_buf_alloc(env, reclen);
1260         if (buf->lb_buf == NULL)
1261                 RETURN(-ENOMEM);
1262         rec = (struct llog_changelog_rec *)buf->lb_buf;
1263
1264         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1265         rec->cr.cr_type = (__u32)type;
1266         rec->cr.cr_tfid = *tfid;
1267         rec->cr.cr_namelen = 0;
1268         mdd_obj->mod_cltime = cfs_time_current_64();
1269
1270         rc = mdd_changelog_llog_write(mdd, rec, handle);
1271         if (rc < 0) {
1272                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1273                        rc, type, PFID(tfid));
1274                 return -EFAULT;
1275         }
1276
1277         return 0;
1278 }
1279
1280 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1281                   int flags, struct md_object *obj)
1282 {
1283         struct thandle *handle;
1284         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1285         struct mdd_device *mdd = mdo2mdd(obj);
1286         int rc;
1287         ENTRY;
1288
1289         handle = mdd_trans_start(env, mdd);
1290
1291         if (IS_ERR(handle))
1292                 return(PTR_ERR(handle));
1293
1294         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1295                                       handle);
1296
1297         mdd_trans_stop(env, mdd, rc, handle);
1298
1299         RETURN(rc);
1300 }
1301
1302 /**
1303  * Should be called with write lock held.
1304  *
1305  * \see mdd_lma_set_locked().
1306  */
1307 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1308                        const struct md_attr *ma, struct thandle *handle)
1309 {
1310         struct mdd_thread_info *info = mdd_env_info(env);
1311         struct lu_buf *buf;
1312         struct lustre_mdt_attrs *lma =
1313                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1314         int lmasize = sizeof(struct lustre_mdt_attrs);
1315         int rc = 0;
1316
1317         ENTRY;
1318
1319         /* Either HSM or SOM part is not valid, we need to read it before */
1320         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1321                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1322                 if (rc <= 0)
1323                         RETURN(rc);
1324
1325                 lustre_lma_swab(lma);
1326         } else {
1327                 memset(lma, 0, lmasize);
1328         }
1329
1330         /* Copy HSM data */
1331         if (ma->ma_valid & MA_HSM) {
1332                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1333                 lma->lma_compat |= LMAC_HSM;
1334         }
1335
1336         /* Copy SOM data */
1337         if (ma->ma_valid & MA_SOM) {
1338                 LASSERT(ma->ma_som != NULL);
1339                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1340                         lma->lma_compat     &= ~LMAC_SOM;
1341                 } else {
1342                         lma->lma_compat     |= LMAC_SOM;
1343                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1344                         lma->lma_som_size    = ma->ma_som->msd_size;
1345                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1346                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1347                 }
1348         }
1349
1350         /* Copy FID */
1351         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1352
1353         lustre_lma_swab(lma);
1354         buf = mdd_buf_get(env, lma, lmasize);
1355         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1356
1357         RETURN(rc);
1358 }
1359
1360 /**
1361  * Save LMA extended attributes with data from \a ma.
1362  *
1363  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1364  * not, LMA EA will be first read from disk, modified and write back.
1365  *
1366  */
1367 static int mdd_lma_set_locked(const struct lu_env *env,
1368                               struct mdd_object *mdd_obj,
1369                               const struct md_attr *ma, struct thandle *handle)
1370 {
1371         int rc;
1372
1373         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1374         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1375         mdd_write_unlock(env, mdd_obj);
1376         return rc;
1377 }
1378
1379 /* Precedence for choosing record type when multiple
1380  * attributes change: setattr > mtime > ctime > atime
1381  * (ctime changes when mtime does, plus chmod/chown.
1382  * atime and ctime are independent.) */
1383 static int mdd_attr_set_changelog(const struct lu_env *env,
1384                                   struct md_object *obj, struct thandle *handle,
1385                                   __u64 valid)
1386 {
1387         struct mdd_device *mdd = mdo2mdd(obj);
1388         int bits, type = 0;
1389
1390         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1391         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1392         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1393         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1394         bits = bits & mdd->mdd_cl.mc_mask;
1395         if (bits == 0)
1396                 return 0;
1397
1398         /* The record type is the lowest non-masked set bit */
1399         while (bits && ((bits & 1) == 0)) {
1400                 bits = bits >> 1;
1401                 type++;
1402         }
1403
1404         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1405         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1406                                         md2mdd_obj(obj), handle);
1407 }
1408
1409 /* set attr and LOV EA at once, return updated attr */
1410 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1411                         const struct md_attr *ma)
1412 {
1413         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1414         struct mdd_device *mdd = mdo2mdd(obj);
1415         struct thandle *handle;
1416         struct lov_mds_md *lmm = NULL;
1417         struct llog_cookie *logcookies = NULL;
1418         int  rc, lmm_size = 0, cookie_size = 0;
1419         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1420 #ifdef HAVE_QUOTA_SUPPORT
1421         struct obd_device *obd = mdd->mdd_obd_dev;
1422         struct mds_obd *mds = &obd->u.mds;
1423         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1424         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1425         int quota_opc = 0, block_count = 0;
1426         int inode_pending[MAXQUOTAS] = { 0, 0 };
1427         int block_pending[MAXQUOTAS] = { 0, 0 };
1428 #endif
1429         ENTRY;
1430
1431         *la_copy = ma->ma_attr;
1432         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1433         if (rc != 0)
1434                 RETURN(rc);
1435
1436         /* setattr on "close" only change atime, or do nothing */
1437         if (ma->ma_valid == MA_INODE &&
1438             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1439                 RETURN(0);
1440
1441         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1442                                     MDD_TXN_ATTR_SET_OP);
1443         handle = mdd_trans_start(env, mdd);
1444         if (IS_ERR(handle))
1445                 RETURN(PTR_ERR(handle));
1446         /*TODO: add lock here*/
1447         /* start a log jounal handle if needed */
1448         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1449             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1450                 lmm_size = mdd_lov_mdsize(env, mdd);
1451                 lmm = mdd_max_lmm_get(env, mdd);
1452                 if (lmm == NULL)
1453                         GOTO(cleanup, rc = -ENOMEM);
1454
1455                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1456                                 XATTR_NAME_LOV);
1457
1458                 if (rc < 0)
1459                         GOTO(cleanup, rc);
1460         }
1461
1462         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1463                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1464                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1465
1466 #ifdef HAVE_QUOTA_SUPPORT
1467         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1468                 struct obd_export *exp = md_quota(env)->mq_exp;
1469                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1470
1471                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1472                 if (!rc) {
1473                         quota_opc = FSFILT_OP_SETATTR;
1474                         mdd_quota_wrapper(la_copy, qnids);
1475                         mdd_quota_wrapper(la_tmp, qoids);
1476                         /* get file quota for new owner */
1477                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1478                                         qnids, inode_pending, 1, NULL, 0,
1479                                         NULL, 0);
1480                         block_count = (la_tmp->la_blocks + 7) >> 3;
1481                         if (block_count) {
1482                                 void *data = NULL;
1483                                 mdd_data_get(env, mdd_obj, &data);
1484                                 /* get block quota for new owner */
1485                                 lquota_chkquota(mds_quota_interface_ref, obd,
1486                                                 exp, qnids, block_pending,
1487                                                 block_count, NULL,
1488                                                 LQUOTA_FLAGS_BLK, data, 1);
1489                         }
1490                 }
1491         }
1492 #endif
1493
1494         if (la_copy->la_valid & LA_FLAGS) {
1495                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1496                                                   handle, 1);
1497                 if (rc == 0)
1498                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1499         } else if (la_copy->la_valid) {            /* setattr */
1500                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1501                                                   handle, 1);
1502                 /* journal chown/chgrp in llog, just like unlink */
1503                 if (rc == 0 && lmm_size){
1504                         cookie_size = mdd_lov_cookiesize(env, mdd);
1505                         logcookies = mdd_max_cookie_get(env, mdd);
1506                         if (logcookies == NULL)
1507                                 GOTO(cleanup, rc = -ENOMEM);
1508
1509                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1510                                             logcookies, cookie_size) <= 0)
1511                                 logcookies = NULL;
1512                 }
1513         }
1514
1515         if (rc == 0 && ma->ma_valid & MA_LOV) {
1516                 cfs_umode_t mode;
1517
1518                 mode = mdd_object_type(mdd_obj);
1519                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1520                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1521                         if (rc)
1522                                 GOTO(cleanup, rc);
1523
1524                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1525                                             ma->ma_lmm_size, handle, 1);
1526                 }
1527
1528         }
1529         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1530                 cfs_umode_t mode;
1531
1532                 mode = mdd_object_type(mdd_obj);
1533                 if (S_ISREG(mode))
1534                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1535
1536         }
1537 cleanup:
1538         if (rc == 0)
1539                 rc = mdd_attr_set_changelog(env, obj, handle,
1540                                             ma->ma_attr.la_valid);
1541         mdd_trans_stop(env, mdd, rc, handle);
1542         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1543                 /*set obd attr, if needed*/
1544                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1545                                            logcookies);
1546         }
1547 #ifdef HAVE_QUOTA_SUPPORT
1548         if (quota_opc) {
1549                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1550                                       inode_pending, 0);
1551                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1552                                       block_pending, 1);
1553                 /* Trigger dqrel/dqacq for original owner and new owner.
1554                  * If failed, the next call for lquota_chkquota will
1555                  * process it. */
1556                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1557                               quota_opc);
1558         }
1559 #endif
1560         RETURN(rc);
1561 }
1562
1563 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1564                       const struct lu_buf *buf, const char *name, int fl,
1565                       struct thandle *handle)
1566 {
1567         int  rc;
1568         ENTRY;
1569
1570         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1571         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1572         mdd_write_unlock(env, obj);
1573
1574         RETURN(rc);
1575 }
1576
1577 static int mdd_xattr_sanity_check(const struct lu_env *env,
1578                                   struct mdd_object *obj)
1579 {
1580         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1581         struct md_ucred *uc     = md_ucred(env);
1582         int rc;
1583         ENTRY;
1584
1585         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1586                 RETURN(-EPERM);
1587
1588         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1589         if (rc)
1590                 RETURN(rc);
1591
1592         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1593             !mdd_capable(uc, CFS_CAP_FOWNER))
1594                 RETURN(-EPERM);
1595
1596         RETURN(rc);
1597 }
1598
1599 /**
1600  * The caller should guarantee to update the object ctime
1601  * after xattr_set if needed.
1602  */
1603 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1604                          const struct lu_buf *buf, const char *name,
1605                          int fl)
1606 {
1607         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1608         struct mdd_device *mdd = mdo2mdd(obj);
1609         struct thandle *handle;
1610         int  rc;
1611         ENTRY;
1612
1613         rc = mdd_xattr_sanity_check(env, mdd_obj);
1614         if (rc)
1615                 RETURN(rc);
1616
1617         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1618         /* security-replated changes may require sync */
1619         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1620             mdd->mdd_sync_permission == 1)
1621                 txn_param_sync(&mdd_env_info(env)->mti_param);
1622
1623         handle = mdd_trans_start(env, mdd);
1624         if (IS_ERR(handle))
1625                 RETURN(PTR_ERR(handle));
1626
1627         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1628
1629         /* Only record user xattr changes */
1630         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1631                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1632                                               handle);
1633         mdd_trans_stop(env, mdd, rc, handle);
1634
1635         RETURN(rc);
1636 }
1637
1638 /**
1639  * The caller should guarantee to update the object ctime
1640  * after xattr_set if needed.
1641  */
1642 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1643                   const char *name)
1644 {
1645         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1646         struct mdd_device *mdd = mdo2mdd(obj);
1647         struct thandle *handle;
1648         int  rc;
1649         ENTRY;
1650
1651         rc = mdd_xattr_sanity_check(env, mdd_obj);
1652         if (rc)
1653                 RETURN(rc);
1654
1655         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1656         handle = mdd_trans_start(env, mdd);
1657         if (IS_ERR(handle))
1658                 RETURN(PTR_ERR(handle));
1659
1660         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1661         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1662                            mdd_object_capa(env, mdd_obj));
1663         mdd_write_unlock(env, mdd_obj);
1664
1665         /* Only record user xattr changes */
1666         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1667                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1668                                               handle);
1669
1670         mdd_trans_stop(env, mdd, rc, handle);
1671
1672         RETURN(rc);
1673 }
1674
1675 /* partial unlink */
1676 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1677                        struct md_attr *ma)
1678 {
1679         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1680         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1681         struct mdd_device *mdd = mdo2mdd(obj);
1682         struct thandle *handle;
1683 #ifdef HAVE_QUOTA_SUPPORT
1684         struct obd_device *obd = mdd->mdd_obd_dev;
1685         struct mds_obd *mds = &obd->u.mds;
1686         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1687         int quota_opc = 0;
1688 #endif
1689         int rc;
1690         ENTRY;
1691
1692         /*
1693          * Check -ENOENT early here because we need to get object type
1694          * to calculate credits before transaction start
1695          */
1696         if (!mdd_object_exists(mdd_obj))
1697                 RETURN(-ENOENT);
1698
1699         LASSERT(mdd_object_exists(mdd_obj) > 0);
1700
1701         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1702         if (rc)
1703                 RETURN(rc);
1704
1705         handle = mdd_trans_start(env, mdd);
1706         if (IS_ERR(handle))
1707                 RETURN(-ENOMEM);
1708
1709         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1710
1711         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1712         if (rc)
1713                 GOTO(cleanup, rc);
1714
1715         __mdd_ref_del(env, mdd_obj, handle, 0);
1716
1717         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1718                 /* unlink dot */
1719                 __mdd_ref_del(env, mdd_obj, handle, 1);
1720         }
1721
1722         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1723         la_copy->la_ctime = ma->ma_attr.la_ctime;
1724
1725         la_copy->la_valid = LA_CTIME;
1726         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1727         if (rc)
1728                 GOTO(cleanup, rc);
1729
1730         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1731 #ifdef HAVE_QUOTA_SUPPORT
1732         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1733             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1734                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1735                 mdd_quota_wrapper(&ma->ma_attr, qids);
1736         }
1737 #endif
1738
1739
1740         EXIT;
1741 cleanup:
1742         mdd_write_unlock(env, mdd_obj);
1743         mdd_trans_stop(env, mdd, rc, handle);
1744 #ifdef HAVE_QUOTA_SUPPORT
1745         if (quota_opc)
1746                 /* Trigger dqrel on the owner of child. If failed,
1747                  * the next call for lquota_chkquota will process it */
1748                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1749                               quota_opc);
1750 #endif
1751         return rc;
1752 }
1753
1754 /* partial operation */
1755 static int mdd_oc_sanity_check(const struct lu_env *env,
1756                                struct mdd_object *obj,
1757                                struct md_attr *ma)
1758 {
1759         int rc;
1760         ENTRY;
1761
1762         switch (ma->ma_attr.la_mode & S_IFMT) {
1763         case S_IFREG:
1764         case S_IFDIR:
1765         case S_IFLNK:
1766         case S_IFCHR:
1767         case S_IFBLK:
1768         case S_IFIFO:
1769         case S_IFSOCK:
1770                 rc = 0;
1771                 break;
1772         default:
1773                 rc = -EINVAL;
1774                 break;
1775         }
1776         RETURN(rc);
1777 }
1778
1779 static int mdd_object_create(const struct lu_env *env,
1780                              struct md_object *obj,
1781                              const struct md_op_spec *spec,
1782                              struct md_attr *ma)
1783 {
1784
1785         struct mdd_device *mdd = mdo2mdd(obj);
1786         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1787         const struct lu_fid *pfid = spec->u.sp_pfid;
1788         struct thandle *handle;
1789 #ifdef HAVE_QUOTA_SUPPORT
1790         struct obd_device *obd = mdd->mdd_obd_dev;
1791         struct obd_export *exp = md_quota(env)->mq_exp;
1792         struct mds_obd *mds = &obd->u.mds;
1793         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1794         int quota_opc = 0, block_count = 0;
1795         int inode_pending[MAXQUOTAS] = { 0, 0 };
1796         int block_pending[MAXQUOTAS] = { 0, 0 };
1797 #endif
1798         int rc = 0;
1799         ENTRY;
1800
1801 #ifdef HAVE_QUOTA_SUPPORT
1802         if (mds->mds_quota) {
1803                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1804                 mdd_quota_wrapper(&ma->ma_attr, qids);
1805                 /* get file quota for child */
1806                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1807                                 qids, inode_pending, 1, NULL, 0,
1808                                 NULL, 0);
1809                 switch (ma->ma_attr.la_mode & S_IFMT) {
1810                 case S_IFLNK:
1811                 case S_IFDIR:
1812                         block_count = 2;
1813                         break;
1814                 case S_IFREG:
1815                         block_count = 1;
1816                         break;
1817                 }
1818                 /* get block quota for child */
1819                 if (block_count)
1820                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1821                                         qids, block_pending, block_count,
1822                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1823         }
1824 #endif
1825
1826         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1827         handle = mdd_trans_start(env, mdd);
1828         if (IS_ERR(handle))
1829                 GOTO(out_pending, rc = PTR_ERR(handle));
1830
1831         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1832         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1833         if (rc)
1834                 GOTO(unlock, rc);
1835
1836         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1837         if (rc)
1838                 GOTO(unlock, rc);
1839
1840         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1841                 /* If creating the slave object, set slave EA here. */
1842                 int lmv_size = spec->u.sp_ea.eadatalen;
1843                 struct lmv_stripe_md *lmv;
1844
1845                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1846                 LASSERT(lmv != NULL && lmv_size > 0);
1847
1848                 rc = __mdd_xattr_set(env, mdd_obj,
1849                                      mdd_buf_get_const(env, lmv, lmv_size),
1850                                      XATTR_NAME_LMV, 0, handle);
1851                 if (rc)
1852                         GOTO(unlock, rc);
1853
1854                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1855                                            handle, 0);
1856         } else {
1857 #ifdef CONFIG_FS_POSIX_ACL
1858                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1859                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1860
1861                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1862                         buf->lb_len = spec->u.sp_ea.eadatalen;
1863                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1864                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1865                                                     &ma->ma_attr.la_mode,
1866                                                     handle);
1867                                 if (rc)
1868                                         GOTO(unlock, rc);
1869                                 else
1870                                         ma->ma_attr.la_valid |= LA_MODE;
1871                         }
1872
1873                         pfid = spec->u.sp_ea.fid;
1874                 }
1875 #endif
1876                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1877                                            spec);
1878         }
1879         EXIT;
1880 unlock:
1881         if (rc == 0)
1882                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1883         mdd_write_unlock(env, mdd_obj);
1884
1885         mdd_trans_stop(env, mdd, rc, handle);
1886 out_pending:
1887 #ifdef HAVE_QUOTA_SUPPORT
1888         if (quota_opc) {
1889                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1890                                       inode_pending, 0);
1891                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1892                                       block_pending, 1);
1893                 /* Trigger dqacq on the owner of child. If failed,
1894                  * the next call for lquota_chkquota will process it. */
1895                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1896                               quota_opc);
1897         }
1898 #endif
1899         return rc;
1900 }
1901
1902 /* partial link */
1903 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1904                        const struct md_attr *ma)
1905 {
1906         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1907         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1908         struct mdd_device *mdd = mdo2mdd(obj);
1909         struct thandle *handle;
1910         int rc;
1911         ENTRY;
1912
1913         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1914         handle = mdd_trans_start(env, mdd);
1915         if (IS_ERR(handle))
1916                 RETURN(-ENOMEM);
1917
1918         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1919         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1920         if (rc == 0)
1921                 __mdd_ref_add(env, mdd_obj, handle);
1922         mdd_write_unlock(env, mdd_obj);
1923         if (rc == 0) {
1924                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1925                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1926
1927                 la_copy->la_valid = LA_CTIME;
1928                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1929                                                         handle, 0);
1930         }
1931         mdd_trans_stop(env, mdd, 0, handle);
1932
1933         RETURN(rc);
1934 }
1935
1936 /*
1937  * do NOT or the MAY_*'s, you'll get the weakest
1938  */
1939 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1940 {
1941         int res = 0;
1942
1943         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1944          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1945          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1946          * owner can write to a file even if it is marked readonly to hide
1947          * its brokenness. (bug 5781) */
1948         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1949                 struct md_ucred *uc = md_ucred(env);
1950
1951                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1952                     (la->la_uid == uc->mu_fsuid))
1953                         return 0;
1954         }
1955
1956         if (flags & FMODE_READ)
1957                 res |= MAY_READ;
1958         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1959                 res |= MAY_WRITE;
1960         if (flags & MDS_FMODE_EXEC)
1961                 res = MAY_EXEC;
1962         return res;
1963 }
1964
1965 static int mdd_open_sanity_check(const struct lu_env *env,
1966                                  struct mdd_object *obj, int flag)
1967 {
1968         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1969         int mode, rc;
1970         ENTRY;
1971
1972         /* EEXIST check */
1973         if (mdd_is_dead_obj(obj))
1974                 RETURN(-ENOENT);
1975
1976         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1977         if (rc)
1978                RETURN(rc);
1979
1980         if (S_ISLNK(tmp_la->la_mode))
1981                 RETURN(-ELOOP);
1982
1983         mode = accmode(env, tmp_la, flag);
1984
1985         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1986                 RETURN(-EISDIR);
1987
1988         if (!(flag & MDS_OPEN_CREATED)) {
1989                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1990                 if (rc)
1991                         RETURN(rc);
1992         }
1993
1994         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1995             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1996                 flag &= ~MDS_OPEN_TRUNC;
1997
1998         /* For writing append-only file must open it with append mode. */
1999         if (mdd_is_append(obj)) {
2000                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2001                         RETURN(-EPERM);
2002                 if (flag & MDS_OPEN_TRUNC)
2003                         RETURN(-EPERM);
2004         }
2005
2006 #if 0
2007         /*
2008          * Now, flag -- O_NOATIME does not be packed by client.
2009          */
2010         if (flag & O_NOATIME) {
2011                 struct md_ucred *uc = md_ucred(env);
2012
2013                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2014                     (uc->mu_valid == UCRED_NEW)) &&
2015                     (uc->mu_fsuid != tmp_la->la_uid) &&
2016                     !mdd_capable(uc, CFS_CAP_FOWNER))
2017                         RETURN(-EPERM);
2018         }
2019 #endif
2020
2021         RETURN(0);
2022 }
2023
2024 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2025                     int flags)
2026 {
2027         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2028         int rc = 0;
2029
2030         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2031
2032         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2033         if (rc == 0)
2034                 mdd_obj->mod_count++;
2035
2036         mdd_write_unlock(env, mdd_obj);
2037         return rc;
2038 }
2039
2040 /* return md_attr back,
2041  * if it is last unlink then return lov ea + llog cookie*/
2042 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2043                     struct md_attr *ma)
2044 {
2045         int rc = 0;
2046         ENTRY;
2047
2048         if (S_ISREG(mdd_object_type(obj))) {
2049                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2050                  * Caller must be ready for that. */
2051
2052                 rc = __mdd_lmm_get(env, obj, ma);
2053                 if ((ma->ma_valid & MA_LOV))
2054                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2055                                             obj, ma);
2056         }
2057         RETURN(rc);
2058 }
2059
2060 /*
2061  * No permission check is needed.
2062  */
2063 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2064                      struct md_attr *ma)
2065 {
2066         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2067         struct mdd_device *mdd = mdo2mdd(obj);
2068         struct thandle    *handle = NULL;
2069         int rc;
2070         int reset = 1;
2071
2072 #ifdef HAVE_QUOTA_SUPPORT
2073         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2074         struct mds_obd *mds = &obd->u.mds;
2075         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2076         int quota_opc = 0;
2077 #endif
2078         ENTRY;
2079
2080         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2081                 mdd_obj->mod_count--;
2082
2083                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2084                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2085                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2086                 RETURN(0);
2087         }
2088
2089         /* check without any lock */
2090         if (mdd_obj->mod_count == 1 &&
2091             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2092  again:
2093                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2094                 if (rc)
2095                         RETURN(rc);
2096                 handle = mdd_trans_start(env, mdo2mdd(obj));
2097                 if (IS_ERR(handle))
2098                         RETURN(PTR_ERR(handle));
2099         }
2100
2101         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2102         if (handle == NULL && mdd_obj->mod_count == 1 &&
2103             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2104                 mdd_write_unlock(env, mdd_obj);
2105                 goto again;
2106         }
2107
2108         /* release open count */
2109         mdd_obj->mod_count --;
2110
2111         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2112                 /* remove link to object from orphan index */
2113                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2114                 if (rc == 0) {
2115                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2116                                "list, OSS objects to be destroyed.\n",
2117                                PFID(mdd_object_fid(mdd_obj)));
2118                 } else {
2119                         CERROR("Object "DFID" can not be deleted from orphan "
2120                                 "list, maybe cause OST objects can not be "
2121                                 "destroyed (err: %d).\n",
2122                                 PFID(mdd_object_fid(mdd_obj)), rc);
2123                         /* If object was not deleted from orphan list, do not
2124                          * destroy OSS objects, which will be done when next
2125                          * recovery. */
2126                         GOTO(out, rc);
2127                 }
2128         }
2129
2130         rc = mdd_iattr_get(env, mdd_obj, ma);
2131         /* Object maybe not in orphan list originally, it is rare case for
2132          * mdd_finish_unlink() failure. */
2133         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2134 #ifdef HAVE_QUOTA_SUPPORT
2135                 if (mds->mds_quota) {
2136                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2137                         mdd_quota_wrapper(&ma->ma_attr, qids);
2138                 }
2139 #endif
2140                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2141                 if (ma->ma_valid & MA_FLAGS &&
2142                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2143                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2144                 } else {
2145                         rc = mdd_object_kill(env, mdd_obj, ma);
2146                         if (rc == 0)
2147                                 reset = 0;
2148                 }
2149
2150                 if (rc != 0)
2151                         CERROR("Error when prepare to delete Object "DFID" , "
2152                                "which will cause OST objects can not be "
2153                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2154         }
2155         EXIT;
2156
2157 out:
2158         if (reset)
2159                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2160
2161         mdd_write_unlock(env, mdd_obj);
2162         if (handle != NULL)
2163                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2164 #ifdef HAVE_QUOTA_SUPPORT
2165         if (quota_opc)
2166                 /* Trigger dqrel on the owner of child. If failed,
2167                  * the next call for lquota_chkquota will process it */
2168                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2169                               quota_opc);
2170 #endif
2171         return rc;
2172 }
2173
2174 /*
2175  * Permission check is done when open,
2176  * no need check again.
2177  */
2178 static int mdd_readpage_sanity_check(const struct lu_env *env,
2179                                      struct mdd_object *obj)
2180 {
2181         struct dt_object *next = mdd_object_child(obj);
2182         int rc;
2183         ENTRY;
2184
2185         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2186                 rc = 0;
2187         else
2188                 rc = -ENOTDIR;
2189
2190         RETURN(rc);
2191 }
2192
2193 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2194                               int first, void *area, int nob,
2195                               const struct dt_it_ops *iops, struct dt_it *it,
2196                               __u64 *start, __u64 *end,
2197                               struct lu_dirent **last, __u32 attr)
2198 {
2199         int                     result;
2200         __u64                   hash = 0;
2201         struct lu_dirent       *ent;
2202
2203         if (first) {
2204                 memset(area, 0, sizeof (struct lu_dirpage));
2205                 area += sizeof (struct lu_dirpage);
2206                 nob  -= sizeof (struct lu_dirpage);
2207         }
2208
2209         ent  = area;
2210         do {
2211                 int    len;
2212                 int    recsize;
2213
2214                 len  = iops->key_size(env, it);
2215
2216                 /* IAM iterator can return record with zero len. */
2217                 if (len == 0)
2218                         goto next;
2219
2220                 hash = iops->store(env, it);
2221                 if (unlikely(first)) {
2222                         first = 0;
2223                         *start = hash;
2224                 }
2225
2226                 /* calculate max space required for lu_dirent */
2227                 recsize = lu_dirent_calc_size(len, attr);
2228
2229                 if (nob >= recsize) {
2230                         result = iops->rec(env, it, ent, attr);
2231                         if (result == -ESTALE)
2232                                 goto next;
2233                         if (result != 0)
2234                                 goto out;
2235
2236                         /* osd might not able to pack all attributes,
2237                          * so recheck rec length */
2238                         recsize = le16_to_cpu(ent->lde_reclen);
2239                 } else {
2240                         /*
2241                          * record doesn't fit into page, enlarge previous one.
2242                          */
2243                         if (*last) {
2244                                 (*last)->lde_reclen =
2245                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2246                                                         nob);
2247                                 result = 0;
2248                         } else
2249                                 result = -EINVAL;
2250
2251                         goto out;
2252                 }
2253                 *last = ent;
2254                 ent = (void *)ent + recsize;
2255                 nob -= recsize;
2256
2257 next:
2258                 result = iops->next(env, it);
2259                 if (result == -ESTALE)
2260                         goto next;
2261         } while (result == 0);
2262
2263 out:
2264         *end = hash;
2265         return result;
2266 }
2267
2268 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2269                           const struct lu_rdpg *rdpg)
2270 {
2271         struct dt_it      *it;
2272         struct dt_object  *next = mdd_object_child(obj);
2273         const struct dt_it_ops  *iops;
2274         struct page       *pg;
2275         struct lu_dirent  *last = NULL;
2276         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2277         int i;
2278         int rc;
2279         int nob;
2280         __u64 hash_start;
2281         __u64 hash_end = 0;
2282
2283         LASSERT(rdpg->rp_pages != NULL);
2284         LASSERT(next->do_index_ops != NULL);
2285
2286         if (rdpg->rp_count <= 0)
2287                 return -EFAULT;
2288
2289         /*
2290          * iterate through directory and fill pages from @rdpg
2291          */
2292         iops = &next->do_index_ops->dio_it;
2293         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2294         if (IS_ERR(it))
2295                 return PTR_ERR(it);
2296
2297         rc = iops->load(env, it, rdpg->rp_hash);
2298
2299         if (rc == 0){
2300                 /*
2301                  * Iterator didn't find record with exactly the key requested.
2302                  *
2303                  * It is currently either
2304                  *
2305                  *     - positioned above record with key less than
2306                  *     requested---skip it.
2307                  *
2308                  *     - or not positioned at all (is in IAM_IT_SKEWED
2309                  *     state)---position it on the next item.
2310                  */
2311                 rc = iops->next(env, it);
2312         } else if (rc > 0)
2313                 rc = 0;
2314
2315         /*
2316          * At this point and across for-loop:
2317          *
2318          *  rc == 0 -> ok, proceed.
2319          *  rc >  0 -> end of directory.
2320          *  rc <  0 -> error.
2321          */
2322         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2323              i++, nob -= CFS_PAGE_SIZE) {
2324                 LASSERT(i < rdpg->rp_npages);
2325                 pg = rdpg->rp_pages[i];
2326                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2327                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2328                                         it, &hash_start, &hash_end, &last,
2329                                         rdpg->rp_attrs);
2330                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2331                         if (last)
2332                                 last->lde_reclen = 0;
2333                 }
2334                 cfs_kunmap(pg);
2335         }
2336         if (rc > 0) {
2337                 /*
2338                  * end of directory.
2339                  */
2340                 hash_end = DIR_END_OFF;
2341                 rc = 0;
2342         }
2343         if (rc == 0) {
2344                 struct lu_dirpage *dp;
2345
2346                 dp = cfs_kmap(rdpg->rp_pages[0]);
2347                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2348                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2349                 if (i == 0)
2350                         /*
2351                          * No pages were processed, mark this.
2352                          */
2353                         dp->ldp_flags |= LDF_EMPTY;
2354
2355                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2356                 cfs_kunmap(rdpg->rp_pages[0]);
2357         }
2358         iops->put(env, it);
2359         iops->fini(env, it);
2360
2361         return rc;
2362 }
2363
2364 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2365                  const struct lu_rdpg *rdpg)
2366 {
2367         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2368         int rc;
2369         ENTRY;
2370
2371         LASSERT(mdd_object_exists(mdd_obj));
2372
2373         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2374         rc = mdd_readpage_sanity_check(env, mdd_obj);
2375         if (rc)
2376                 GOTO(out_unlock, rc);
2377
2378         if (mdd_is_dead_obj(mdd_obj)) {
2379                 struct page *pg;
2380                 struct lu_dirpage *dp;
2381
2382                 /*
2383                  * According to POSIX, please do not return any entry to client:
2384                  * even dot and dotdot should not be returned.
2385                  */
2386                 CWARN("readdir from dead object: "DFID"\n",
2387                         PFID(mdd_object_fid(mdd_obj)));
2388
2389                 if (rdpg->rp_count <= 0)
2390                         GOTO(out_unlock, rc = -EFAULT);
2391                 LASSERT(rdpg->rp_pages != NULL);
2392
2393                 pg = rdpg->rp_pages[0];
2394                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2395                 memset(dp, 0 , sizeof(struct lu_dirpage));
2396                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2397                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2398                 dp->ldp_flags |= LDF_EMPTY;
2399                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2400                 cfs_kunmap(pg);
2401                 GOTO(out_unlock, rc = 0);
2402         }
2403
2404         rc = __mdd_readpage(env, mdd_obj, rdpg);
2405
2406         EXIT;
2407 out_unlock:
2408         mdd_read_unlock(env, mdd_obj);
2409         return rc;
2410 }
2411
2412 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2413 {
2414         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2415         struct dt_object *next;
2416
2417         LASSERT(mdd_object_exists(mdd_obj));
2418         next = mdd_object_child(mdd_obj);
2419         return next->do_ops->do_object_sync(env, next);
2420 }
2421
2422 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2423                                         struct md_object *obj)
2424 {
2425         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2426
2427         LASSERT(mdd_object_exists(mdd_obj));
2428         return do_version_get(env, mdd_object_child(mdd_obj));
2429 }
2430
2431 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2432                             dt_obj_version_t version)
2433 {
2434         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2435
2436         LASSERT(mdd_object_exists(mdd_obj));
2437         do_version_set(env, mdd_object_child(mdd_obj), version);
2438 }
2439
2440 const struct md_object_operations mdd_obj_ops = {
2441         .moo_permission    = mdd_permission,
2442         .moo_attr_get      = mdd_attr_get,
2443         .moo_attr_set      = mdd_attr_set,
2444         .moo_xattr_get     = mdd_xattr_get,
2445         .moo_xattr_set     = mdd_xattr_set,
2446         .moo_xattr_list    = mdd_xattr_list,
2447         .moo_xattr_del     = mdd_xattr_del,
2448         .moo_object_create = mdd_object_create,
2449         .moo_ref_add       = mdd_ref_add,
2450         .moo_ref_del       = mdd_ref_del,
2451         .moo_open          = mdd_open,
2452         .moo_close         = mdd_close,
2453         .moo_readpage      = mdd_readpage,
2454         .moo_readlink      = mdd_readlink,
2455         .moo_changelog     = mdd_changelog,
2456         .moo_capa_get      = mdd_capa_get,
2457         .moo_object_sync   = mdd_object_sync,
2458         .moo_version_get   = mdd_version_get,
2459         .moo_version_set   = mdd_version_set,
2460         .moo_path          = mdd_path,
2461         .moo_file_lock     = mdd_file_lock,
2462         .moo_file_unlock   = mdd_file_unlock,
2463 };