Whamcloud - gitweb
b=22376 accmode fix
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
149 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 {
151         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152
153         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
154                 if (buf->lb_vmalloc)
155                         OBD_VFREE(buf->lb_buf, buf->lb_len);
156                 else
157                         OBD_FREE(buf->lb_buf, buf->lb_len);
158                 buf->lb_buf = NULL;
159         }
160         if (buf->lb_buf == NULL) {
161                 buf->lb_len = len;
162                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
163                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
164                         buf->lb_vmalloc = 0;
165                 }
166                 if (buf->lb_buf == NULL) {
167                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
168                         buf->lb_vmalloc = 1;
169                 }
170                 if (buf->lb_buf == NULL)
171                         buf->lb_len = 0;
172         }
173         return buf;
174 }
175
176 /** Increase the size of the \a mti_big_buf.
177  * preserves old data in buffer
178  * old buffer remains unchanged on error
179  * \retval 0 or -ENOMEM
180  */
181 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
182 {
183         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
184         struct lu_buf buf;
185
186         LASSERT(len >= oldbuf->lb_len);
187         if (len > BUF_VMALLOC_SIZE) {
188                 OBD_VMALLOC(buf.lb_buf, len);
189                 buf.lb_vmalloc = 1;
190         } else {
191                 OBD_ALLOC(buf.lb_buf, len);
192                 buf.lb_vmalloc = 0;
193         }
194         if (buf.lb_buf == NULL)
195                 return -ENOMEM;
196
197         buf.lb_len = len;
198         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
199
200         if (oldbuf->lb_vmalloc)
201                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
202         else
203                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
204
205         memcpy(oldbuf, &buf, sizeof(buf));
206
207         return 0;
208 }
209
210 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
211                                        struct mdd_device *mdd)
212 {
213         struct mdd_thread_info *mti = mdd_env_info(env);
214         int                     max_cookie_size;
215
216         max_cookie_size = mdd_lov_cookiesize(env, mdd);
217         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
218                 if (mti->mti_max_cookie)
219                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
220                 mti->mti_max_cookie = NULL;
221                 mti->mti_max_cookie_size = 0;
222         }
223         if (unlikely(mti->mti_max_cookie == NULL)) {
224                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
225                 if (likely(mti->mti_max_cookie != NULL))
226                         mti->mti_max_cookie_size = max_cookie_size;
227         }
228         if (likely(mti->mti_max_cookie != NULL))
229                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
230         return mti->mti_max_cookie;
231 }
232
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234                                    struct mdd_device *mdd)
235 {
236         struct mdd_thread_info *mti = mdd_env_info(env);
237         int                     max_lmm_size;
238
239         max_lmm_size = mdd_lov_mdsize(env, mdd);
240         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
241                 if (mti->mti_max_lmm)
242                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
243                 mti->mti_max_lmm = NULL;
244                 mti->mti_max_lmm_size = 0;
245         }
246         if (unlikely(mti->mti_max_lmm == NULL)) {
247                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
248                 if (likely(mti->mti_max_lmm != NULL))
249                         mti->mti_max_lmm_size = max_lmm_size;
250         }
251         return mti->mti_max_lmm;
252 }
253
254 struct lu_object *mdd_object_alloc(const struct lu_env *env,
255                                    const struct lu_object_header *hdr,
256                                    struct lu_device *d)
257 {
258         struct mdd_object *mdd_obj;
259
260         OBD_ALLOC_PTR(mdd_obj);
261         if (mdd_obj != NULL) {
262                 struct lu_object *o;
263
264                 o = mdd2lu_obj(mdd_obj);
265                 lu_object_init(o, NULL, d);
266                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
267                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
268                 mdd_obj->mod_count = 0;
269                 o->lo_ops = &mdd_lu_obj_ops;
270                 return o;
271         } else {
272                 return NULL;
273         }
274 }
275
276 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
277                            const struct lu_object_conf *unused)
278 {
279         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
280         struct mdd_object *mdd_obj = lu2mdd_obj(o);
281         struct lu_object  *below;
282         struct lu_device  *under;
283         ENTRY;
284
285         mdd_obj->mod_cltime = 0;
286         under = &d->mdd_child->dd_lu_dev;
287         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
288         mdd_pdlock_init(mdd_obj);
289         if (below == NULL)
290                 RETURN(-ENOMEM);
291
292         lu_object_add(o, below);
293
294         RETURN(0);
295 }
296
297 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
298 {
299         if (lu_object_exists(o))
300                 return mdd_get_flags(env, lu2mdd_obj(o));
301         else
302                 return 0;
303 }
304
305 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
306 {
307         struct mdd_object *mdd = lu2mdd_obj(o);
308
309         lu_object_fini(o);
310         OBD_FREE_PTR(mdd);
311 }
312
313 static int mdd_object_print(const struct lu_env *env, void *cookie,
314                             lu_printer_t p, const struct lu_object *o)
315 {
316         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
317         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
318                     "valid=%x, cltime="LPU64", flags=%lx)",
319                     mdd, mdd->mod_count, mdd->mod_valid,
320                     mdd->mod_cltime, mdd->mod_flags);
321 }
322
323 static const struct lu_object_operations mdd_lu_obj_ops = {
324         .loo_object_init    = mdd_object_init,
325         .loo_object_start   = mdd_object_start,
326         .loo_object_free    = mdd_object_free,
327         .loo_object_print   = mdd_object_print,
328 };
329
330 struct mdd_object *mdd_object_find(const struct lu_env *env,
331                                    struct mdd_device *d,
332                                    const struct lu_fid *f)
333 {
334         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
335 }
336
337 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
338                         const char *path, struct lu_fid *fid)
339 {
340         struct lu_buf *buf;
341         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
342         struct mdd_object *obj;
343         struct lu_name *lname = &mdd_env_info(env)->mti_name;
344         char *name;
345         int rc = 0;
346         ENTRY;
347
348         /* temp buffer for path element */
349         buf = mdd_buf_alloc(env, PATH_MAX);
350         if (buf->lb_buf == NULL)
351                 RETURN(-ENOMEM);
352
353         lname->ln_name = name = buf->lb_buf;
354         lname->ln_namelen = 0;
355         *f = mdd->mdd_root_fid;
356
357         while(1) {
358                 while (*path == '/')
359                         path++;
360                 if (*path == '\0')
361                         break;
362                 while (*path != '/' && *path != '\0') {
363                         *name = *path;
364                         path++;
365                         name++;
366                         lname->ln_namelen++;
367                 }
368
369                 *name = '\0';
370                 /* find obj corresponding to fid */
371                 obj = mdd_object_find(env, mdd, f);
372                 if (obj == NULL)
373                         GOTO(out, rc = -EREMOTE);
374                 if (IS_ERR(obj))
375                         GOTO(out, rc = -PTR_ERR(obj));
376                 /* get child fid from parent and name */
377                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
378                 mdd_object_put(env, obj);
379                 if (rc)
380                         break;
381
382                 name = buf->lb_buf;
383                 lname->ln_namelen = 0;
384         }
385
386         if (!rc)
387                 *fid = *f;
388 out:
389         RETURN(rc);
390 }
391
392 /** The maximum depth that fid2path() will search.
393  * This is limited only because we want to store the fids for
394  * historical path lookup purposes.
395  */
396 #define MAX_PATH_DEPTH 100
397
398 /** mdd_path() lookup structure. */
399 struct path_lookup_info {
400         __u64                pli_recno;        /**< history point */
401         __u64                pli_currec;       /**< current record */
402         struct lu_fid        pli_fid;
403         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
404         struct mdd_object   *pli_mdd_obj;
405         char                *pli_path;         /**< full path */
406         int                  pli_pathlen;
407         int                  pli_linkno;       /**< which hardlink to follow */
408         int                  pli_fidcount;     /**< number of \a pli_fids */
409 };
410
411 static int mdd_path_current(const struct lu_env *env,
412                             struct path_lookup_info *pli)
413 {
414         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
415         struct mdd_object *mdd_obj;
416         struct lu_buf     *buf = NULL;
417         struct link_ea_header *leh;
418         struct link_ea_entry  *lee;
419         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
420         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
421         char *ptr;
422         int reclen;
423         int rc;
424         ENTRY;
425
426         ptr = pli->pli_path + pli->pli_pathlen - 1;
427         *ptr = 0;
428         --ptr;
429         pli->pli_fidcount = 0;
430         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
431
432         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
433                 mdd_obj = mdd_object_find(env, mdd,
434                                           &pli->pli_fids[pli->pli_fidcount]);
435                 if (mdd_obj == NULL)
436                         GOTO(out, rc = -EREMOTE);
437                 if (IS_ERR(mdd_obj))
438                         GOTO(out, rc = -PTR_ERR(mdd_obj));
439                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440                 if (rc <= 0) {
441                         mdd_object_put(env, mdd_obj);
442                         if (rc == -1)
443                                 rc = -EREMOTE;
444                         else if (rc == 0)
445                                 /* Do I need to error out here? */
446                                 rc = -ENOENT;
447                         GOTO(out, rc);
448                 }
449
450                 /* Get parent fid and object name */
451                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
452                 buf = mdd_links_get(env, mdd_obj);
453                 mdd_read_unlock(env, mdd_obj);
454                 mdd_object_put(env, mdd_obj);
455                 if (IS_ERR(buf))
456                         GOTO(out, rc = PTR_ERR(buf));
457
458                 leh = buf->lb_buf;
459                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
460                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461
462                 /* If set, use link #linkno for path lookup, otherwise use
463                    link #0.  Only do this for the final path element. */
464                 if ((pli->pli_fidcount == 0) &&
465                     (pli->pli_linkno < leh->leh_reccount)) {
466                         int count;
467                         for (count = 0; count < pli->pli_linkno; count++) {
468                                 lee = (struct link_ea_entry *)
469                                      ((char *)lee + reclen);
470                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471                         }
472                         if (pli->pli_linkno < leh->leh_reccount - 1)
473                                 /* indicate to user there are more links */
474                                 pli->pli_linkno++;
475                 }
476
477                 /* Pack the name in the end of the buffer */
478                 ptr -= tmpname->ln_namelen;
479                 if (ptr - 1 <= pli->pli_path)
480                         GOTO(out, rc = -EOVERFLOW);
481                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
482                 *(--ptr) = '/';
483
484                 /* Store the parent fid for historic lookup */
485                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
486                         GOTO(out, rc = -EOVERFLOW);
487                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
488         }
489
490         /* Verify that our path hasn't changed since we started the lookup.
491            Record the current index, and verify the path resolves to the
492            same fid. If it does, then the path is correct as of this index. */
493         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
494         pli->pli_currec = mdd->mdd_cl.mc_index;
495         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
496         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497         if (rc) {
498                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
499                 GOTO (out, rc = -EAGAIN);
500         }
501         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
502                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
503                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
504                        PFID(&pli->pli_fid));
505                 GOTO(out, rc = -EAGAIN);
506         }
507         ptr++; /* skip leading / */
508         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
509
510         EXIT;
511 out:
512         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
513                 /* if we vmalloced a large buffer drop it */
514                 mdd_buf_put(buf);
515
516         return rc;
517 }
518
519 static int mdd_path_historic(const struct lu_env *env,
520                              struct path_lookup_info *pli)
521 {
522         return 0;
523 }
524
525 /* Returns the full path to this fid, as of changelog record recno. */
526 static int mdd_path(const struct lu_env *env, struct md_object *obj,
527                     char *path, int pathlen, __u64 *recno, int *linkno)
528 {
529         struct path_lookup_info *pli;
530         int tries = 3;
531         int rc = -EAGAIN;
532         ENTRY;
533
534         if (pathlen < 3)
535                 RETURN(-EOVERFLOW);
536
537         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
538                 path[0] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
610 {
611         struct lov_desc *ldesc;
612         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
613         struct lov_user_md *lum = (struct lov_user_md*)lmm;
614         ENTRY;
615
616         if (!lum)
617                 RETURN(0);
618
619         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
620         LASSERT(ldesc != NULL);
621
622         lum->lmm_magic = LOV_MAGIC_V1;
623         lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
624         lum->lmm_pattern = ldesc->ld_pattern;
625         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
626         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
627         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
628
629         RETURN(sizeof(*lum));
630 }
631
632 /* get lov EA only */
633 static int __mdd_lmm_get(const struct lu_env *env,
634                          struct mdd_object *mdd_obj, struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         if (ma->ma_valid & MA_LOV)
640                 RETURN(0);
641
642         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
643                         XATTR_NAME_LOV);
644         if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
645                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
646         if (rc > 0) {
647                 ma->ma_lmm_size = rc;
648                 ma->ma_valid |= MA_LOV;
649                 rc = 0;
650         }
651         RETURN(rc);
652 }
653
654 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
655                        struct md_attr *ma)
656 {
657         int rc;
658         ENTRY;
659
660         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
661         rc = __mdd_lmm_get(env, mdd_obj, ma);
662         mdd_read_unlock(env, mdd_obj);
663         RETURN(rc);
664 }
665
666 /* get lmv EA only*/
667 static int __mdd_lmv_get(const struct lu_env *env,
668                          struct mdd_object *mdd_obj, struct md_attr *ma)
669 {
670         int rc;
671         ENTRY;
672
673         if (ma->ma_valid & MA_LMV)
674                 RETURN(0);
675
676         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
677                         XATTR_NAME_LMV);
678         if (rc > 0) {
679                 ma->ma_valid |= MA_LMV;
680                 rc = 0;
681         }
682         RETURN(rc);
683 }
684
685 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
686                          struct md_attr *ma)
687 {
688         struct mdd_thread_info *info = mdd_env_info(env);
689         struct lustre_mdt_attrs *lma =
690                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
691         int lma_size;
692         int rc;
693         ENTRY;
694
695         /* If all needed data are already valid, nothing to do */
696         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
697             (ma->ma_need & (MA_HSM | MA_SOM)))
698                 RETURN(0);
699
700         /* Read LMA from disk EA */
701         lma_size = sizeof(info->mti_xattr_buf);
702         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
703         if (rc <= 0)
704                 RETURN(rc);
705
706         /* Useless to check LMA incompatibility because this is already done in
707          * osd_ea_fid_get(), and this will fail long before this code is
708          * called.
709          * So, if we are here, LMA is compatible.
710          */
711
712         lustre_lma_swab(lma);
713
714         /* Swab and copy LMA */
715         if (ma->ma_need & MA_HSM) {
716                 if (lma->lma_compat & LMAC_HSM)
717                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
718                 else
719                         ma->ma_hsm.mh_flags = 0;
720                 ma->ma_valid |= MA_HSM;
721         }
722
723         /* Copy SOM */
724         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
725                 LASSERT(ma->ma_som != NULL);
726                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
727                 ma->ma_som->msd_size    = lma->lma_som_size;
728                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
729                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
730                 ma->ma_valid |= MA_SOM;
731         }
732
733         RETURN(0);
734 }
735
736 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
737                                  struct md_attr *ma)
738 {
739         int rc = 0;
740         ENTRY;
741
742         if (ma->ma_need & MA_INODE)
743                 rc = mdd_iattr_get(env, mdd_obj, ma);
744
745         if (rc == 0 && ma->ma_need & MA_LOV) {
746                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
747                     S_ISDIR(mdd_object_type(mdd_obj)))
748                         rc = __mdd_lmm_get(env, mdd_obj, ma);
749         }
750         if (rc == 0 && ma->ma_need & MA_LMV) {
751                 if (S_ISDIR(mdd_object_type(mdd_obj)))
752                         rc = __mdd_lmv_get(env, mdd_obj, ma);
753         }
754         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
755                 if (S_ISREG(mdd_object_type(mdd_obj)))
756                         rc = __mdd_lma_get(env, mdd_obj, ma);
757         }
758 #ifdef CONFIG_FS_POSIX_ACL
759         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
760                 if (S_ISDIR(mdd_object_type(mdd_obj)))
761                         rc = mdd_def_acl_get(env, mdd_obj, ma);
762         }
763 #endif
764         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
765                rc, ma->ma_valid, ma->ma_lmm);
766         RETURN(rc);
767 }
768
769 int mdd_attr_get_internal_locked(const struct lu_env *env,
770                                  struct mdd_object *mdd_obj, struct md_attr *ma)
771 {
772         int rc;
773         int needlock = ma->ma_need &
774                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
775
776         if (needlock)
777                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
778         rc = mdd_attr_get_internal(env, mdd_obj, ma);
779         if (needlock)
780                 mdd_read_unlock(env, mdd_obj);
781         return rc;
782 }
783
784 /*
785  * No permission check is needed.
786  */
787 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
788                         struct md_attr *ma)
789 {
790         struct mdd_object *mdd_obj = md2mdd_obj(obj);
791         int                rc;
792
793         ENTRY;
794         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
795         RETURN(rc);
796 }
797
798 /*
799  * No permission check is needed.
800  */
801 static int mdd_xattr_get(const struct lu_env *env,
802                          struct md_object *obj, struct lu_buf *buf,
803                          const char *name)
804 {
805         struct mdd_object *mdd_obj = md2mdd_obj(obj);
806         int rc;
807
808         ENTRY;
809
810         LASSERT(mdd_object_exists(mdd_obj));
811
812         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
813         rc = mdo_xattr_get(env, mdd_obj, buf, name,
814                            mdd_object_capa(env, mdd_obj));
815         mdd_read_unlock(env, mdd_obj);
816
817         RETURN(rc);
818 }
819
820 /*
821  * Permission check is done when open,
822  * no need check again.
823  */
824 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
825                         struct lu_buf *buf)
826 {
827         struct mdd_object *mdd_obj = md2mdd_obj(obj);
828         struct dt_object  *next;
829         loff_t             pos = 0;
830         int                rc;
831         ENTRY;
832
833         LASSERT(mdd_object_exists(mdd_obj));
834
835         next = mdd_object_child(mdd_obj);
836         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
837         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
838                                          mdd_object_capa(env, mdd_obj));
839         mdd_read_unlock(env, mdd_obj);
840         RETURN(rc);
841 }
842
843 /*
844  * No permission check is needed.
845  */
846 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
847                           struct lu_buf *buf)
848 {
849         struct mdd_object *mdd_obj = md2mdd_obj(obj);
850         int rc;
851
852         ENTRY;
853
854         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
855         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
856         mdd_read_unlock(env, mdd_obj);
857
858         RETURN(rc);
859 }
860
861 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
862                                struct mdd_object *c, struct md_attr *ma,
863                                struct thandle *handle,
864                                const struct md_op_spec *spec)
865 {
866         struct lu_attr *attr = &ma->ma_attr;
867         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
868         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
869         const struct dt_index_features *feat = spec->sp_feat;
870         int rc;
871         ENTRY;
872
873         if (!mdd_object_exists(c)) {
874                 struct dt_object *next = mdd_object_child(c);
875                 LASSERT(next);
876
877                 if (feat != &dt_directory_features && feat != NULL)
878                         dof->dof_type = DFT_INDEX;
879                 else
880                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
881
882                 dof->u.dof_idx.di_feat = feat;
883
884                 /* @hint will be initialized by underlying device. */
885                 next->do_ops->do_ah_init(env, hint,
886                                          p ? mdd_object_child(p) : NULL,
887                                          attr->la_mode & S_IFMT);
888
889                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
890                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
891         } else
892                 rc = -EEXIST;
893
894         RETURN(rc);
895 }
896
897 /**
898  * Make sure the ctime is increased only.
899  */
900 static inline int mdd_attr_check(const struct lu_env *env,
901                                  struct mdd_object *obj,
902                                  struct lu_attr *attr)
903 {
904         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
905         int rc;
906         ENTRY;
907
908         if (attr->la_valid & LA_CTIME) {
909                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
910                 if (rc)
911                         RETURN(rc);
912
913                 if (attr->la_ctime < tmp_la->la_ctime)
914                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
915                 else if (attr->la_valid == LA_CTIME &&
916                          attr->la_ctime == tmp_la->la_ctime)
917                         attr->la_valid &= ~LA_CTIME;
918         }
919         RETURN(0);
920 }
921
922 int mdd_attr_set_internal(const struct lu_env *env,
923                           struct mdd_object *obj,
924                           struct lu_attr *attr,
925                           struct thandle *handle,
926                           int needacl)
927 {
928         int rc;
929         ENTRY;
930
931         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
932 #ifdef CONFIG_FS_POSIX_ACL
933         if (!rc && (attr->la_valid & LA_MODE) && needacl)
934                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
935 #endif
936         RETURN(rc);
937 }
938
939 int mdd_attr_check_set_internal(const struct lu_env *env,
940                                 struct mdd_object *obj,
941                                 struct lu_attr *attr,
942                                 struct thandle *handle,
943                                 int needacl)
944 {
945         int rc;
946         ENTRY;
947
948         rc = mdd_attr_check(env, obj, attr);
949         if (rc)
950                 RETURN(rc);
951
952         if (attr->la_valid)
953                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
954         RETURN(rc);
955 }
956
957 static int mdd_attr_set_internal_locked(const struct lu_env *env,
958                                         struct mdd_object *obj,
959                                         struct lu_attr *attr,
960                                         struct thandle *handle,
961                                         int needacl)
962 {
963         int rc;
964         ENTRY;
965
966         needacl = needacl && (attr->la_valid & LA_MODE);
967         if (needacl)
968                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
969         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
970         if (needacl)
971                 mdd_write_unlock(env, obj);
972         RETURN(rc);
973 }
974
975 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
976                                        struct mdd_object *obj,
977                                        struct lu_attr *attr,
978                                        struct thandle *handle,
979                                        int needacl)
980 {
981         int rc;
982         ENTRY;
983
984         needacl = needacl && (attr->la_valid & LA_MODE);
985         if (needacl)
986                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
987         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
988         if (needacl)
989                 mdd_write_unlock(env, obj);
990         RETURN(rc);
991 }
992
993 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
994                     const struct lu_buf *buf, const char *name,
995                     int fl, struct thandle *handle)
996 {
997         struct lustre_capa *capa = mdd_object_capa(env, obj);
998         int rc = -EINVAL;
999         ENTRY;
1000
1001         if (buf->lb_buf && buf->lb_len > 0)
1002                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1003         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1004                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1005
1006         RETURN(rc);
1007 }
1008
1009 /*
1010  * This gives the same functionality as the code between
1011  * sys_chmod and inode_setattr
1012  * chown_common and inode_setattr
1013  * utimes and inode_setattr
1014  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1015  */
1016 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1017                         struct lu_attr *la, const struct md_attr *ma)
1018 {
1019         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1020         struct md_ucred  *uc;
1021         int               rc;
1022         ENTRY;
1023
1024         if (!la->la_valid)
1025                 RETURN(0);
1026
1027         /* Do not permit change file type */
1028         if (la->la_valid & LA_TYPE)
1029                 RETURN(-EPERM);
1030
1031         /* They should not be processed by setattr */
1032         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1033                 RETURN(-EPERM);
1034
1035         /* export destroy does not have ->le_ses, but we may want
1036          * to drop LUSTRE_SOM_FL. */
1037         if (!env->le_ses)
1038                 RETURN(0);
1039
1040         uc = md_ucred(env);
1041
1042         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1043         if (rc)
1044                 RETURN(rc);
1045
1046         if (la->la_valid == LA_CTIME) {
1047                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1048                         /* This is only for set ctime when rename's source is
1049                          * on remote MDS. */
1050                         rc = mdd_may_delete(env, NULL, obj,
1051                                             (struct md_attr *)ma, 1, 0);
1052                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1053                         la->la_valid &= ~LA_CTIME;
1054                 RETURN(rc);
1055         }
1056
1057         if (la->la_valid == LA_ATIME) {
1058                 /* This is atime only set for read atime update on close. */
1059                 if (la->la_atime > tmp_la->la_atime &&
1060                     la->la_atime <= (tmp_la->la_atime +
1061                                      mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1062                         la->la_valid &= ~LA_ATIME;
1063                 RETURN(0);
1064         }
1065
1066         /* Check if flags change. */
1067         if (la->la_valid & LA_FLAGS) {
1068                 unsigned int oldflags = 0;
1069                 unsigned int newflags = la->la_flags &
1070                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1071
1072                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1073                     !mdd_capable(uc, CFS_CAP_FOWNER))
1074                         RETURN(-EPERM);
1075
1076                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1077                  * only be changed by the relevant capability. */
1078                 if (mdd_is_immutable(obj))
1079                         oldflags |= LUSTRE_IMMUTABLE_FL;
1080                 if (mdd_is_append(obj))
1081                         oldflags |= LUSTRE_APPEND_FL;
1082                 if ((oldflags ^ newflags) &&
1083                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1084                         RETURN(-EPERM);
1085
1086                 if (!S_ISDIR(tmp_la->la_mode))
1087                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1088         }
1089
1090         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1091             (la->la_valid & ~LA_FLAGS) &&
1092             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1093                 RETURN(-EPERM);
1094
1095         /* Check for setting the obj time. */
1096         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1097             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1098                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1099                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1100                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1101                                                             MAY_WRITE,
1102                                                             MOR_TGT_CHILD);
1103                         if (rc)
1104                                 RETURN(rc);
1105                 }
1106         }
1107
1108         /* Make sure a caller can chmod. */
1109         if (la->la_valid & LA_MODE) {
1110                 /* Bypass la_vaild == LA_MODE,
1111                  * this is for changing file with SUID or SGID. */
1112                 if ((la->la_valid & ~LA_MODE) &&
1113                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1114                     (uc->mu_fsuid != tmp_la->la_uid) &&
1115                     !mdd_capable(uc, CFS_CAP_FOWNER))
1116                         RETURN(-EPERM);
1117
1118                 if (la->la_mode == (cfs_umode_t) -1)
1119                         la->la_mode = tmp_la->la_mode;
1120                 else
1121                         la->la_mode = (la->la_mode & S_IALLUGO) |
1122                                       (tmp_la->la_mode & ~S_IALLUGO);
1123
1124                 /* Also check the setgid bit! */
1125                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1126                                        la->la_gid : tmp_la->la_gid) &&
1127                     !mdd_capable(uc, CFS_CAP_FSETID))
1128                         la->la_mode &= ~S_ISGID;
1129         } else {
1130                la->la_mode = tmp_la->la_mode;
1131         }
1132
1133         /* Make sure a caller can chown. */
1134         if (la->la_valid & LA_UID) {
1135                 if (la->la_uid == (uid_t) -1)
1136                         la->la_uid = tmp_la->la_uid;
1137                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1138                     (la->la_uid != tmp_la->la_uid)) &&
1139                     !mdd_capable(uc, CFS_CAP_CHOWN))
1140                         RETURN(-EPERM);
1141
1142                 /* If the user or group of a non-directory has been
1143                  * changed by a non-root user, remove the setuid bit.
1144                  * 19981026 David C Niemi <niemi@tux.org>
1145                  *
1146                  * Changed this to apply to all users, including root,
1147                  * to avoid some races. This is the behavior we had in
1148                  * 2.0. The check for non-root was definitely wrong
1149                  * for 2.2 anyway, as it should have been using
1150                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1151                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1152                     !S_ISDIR(tmp_la->la_mode)) {
1153                         la->la_mode &= ~S_ISUID;
1154                         la->la_valid |= LA_MODE;
1155                 }
1156         }
1157
1158         /* Make sure caller can chgrp. */
1159         if (la->la_valid & LA_GID) {
1160                 if (la->la_gid == (gid_t) -1)
1161                         la->la_gid = tmp_la->la_gid;
1162                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1163                     ((la->la_gid != tmp_la->la_gid) &&
1164                     !lustre_in_group_p(uc, la->la_gid))) &&
1165                     !mdd_capable(uc, CFS_CAP_CHOWN))
1166                         RETURN(-EPERM);
1167
1168                 /* Likewise, if the user or group of a non-directory
1169                  * has been changed by a non-root user, remove the
1170                  * setgid bit UNLESS there is no group execute bit
1171                  * (this would be a file marked for mandatory
1172                  * locking).  19981026 David C Niemi <niemi@tux.org>
1173                  *
1174                  * Removed the fsuid check (see the comment above) --
1175                  * 19990830 SD. */
1176                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1177                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1178                         la->la_mode &= ~S_ISGID;
1179                         la->la_valid |= LA_MODE;
1180                 }
1181         }
1182
1183         /* For both Size-on-MDS case and truncate case,
1184          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1185          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1186          * For SOM case, it is true, the MAY_WRITE perm has been checked
1187          * when open, no need check again. For truncate case, it is false,
1188          * the MAY_WRITE perm should be checked here. */
1189         if (ma->ma_attr_flags & MDS_SOM) {
1190                 /* For the "Size-on-MDS" setattr update, merge coming
1191                  * attributes with the set in the inode. BUG 10641 */
1192                 if ((la->la_valid & LA_ATIME) &&
1193                     (la->la_atime <= tmp_la->la_atime))
1194                         la->la_valid &= ~LA_ATIME;
1195
1196                 /* OST attributes do not have a priority over MDS attributes,
1197                  * so drop times if ctime is equal. */
1198                 if ((la->la_valid & LA_CTIME) &&
1199                     (la->la_ctime <= tmp_la->la_ctime))
1200                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1201         } else {
1202                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1203                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1204                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1205                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1206                                 rc = mdd_permission_internal_locked(env, obj,
1207                                                             tmp_la, MAY_WRITE,
1208                                                             MOR_TGT_CHILD);
1209                                 if (rc)
1210                                         RETURN(rc);
1211                         }
1212                 }
1213                 if (la->la_valid & LA_CTIME) {
1214                         /* The pure setattr, it has the priority over what is
1215                          * already set, do not drop it if ctime is equal. */
1216                         if (la->la_ctime < tmp_la->la_ctime)
1217                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1218                                                   LA_CTIME);
1219                 }
1220         }
1221
1222         RETURN(0);
1223 }
1224
1225 /** Store a data change changelog record
1226  * If this fails, we must fail the whole transaction; we don't
1227  * want the change to commit without the log entry.
1228  * \param mdd_obj - mdd_object of change
1229  * \param handle - transacion handle
1230  */
1231 static int mdd_changelog_data_store(const struct lu_env     *env,
1232                                     struct mdd_device       *mdd,
1233                                     enum changelog_rec_type type,
1234                                     int                     flags,
1235                                     struct mdd_object       *mdd_obj,
1236                                     struct thandle          *handle)
1237 {
1238         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1239         struct llog_changelog_rec *rec;
1240         struct lu_buf *buf;
1241         int reclen;
1242         int rc;
1243
1244         /* Not recording */
1245         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1246                 RETURN(0);
1247         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1248                 RETURN(0);
1249
1250         LASSERT(handle != NULL);
1251         LASSERT(mdd_obj != NULL);
1252
1253         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1254             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1255                 /* Don't need multiple updates in this log */
1256                 /* Don't check under lock - no big deal if we get an extra
1257                    entry */
1258                 RETURN(0);
1259         }
1260
1261         reclen = llog_data_len(sizeof(*rec));
1262         buf = mdd_buf_alloc(env, reclen);
1263         if (buf->lb_buf == NULL)
1264                 RETURN(-ENOMEM);
1265         rec = (struct llog_changelog_rec *)buf->lb_buf;
1266
1267         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1268         rec->cr.cr_type = (__u32)type;
1269         rec->cr.cr_tfid = *tfid;
1270         rec->cr.cr_namelen = 0;
1271         mdd_obj->mod_cltime = cfs_time_current_64();
1272
1273         rc = mdd_changelog_llog_write(mdd, rec, handle);
1274         if (rc < 0) {
1275                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1276                        rc, type, PFID(tfid));
1277                 return -EFAULT;
1278         }
1279
1280         return 0;
1281 }
1282
1283 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1284                   int flags, struct md_object *obj)
1285 {
1286         struct thandle *handle;
1287         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1288         struct mdd_device *mdd = mdo2mdd(obj);
1289         int rc;
1290         ENTRY;
1291
1292         handle = mdd_trans_start(env, mdd);
1293
1294         if (IS_ERR(handle))
1295                 return(PTR_ERR(handle));
1296
1297         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1298                                       handle);
1299
1300         mdd_trans_stop(env, mdd, rc, handle);
1301
1302         RETURN(rc);
1303 }
1304
1305 /**
1306  * Should be called with write lock held.
1307  *
1308  * \see mdd_lma_set_locked().
1309  */
1310 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1311                        const struct md_attr *ma, struct thandle *handle)
1312 {
1313         struct mdd_thread_info *info = mdd_env_info(env);
1314         struct lu_buf *buf;
1315         struct lustre_mdt_attrs *lma =
1316                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1317         int lmasize = sizeof(struct lustre_mdt_attrs);
1318         int rc = 0;
1319
1320         ENTRY;
1321
1322         /* Either HSM or SOM part is not valid, we need to read it before */
1323         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1324                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1325                 if (rc <= 0)
1326                         RETURN(rc);
1327
1328                 lustre_lma_swab(lma);
1329         } else {
1330                 memset(lma, 0, lmasize);
1331         }
1332
1333         /* Copy HSM data */
1334         if (ma->ma_valid & MA_HSM) {
1335                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1336                 lma->lma_compat |= LMAC_HSM;
1337         }
1338
1339         /* Copy SOM data */
1340         if (ma->ma_valid & MA_SOM) {
1341                 LASSERT(ma->ma_som != NULL);
1342                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1343                         lma->lma_compat     &= ~LMAC_SOM;
1344                 } else {
1345                         lma->lma_compat     |= LMAC_SOM;
1346                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1347                         lma->lma_som_size    = ma->ma_som->msd_size;
1348                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1349                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1350                 }
1351         }
1352
1353         /* Copy FID */
1354         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1355
1356         lustre_lma_swab(lma);
1357         buf = mdd_buf_get(env, lma, lmasize);
1358         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1359
1360         RETURN(rc);
1361 }
1362
1363 /**
1364  * Save LMA extended attributes with data from \a ma.
1365  *
1366  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1367  * not, LMA EA will be first read from disk, modified and write back.
1368  *
1369  */
1370 static int mdd_lma_set_locked(const struct lu_env *env,
1371                               struct mdd_object *mdd_obj,
1372                               const struct md_attr *ma, struct thandle *handle)
1373 {
1374         int rc;
1375
1376         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1377         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1378         mdd_write_unlock(env, mdd_obj);
1379         return rc;
1380 }
1381
1382 /* Precedence for choosing record type when multiple
1383  * attributes change: setattr > mtime > ctime > atime
1384  * (ctime changes when mtime does, plus chmod/chown.
1385  * atime and ctime are independent.) */
1386 static int mdd_attr_set_changelog(const struct lu_env *env,
1387                                   struct md_object *obj, struct thandle *handle,
1388                                   __u64 valid)
1389 {
1390         struct mdd_device *mdd = mdo2mdd(obj);
1391         int bits, type = 0;
1392
1393         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1394         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1395         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1396         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1397         bits = bits & mdd->mdd_cl.mc_mask;
1398         if (bits == 0)
1399                 return 0;
1400
1401         /* The record type is the lowest non-masked set bit */
1402         while (bits && ((bits & 1) == 0)) {
1403                 bits = bits >> 1;
1404                 type++;
1405         }
1406
1407         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1408         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1409                                         md2mdd_obj(obj), handle);
1410 }
1411
1412 /* set attr and LOV EA at once, return updated attr */
1413 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1414                         const struct md_attr *ma)
1415 {
1416         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1417         struct mdd_device *mdd = mdo2mdd(obj);
1418         struct thandle *handle;
1419         struct lov_mds_md *lmm = NULL;
1420         struct llog_cookie *logcookies = NULL;
1421         int  rc, lmm_size = 0, cookie_size = 0;
1422         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1423 #ifdef HAVE_QUOTA_SUPPORT
1424         struct obd_device *obd = mdd->mdd_obd_dev;
1425         struct mds_obd *mds = &obd->u.mds;
1426         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1427         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1428         int quota_opc = 0, block_count = 0;
1429         int inode_pending[MAXQUOTAS] = { 0, 0 };
1430         int block_pending[MAXQUOTAS] = { 0, 0 };
1431 #endif
1432         ENTRY;
1433
1434         *la_copy = ma->ma_attr;
1435         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1436         if (rc != 0)
1437                 RETURN(rc);
1438
1439         /* setattr on "close" only change atime, or do nothing */
1440         if (ma->ma_valid == MA_INODE &&
1441             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1442                 RETURN(0);
1443
1444         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1445                                     MDD_TXN_ATTR_SET_OP);
1446         handle = mdd_trans_start(env, mdd);
1447         if (IS_ERR(handle))
1448                 RETURN(PTR_ERR(handle));
1449         /*TODO: add lock here*/
1450         /* start a log jounal handle if needed */
1451         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1452             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1453                 lmm_size = mdd_lov_mdsize(env, mdd);
1454                 lmm = mdd_max_lmm_get(env, mdd);
1455                 if (lmm == NULL)
1456                         GOTO(cleanup, rc = -ENOMEM);
1457
1458                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1459                                 XATTR_NAME_LOV);
1460
1461                 if (rc < 0)
1462                         GOTO(cleanup, rc);
1463         }
1464
1465         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1466                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1467                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1468
1469 #ifdef HAVE_QUOTA_SUPPORT
1470         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1471                 struct obd_export *exp = md_quota(env)->mq_exp;
1472                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1473
1474                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1475                 if (!rc) {
1476                         quota_opc = FSFILT_OP_SETATTR;
1477                         mdd_quota_wrapper(la_copy, qnids);
1478                         mdd_quota_wrapper(la_tmp, qoids);
1479                         /* get file quota for new owner */
1480                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1481                                         qnids, inode_pending, 1, NULL, 0,
1482                                         NULL, 0);
1483                         block_count = (la_tmp->la_blocks + 7) >> 3;
1484                         if (block_count) {
1485                                 void *data = NULL;
1486                                 mdd_data_get(env, mdd_obj, &data);
1487                                 /* get block quota for new owner */
1488                                 lquota_chkquota(mds_quota_interface_ref, obd,
1489                                                 exp, qnids, block_pending,
1490                                                 block_count, NULL,
1491                                                 LQUOTA_FLAGS_BLK, data, 1);
1492                         }
1493                 }
1494         }
1495 #endif
1496
1497         if (la_copy->la_valid & LA_FLAGS) {
1498                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1499                                                   handle, 1);
1500                 if (rc == 0)
1501                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1502         } else if (la_copy->la_valid) {            /* setattr */
1503                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1504                                                   handle, 1);
1505                 /* journal chown/chgrp in llog, just like unlink */
1506                 if (rc == 0 && lmm_size){
1507                         cookie_size = mdd_lov_cookiesize(env, mdd);
1508                         logcookies = mdd_max_cookie_get(env, mdd);
1509                         if (logcookies == NULL)
1510                                 GOTO(cleanup, rc = -ENOMEM);
1511
1512                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1513                                             logcookies, cookie_size) <= 0)
1514                                 logcookies = NULL;
1515                 }
1516         }
1517
1518         if (rc == 0 && ma->ma_valid & MA_LOV) {
1519                 cfs_umode_t mode;
1520
1521                 mode = mdd_object_type(mdd_obj);
1522                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1523                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1524                         if (rc)
1525                                 GOTO(cleanup, rc);
1526
1527                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1528                                             ma->ma_lmm_size, handle, 1);
1529                 }
1530
1531         }
1532         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1533                 cfs_umode_t mode;
1534
1535                 mode = mdd_object_type(mdd_obj);
1536                 if (S_ISREG(mode))
1537                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1538
1539         }
1540 cleanup:
1541         if (rc == 0)
1542                 rc = mdd_attr_set_changelog(env, obj, handle,
1543                                             ma->ma_attr.la_valid);
1544         mdd_trans_stop(env, mdd, rc, handle);
1545         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1546                 /*set obd attr, if needed*/
1547                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1548                                            logcookies);
1549         }
1550 #ifdef HAVE_QUOTA_SUPPORT
1551         if (quota_opc) {
1552                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1553                                       inode_pending, 0);
1554                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1555                                       block_pending, 1);
1556                 /* Trigger dqrel/dqacq for original owner and new owner.
1557                  * If failed, the next call for lquota_chkquota will
1558                  * process it. */
1559                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1560                               quota_opc);
1561         }
1562 #endif
1563         RETURN(rc);
1564 }
1565
1566 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1567                       const struct lu_buf *buf, const char *name, int fl,
1568                       struct thandle *handle)
1569 {
1570         int  rc;
1571         ENTRY;
1572
1573         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1574         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1575         mdd_write_unlock(env, obj);
1576
1577         RETURN(rc);
1578 }
1579
1580 static int mdd_xattr_sanity_check(const struct lu_env *env,
1581                                   struct mdd_object *obj)
1582 {
1583         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1584         struct md_ucred *uc     = md_ucred(env);
1585         int rc;
1586         ENTRY;
1587
1588         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1589                 RETURN(-EPERM);
1590
1591         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1592         if (rc)
1593                 RETURN(rc);
1594
1595         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1596             !mdd_capable(uc, CFS_CAP_FOWNER))
1597                 RETURN(-EPERM);
1598
1599         RETURN(rc);
1600 }
1601
1602 /**
1603  * The caller should guarantee to update the object ctime
1604  * after xattr_set if needed.
1605  */
1606 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1607                          const struct lu_buf *buf, const char *name,
1608                          int fl)
1609 {
1610         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1611         struct mdd_device *mdd = mdo2mdd(obj);
1612         struct thandle *handle;
1613         int  rc;
1614         ENTRY;
1615
1616         rc = mdd_xattr_sanity_check(env, mdd_obj);
1617         if (rc)
1618                 RETURN(rc);
1619
1620         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1621         /* security-replated changes may require sync */
1622         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1623             mdd->mdd_sync_permission == 1)
1624                 txn_param_sync(&mdd_env_info(env)->mti_param);
1625
1626         handle = mdd_trans_start(env, mdd);
1627         if (IS_ERR(handle))
1628                 RETURN(PTR_ERR(handle));
1629
1630         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1631
1632         /* Only record user xattr changes */
1633         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1634                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1635                                               handle);
1636         mdd_trans_stop(env, mdd, rc, handle);
1637
1638         RETURN(rc);
1639 }
1640
1641 /**
1642  * The caller should guarantee to update the object ctime
1643  * after xattr_set if needed.
1644  */
1645 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1646                   const char *name)
1647 {
1648         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1649         struct mdd_device *mdd = mdo2mdd(obj);
1650         struct thandle *handle;
1651         int  rc;
1652         ENTRY;
1653
1654         rc = mdd_xattr_sanity_check(env, mdd_obj);
1655         if (rc)
1656                 RETURN(rc);
1657
1658         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1659         handle = mdd_trans_start(env, mdd);
1660         if (IS_ERR(handle))
1661                 RETURN(PTR_ERR(handle));
1662
1663         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1664         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1665                            mdd_object_capa(env, mdd_obj));
1666         mdd_write_unlock(env, mdd_obj);
1667
1668         /* Only record user xattr changes */
1669         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1670                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1671                                               handle);
1672
1673         mdd_trans_stop(env, mdd, rc, handle);
1674
1675         RETURN(rc);
1676 }
1677
1678 /* partial unlink */
1679 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1680                        struct md_attr *ma)
1681 {
1682         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1683         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1684         struct mdd_device *mdd = mdo2mdd(obj);
1685         struct thandle *handle;
1686 #ifdef HAVE_QUOTA_SUPPORT
1687         struct obd_device *obd = mdd->mdd_obd_dev;
1688         struct mds_obd *mds = &obd->u.mds;
1689         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1690         int quota_opc = 0;
1691 #endif
1692         int rc;
1693         ENTRY;
1694
1695         /*
1696          * Check -ENOENT early here because we need to get object type
1697          * to calculate credits before transaction start
1698          */
1699         if (!mdd_object_exists(mdd_obj))
1700                 RETURN(-ENOENT);
1701
1702         LASSERT(mdd_object_exists(mdd_obj) > 0);
1703
1704         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1705         if (rc)
1706                 RETURN(rc);
1707
1708         handle = mdd_trans_start(env, mdd);
1709         if (IS_ERR(handle))
1710                 RETURN(-ENOMEM);
1711
1712         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1713
1714         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1715         if (rc)
1716                 GOTO(cleanup, rc);
1717
1718         __mdd_ref_del(env, mdd_obj, handle, 0);
1719
1720         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1721                 /* unlink dot */
1722                 __mdd_ref_del(env, mdd_obj, handle, 1);
1723         }
1724
1725         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1726         la_copy->la_ctime = ma->ma_attr.la_ctime;
1727
1728         la_copy->la_valid = LA_CTIME;
1729         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1730         if (rc)
1731                 GOTO(cleanup, rc);
1732
1733         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1734 #ifdef HAVE_QUOTA_SUPPORT
1735         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1736             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1737                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1738                 mdd_quota_wrapper(&ma->ma_attr, qids);
1739         }
1740 #endif
1741
1742
1743         EXIT;
1744 cleanup:
1745         mdd_write_unlock(env, mdd_obj);
1746         mdd_trans_stop(env, mdd, rc, handle);
1747 #ifdef HAVE_QUOTA_SUPPORT
1748         if (quota_opc)
1749                 /* Trigger dqrel on the owner of child. If failed,
1750                  * the next call for lquota_chkquota will process it */
1751                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1752                               quota_opc);
1753 #endif
1754         return rc;
1755 }
1756
1757 /* partial operation */
1758 static int mdd_oc_sanity_check(const struct lu_env *env,
1759                                struct mdd_object *obj,
1760                                struct md_attr *ma)
1761 {
1762         int rc;
1763         ENTRY;
1764
1765         switch (ma->ma_attr.la_mode & S_IFMT) {
1766         case S_IFREG:
1767         case S_IFDIR:
1768         case S_IFLNK:
1769         case S_IFCHR:
1770         case S_IFBLK:
1771         case S_IFIFO:
1772         case S_IFSOCK:
1773                 rc = 0;
1774                 break;
1775         default:
1776                 rc = -EINVAL;
1777                 break;
1778         }
1779         RETURN(rc);
1780 }
1781
1782 static int mdd_object_create(const struct lu_env *env,
1783                              struct md_object *obj,
1784                              const struct md_op_spec *spec,
1785                              struct md_attr *ma)
1786 {
1787
1788         struct mdd_device *mdd = mdo2mdd(obj);
1789         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1790         const struct lu_fid *pfid = spec->u.sp_pfid;
1791         struct thandle *handle;
1792 #ifdef HAVE_QUOTA_SUPPORT
1793         struct obd_device *obd = mdd->mdd_obd_dev;
1794         struct obd_export *exp = md_quota(env)->mq_exp;
1795         struct mds_obd *mds = &obd->u.mds;
1796         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1797         int quota_opc = 0, block_count = 0;
1798         int inode_pending[MAXQUOTAS] = { 0, 0 };
1799         int block_pending[MAXQUOTAS] = { 0, 0 };
1800 #endif
1801         int rc = 0;
1802         ENTRY;
1803
1804 #ifdef HAVE_QUOTA_SUPPORT
1805         if (mds->mds_quota) {
1806                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1807                 mdd_quota_wrapper(&ma->ma_attr, qids);
1808                 /* get file quota for child */
1809                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1810                                 qids, inode_pending, 1, NULL, 0,
1811                                 NULL, 0);
1812                 switch (ma->ma_attr.la_mode & S_IFMT) {
1813                 case S_IFLNK:
1814                 case S_IFDIR:
1815                         block_count = 2;
1816                         break;
1817                 case S_IFREG:
1818                         block_count = 1;
1819                         break;
1820                 }
1821                 /* get block quota for child */
1822                 if (block_count)
1823                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1824                                         qids, block_pending, block_count,
1825                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1826         }
1827 #endif
1828
1829         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1830         handle = mdd_trans_start(env, mdd);
1831         if (IS_ERR(handle))
1832                 GOTO(out_pending, rc = PTR_ERR(handle));
1833
1834         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1835         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1836         if (rc)
1837                 GOTO(unlock, rc);
1838
1839         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1840         if (rc)
1841                 GOTO(unlock, rc);
1842
1843         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1844                 /* If creating the slave object, set slave EA here. */
1845                 int lmv_size = spec->u.sp_ea.eadatalen;
1846                 struct lmv_stripe_md *lmv;
1847
1848                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1849                 LASSERT(lmv != NULL && lmv_size > 0);
1850
1851                 rc = __mdd_xattr_set(env, mdd_obj,
1852                                      mdd_buf_get_const(env, lmv, lmv_size),
1853                                      XATTR_NAME_LMV, 0, handle);
1854                 if (rc)
1855                         GOTO(unlock, rc);
1856
1857                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1858                                            handle, 0);
1859         } else {
1860 #ifdef CONFIG_FS_POSIX_ACL
1861                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1862                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1863
1864                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1865                         buf->lb_len = spec->u.sp_ea.eadatalen;
1866                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1867                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1868                                                     &ma->ma_attr.la_mode,
1869                                                     handle);
1870                                 if (rc)
1871                                         GOTO(unlock, rc);
1872                                 else
1873                                         ma->ma_attr.la_valid |= LA_MODE;
1874                         }
1875
1876                         pfid = spec->u.sp_ea.fid;
1877                 }
1878 #endif
1879                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1880                                            spec);
1881         }
1882         EXIT;
1883 unlock:
1884         if (rc == 0)
1885                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1886         mdd_write_unlock(env, mdd_obj);
1887
1888         mdd_trans_stop(env, mdd, rc, handle);
1889 out_pending:
1890 #ifdef HAVE_QUOTA_SUPPORT
1891         if (quota_opc) {
1892                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1893                                       inode_pending, 0);
1894                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1895                                       block_pending, 1);
1896                 /* Trigger dqacq on the owner of child. If failed,
1897                  * the next call for lquota_chkquota will process it. */
1898                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1899                               quota_opc);
1900         }
1901 #endif
1902         return rc;
1903 }
1904
1905 /* partial link */
1906 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1907                        const struct md_attr *ma)
1908 {
1909         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1910         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1911         struct mdd_device *mdd = mdo2mdd(obj);
1912         struct thandle *handle;
1913         int rc;
1914         ENTRY;
1915
1916         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1917         handle = mdd_trans_start(env, mdd);
1918         if (IS_ERR(handle))
1919                 RETURN(-ENOMEM);
1920
1921         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1922         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1923         if (rc == 0)
1924                 __mdd_ref_add(env, mdd_obj, handle);
1925         mdd_write_unlock(env, mdd_obj);
1926         if (rc == 0) {
1927                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1928                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1929
1930                 la_copy->la_valid = LA_CTIME;
1931                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1932                                                         handle, 0);
1933         }
1934         mdd_trans_stop(env, mdd, 0, handle);
1935
1936         RETURN(rc);
1937 }
1938
1939 /*
1940  * do NOT or the MAY_*'s, you'll get the weakest
1941  */
1942 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1943 {
1944         int res = 0;
1945
1946         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1947          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1948          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1949          * owner can write to a file even if it is marked readonly to hide
1950          * its brokenness. (bug 5781) */
1951         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1952                 struct md_ucred *uc = md_ucred(env);
1953
1954                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1955                     (la->la_uid == uc->mu_fsuid))
1956                         return 0;
1957         }
1958
1959         if (flags & FMODE_READ)
1960                 res |= MAY_READ;
1961         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1962                 res |= MAY_WRITE;
1963         if (flags & MDS_FMODE_EXEC)
1964                 res = MAY_EXEC;
1965         return res;
1966 }
1967
1968 static int mdd_open_sanity_check(const struct lu_env *env,
1969                                  struct mdd_object *obj, int flag)
1970 {
1971         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1972         int mode, rc;
1973         ENTRY;
1974
1975         /* EEXIST check */
1976         if (mdd_is_dead_obj(obj))
1977                 RETURN(-ENOENT);
1978
1979         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1980         if (rc)
1981                RETURN(rc);
1982
1983         if (S_ISLNK(tmp_la->la_mode))
1984                 RETURN(-ELOOP);
1985
1986         mode = accmode(env, tmp_la, flag);
1987
1988         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1989                 RETURN(-EISDIR);
1990
1991         if (!(flag & MDS_OPEN_CREATED)) {
1992                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1993                 if (rc)
1994                         RETURN(rc);
1995         }
1996
1997         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1998             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1999                 flag &= ~MDS_OPEN_TRUNC;
2000
2001         /* For writing append-only file must open it with append mode. */
2002         if (mdd_is_append(obj)) {
2003                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2004                         RETURN(-EPERM);
2005                 if (flag & MDS_OPEN_TRUNC)
2006                         RETURN(-EPERM);
2007         }
2008
2009 #if 0
2010         /*
2011          * Now, flag -- O_NOATIME does not be packed by client.
2012          */
2013         if (flag & O_NOATIME) {
2014                 struct md_ucred *uc = md_ucred(env);
2015
2016                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2017                     (uc->mu_valid == UCRED_NEW)) &&
2018                     (uc->mu_fsuid != tmp_la->la_uid) &&
2019                     !mdd_capable(uc, CFS_CAP_FOWNER))
2020                         RETURN(-EPERM);
2021         }
2022 #endif
2023
2024         RETURN(0);
2025 }
2026
2027 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2028                     int flags)
2029 {
2030         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2031         int rc = 0;
2032
2033         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2034
2035         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2036         if (rc == 0)
2037                 mdd_obj->mod_count++;
2038
2039         mdd_write_unlock(env, mdd_obj);
2040         return rc;
2041 }
2042
2043 /* return md_attr back,
2044  * if it is last unlink then return lov ea + llog cookie*/
2045 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2046                     struct md_attr *ma)
2047 {
2048         int rc = 0;
2049         ENTRY;
2050
2051         if (S_ISREG(mdd_object_type(obj))) {
2052                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2053                  * Caller must be ready for that. */
2054
2055                 rc = __mdd_lmm_get(env, obj, ma);
2056                 if ((ma->ma_valid & MA_LOV))
2057                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2058                                             obj, ma);
2059         }
2060         RETURN(rc);
2061 }
2062
2063 /*
2064  * No permission check is needed.
2065  */
2066 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2067                      struct md_attr *ma)
2068 {
2069         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2070         struct mdd_device *mdd = mdo2mdd(obj);
2071         struct thandle    *handle = NULL;
2072         int rc;
2073         int reset = 1;
2074
2075 #ifdef HAVE_QUOTA_SUPPORT
2076         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2077         struct mds_obd *mds = &obd->u.mds;
2078         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2079         int quota_opc = 0;
2080 #endif
2081         ENTRY;
2082
2083         /* check without any lock */
2084         if (mdd_obj->mod_count == 1 &&
2085             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2086  again:
2087                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2088                 if (rc)
2089                         RETURN(rc);
2090                 handle = mdd_trans_start(env, mdo2mdd(obj));
2091                 if (IS_ERR(handle))
2092                         RETURN(PTR_ERR(handle));
2093         }
2094
2095         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2096         if (handle == NULL &&
2097             mdd_obj->mod_count == 1 &&
2098             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2099                 mdd_write_unlock(env, mdd_obj);
2100                 goto again;
2101         }
2102
2103         /* release open count */
2104         mdd_obj->mod_count --;
2105
2106         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2107                 /* remove link to object from orphan index */
2108                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2109                 if (rc == 0) {
2110                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2111                                "list, OSS objects to be destroyed.\n",
2112                                PFID(mdd_object_fid(mdd_obj)));
2113                 } else {
2114                         CERROR("Object "DFID" can not be deleted from orphan "
2115                                 "list, maybe cause OST objects can not be "
2116                                 "destroyed (err: %d).\n",
2117                                 PFID(mdd_object_fid(mdd_obj)), rc);
2118                         /* If object was not deleted from orphan list, do not
2119                          * destroy OSS objects, which will be done when next
2120                          * recovery. */
2121                         GOTO(out, rc);
2122                 }
2123         }
2124
2125         rc = mdd_iattr_get(env, mdd_obj, ma);
2126         /* Object maybe not in orphan list originally, it is rare case for
2127          * mdd_finish_unlink() failure. */
2128         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2129 #ifdef HAVE_QUOTA_SUPPORT
2130                 if (mds->mds_quota) {
2131                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2132                         mdd_quota_wrapper(&ma->ma_attr, qids);
2133                 }
2134 #endif
2135                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2136                 if (ma->ma_valid & MA_FLAGS &&
2137                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2138                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2139                 } else {
2140                         rc = mdd_object_kill(env, mdd_obj, ma);
2141                                 if (rc == 0)
2142                                         reset = 0;
2143                 }
2144
2145                 if (rc != 0)
2146                         CERROR("Error when prepare to delete Object "DFID" , "
2147                                "which will cause OST objects can not be "
2148                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2149         }
2150         EXIT;
2151
2152 out:
2153         if (reset)
2154                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2155
2156         mdd_write_unlock(env, mdd_obj);
2157         if (handle != NULL)
2158                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2159 #ifdef HAVE_QUOTA_SUPPORT
2160         if (quota_opc)
2161                 /* Trigger dqrel on the owner of child. If failed,
2162                  * the next call for lquota_chkquota will process it */
2163                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2164                               quota_opc);
2165 #endif
2166         return rc;
2167 }
2168
2169 /*
2170  * Permission check is done when open,
2171  * no need check again.
2172  */
2173 static int mdd_readpage_sanity_check(const struct lu_env *env,
2174                                      struct mdd_object *obj)
2175 {
2176         struct dt_object *next = mdd_object_child(obj);
2177         int rc;
2178         ENTRY;
2179
2180         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2181                 rc = 0;
2182         else
2183                 rc = -ENOTDIR;
2184
2185         RETURN(rc);
2186 }
2187
2188 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2189                               int first, void *area, int nob,
2190                               const struct dt_it_ops *iops, struct dt_it *it,
2191                               __u64 *start, __u64 *end,
2192                               struct lu_dirent **last, __u32 attr)
2193 {
2194         int                     result;
2195         __u64                   hash = 0;
2196         struct lu_dirent       *ent;
2197
2198         if (first) {
2199                 memset(area, 0, sizeof (struct lu_dirpage));
2200                 area += sizeof (struct lu_dirpage);
2201                 nob  -= sizeof (struct lu_dirpage);
2202         }
2203
2204         ent  = area;
2205         do {
2206                 int    len;
2207                 int    recsize;
2208
2209                 len  = iops->key_size(env, it);
2210
2211                 /* IAM iterator can return record with zero len. */
2212                 if (len == 0)
2213                         goto next;
2214
2215                 hash = iops->store(env, it);
2216                 if (unlikely(first)) {
2217                         first = 0;
2218                         *start = hash;
2219                 }
2220
2221                 /* calculate max space required for lu_dirent */
2222                 recsize = lu_dirent_calc_size(len, attr);
2223
2224                 if (nob >= recsize) {
2225                         result = iops->rec(env, it, ent, attr);
2226                         if (result == -ESTALE)
2227                                 goto next;
2228                         if (result != 0)
2229                                 goto out;
2230
2231                         /* osd might not able to pack all attributes,
2232                          * so recheck rec length */
2233                         recsize = le16_to_cpu(ent->lde_reclen);
2234                 } else {
2235                         /*
2236                          * record doesn't fit into page, enlarge previous one.
2237                          */
2238                         if (*last) {
2239                                 (*last)->lde_reclen =
2240                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2241                                                         nob);
2242                                 result = 0;
2243                         } else
2244                                 result = -EINVAL;
2245
2246                         goto out;
2247                 }
2248                 *last = ent;
2249                 ent = (void *)ent + recsize;
2250                 nob -= recsize;
2251
2252 next:
2253                 result = iops->next(env, it);
2254                 if (result == -ESTALE)
2255                         goto next;
2256         } while (result == 0);
2257
2258 out:
2259         *end = hash;
2260         return result;
2261 }
2262
2263 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2264                           const struct lu_rdpg *rdpg)
2265 {
2266         struct dt_it      *it;
2267         struct dt_object  *next = mdd_object_child(obj);
2268         const struct dt_it_ops  *iops;
2269         struct page       *pg;
2270         struct lu_dirent  *last = NULL;
2271         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2272         int i;
2273         int rc;
2274         int nob;
2275         __u64 hash_start;
2276         __u64 hash_end = 0;
2277
2278         LASSERT(rdpg->rp_pages != NULL);
2279         LASSERT(next->do_index_ops != NULL);
2280
2281         if (rdpg->rp_count <= 0)
2282                 return -EFAULT;
2283
2284         /*
2285          * iterate through directory and fill pages from @rdpg
2286          */
2287         iops = &next->do_index_ops->dio_it;
2288         it = iops->init(env, next, mdd_object_capa(env, obj));
2289         if (IS_ERR(it))
2290                 return PTR_ERR(it);
2291
2292         rc = iops->load(env, it, rdpg->rp_hash);
2293
2294         if (rc == 0){
2295                 /*
2296                  * Iterator didn't find record with exactly the key requested.
2297                  *
2298                  * It is currently either
2299                  *
2300                  *     - positioned above record with key less than
2301                  *     requested---skip it.
2302                  *
2303                  *     - or not positioned at all (is in IAM_IT_SKEWED
2304                  *     state)---position it on the next item.
2305                  */
2306                 rc = iops->next(env, it);
2307         } else if (rc > 0)
2308                 rc = 0;
2309
2310         /*
2311          * At this point and across for-loop:
2312          *
2313          *  rc == 0 -> ok, proceed.
2314          *  rc >  0 -> end of directory.
2315          *  rc <  0 -> error.
2316          */
2317         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2318              i++, nob -= CFS_PAGE_SIZE) {
2319                 LASSERT(i < rdpg->rp_npages);
2320                 pg = rdpg->rp_pages[i];
2321                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2322                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2323                                         it, &hash_start, &hash_end, &last,
2324                                         rdpg->rp_attrs);
2325                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2326                         if (last)
2327                                 last->lde_reclen = 0;
2328                 }
2329                 cfs_kunmap(pg);
2330         }
2331         if (rc > 0) {
2332                 /*
2333                  * end of directory.
2334                  */
2335                 hash_end = DIR_END_OFF;
2336                 rc = 0;
2337         }
2338         if (rc == 0) {
2339                 struct lu_dirpage *dp;
2340
2341                 dp = cfs_kmap(rdpg->rp_pages[0]);
2342                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2343                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2344                 if (i == 0)
2345                         /*
2346                          * No pages were processed, mark this.
2347                          */
2348                         dp->ldp_flags |= LDF_EMPTY;
2349
2350                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2351                 cfs_kunmap(rdpg->rp_pages[0]);
2352         }
2353         iops->put(env, it);
2354         iops->fini(env, it);
2355
2356         return rc;
2357 }
2358
2359 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2360                  const struct lu_rdpg *rdpg)
2361 {
2362         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2363         int rc;
2364         ENTRY;
2365
2366         LASSERT(mdd_object_exists(mdd_obj));
2367
2368         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2369         rc = mdd_readpage_sanity_check(env, mdd_obj);
2370         if (rc)
2371                 GOTO(out_unlock, rc);
2372
2373         if (mdd_is_dead_obj(mdd_obj)) {
2374                 struct page *pg;
2375                 struct lu_dirpage *dp;
2376
2377                 /*
2378                  * According to POSIX, please do not return any entry to client:
2379                  * even dot and dotdot should not be returned.
2380                  */
2381                 CWARN("readdir from dead object: "DFID"\n",
2382                         PFID(mdd_object_fid(mdd_obj)));
2383
2384                 if (rdpg->rp_count <= 0)
2385                         GOTO(out_unlock, rc = -EFAULT);
2386                 LASSERT(rdpg->rp_pages != NULL);
2387
2388                 pg = rdpg->rp_pages[0];
2389                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2390                 memset(dp, 0 , sizeof(struct lu_dirpage));
2391                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2392                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2393                 dp->ldp_flags |= LDF_EMPTY;
2394                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2395                 cfs_kunmap(pg);
2396                 GOTO(out_unlock, rc = 0);
2397         }
2398
2399         rc = __mdd_readpage(env, mdd_obj, rdpg);
2400
2401         EXIT;
2402 out_unlock:
2403         mdd_read_unlock(env, mdd_obj);
2404         return rc;
2405 }
2406
2407 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2408 {
2409         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2410         struct dt_object *next;
2411
2412         LASSERT(mdd_object_exists(mdd_obj));
2413         next = mdd_object_child(mdd_obj);
2414         return next->do_ops->do_object_sync(env, next);
2415 }
2416
2417 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2418                                         struct md_object *obj)
2419 {
2420         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2421
2422         LASSERT(mdd_object_exists(mdd_obj));
2423         return do_version_get(env, mdd_object_child(mdd_obj));
2424 }
2425
2426 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2427                             dt_obj_version_t version)
2428 {
2429         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2430
2431         LASSERT(mdd_object_exists(mdd_obj));
2432         do_version_set(env, mdd_object_child(mdd_obj), version);
2433 }
2434
2435 const struct md_object_operations mdd_obj_ops = {
2436         .moo_permission    = mdd_permission,
2437         .moo_attr_get      = mdd_attr_get,
2438         .moo_attr_set      = mdd_attr_set,
2439         .moo_xattr_get     = mdd_xattr_get,
2440         .moo_xattr_set     = mdd_xattr_set,
2441         .moo_xattr_list    = mdd_xattr_list,
2442         .moo_xattr_del     = mdd_xattr_del,
2443         .moo_object_create = mdd_object_create,
2444         .moo_ref_add       = mdd_ref_add,
2445         .moo_ref_del       = mdd_ref_del,
2446         .moo_open          = mdd_open,
2447         .moo_close         = mdd_close,
2448         .moo_readpage      = mdd_readpage,
2449         .moo_readlink      = mdd_readlink,
2450         .moo_changelog     = mdd_changelog,
2451         .moo_capa_get      = mdd_capa_get,
2452         .moo_object_sync   = mdd_object_sync,
2453         .moo_version_get   = mdd_version_get,
2454         .moo_version_set   = mdd_version_set,
2455         .moo_path          = mdd_path,
2456         .moo_file_lock     = mdd_file_lock,
2457         .moo_file_unlock   = mdd_file_unlock,
2458 };