Whamcloud - gitweb
0b12f777a9f5b8f2ec8aa3e50af85815d9a79903
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
130         buf->lb_buf = NULL;
131         buf->lb_len = 0;
132 }
133
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135                                        const void *area, ssize_t len)
136 {
137         struct lu_buf *buf;
138
139         buf = &mdd_env_info(env)->mti_buf;
140         buf->lb_buf = (void *)area;
141         buf->lb_len = len;
142         return buf;
143 }
144
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
146 {
147         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
148
149         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
151                 buf->lb_buf = NULL;
152         }
153         if (buf->lb_buf == NULL) {
154                 buf->lb_len = len;
155                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156                 if (buf->lb_buf == NULL)
157                         buf->lb_len = 0;
158         }
159         return buf;
160 }
161
162 /** Increase the size of the \a mti_big_buf.
163  * preserves old data in buffer
164  * old buffer remains unchanged on error
165  * \retval 0 or -ENOMEM
166  */
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
168 {
169         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
170         struct lu_buf buf;
171
172         LASSERT(len >= oldbuf->lb_len);
173         OBD_ALLOC_LARGE(buf.lb_buf, len);
174
175         if (buf.lb_buf == NULL)
176                 return -ENOMEM;
177
178         buf.lb_len = len;
179         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
180
181         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
182
183         memcpy(oldbuf, &buf, sizeof(buf));
184
185         return 0;
186 }
187
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189                                        struct mdd_device *mdd)
190 {
191         struct mdd_thread_info *mti = mdd_env_info(env);
192         int                     max_cookie_size;
193
194         max_cookie_size = mdd_lov_cookiesize(env, mdd);
195         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196                 if (mti->mti_max_cookie)
197                         OBD_FREE_LARGE(mti->mti_max_cookie,
198                                        mti->mti_max_cookie_size);
199                 mti->mti_max_cookie = NULL;
200                 mti->mti_max_cookie_size = 0;
201         }
202         if (unlikely(mti->mti_max_cookie == NULL)) {
203                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204                 if (likely(mti->mti_max_cookie != NULL))
205                         mti->mti_max_cookie_size = max_cookie_size;
206         }
207         if (likely(mti->mti_max_cookie != NULL))
208                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209         return mti->mti_max_cookie;
210 }
211
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213                                    struct mdd_device *mdd)
214 {
215         struct mdd_thread_info *mti = mdd_env_info(env);
216         int                     max_lmm_size;
217
218         max_lmm_size = mdd_lov_mdsize(env, mdd);
219         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220                 if (mti->mti_max_lmm)
221                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222                 mti->mti_max_lmm = NULL;
223                 mti->mti_max_lmm_size = 0;
224         }
225         if (unlikely(mti->mti_max_lmm == NULL)) {
226                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227                 if (likely(mti->mti_max_lmm != NULL))
228                         mti->mti_max_lmm_size = max_lmm_size;
229         }
230         return mti->mti_max_lmm;
231 }
232
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234                                    const struct lu_object_header *hdr,
235                                    struct lu_device *d)
236 {
237         struct mdd_object *mdd_obj;
238
239         OBD_ALLOC_PTR(mdd_obj);
240         if (mdd_obj != NULL) {
241                 struct lu_object *o;
242
243                 o = mdd2lu_obj(mdd_obj);
244                 lu_object_init(o, NULL, d);
245                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247                 mdd_obj->mod_count = 0;
248                 o->lo_ops = &mdd_lu_obj_ops;
249                 return o;
250         } else {
251                 return NULL;
252         }
253 }
254
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256                            const struct lu_object_conf *unused)
257 {
258         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259         struct mdd_object *mdd_obj = lu2mdd_obj(o);
260         struct lu_object  *below;
261         struct lu_device  *under;
262         ENTRY;
263
264         mdd_obj->mod_cltime = 0;
265         under = &d->mdd_child->dd_lu_dev;
266         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267         mdd_pdlock_init(mdd_obj);
268         if (below == NULL)
269                 RETURN(-ENOMEM);
270
271         lu_object_add(o, below);
272
273         RETURN(0);
274 }
275
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
277 {
278         if (lu_object_exists(o))
279                 return mdd_get_flags(env, lu2mdd_obj(o));
280         else
281                 return 0;
282 }
283
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
285 {
286         struct mdd_object *mdd = lu2mdd_obj(o);
287
288         lu_object_fini(o);
289         OBD_FREE_PTR(mdd);
290 }
291
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293                             lu_printer_t p, const struct lu_object *o)
294 {
295         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297                     "valid=%x, cltime="LPU64", flags=%lx)",
298                     mdd, mdd->mod_count, mdd->mod_valid,
299                     mdd->mod_cltime, mdd->mod_flags);
300 }
301
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303         .loo_object_init    = mdd_object_init,
304         .loo_object_start   = mdd_object_start,
305         .loo_object_free    = mdd_object_free,
306         .loo_object_print   = mdd_object_print,
307 };
308
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310                                    struct mdd_device *d,
311                                    const struct lu_fid *f)
312 {
313         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
314 }
315
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317                         const char *path, struct lu_fid *fid)
318 {
319         struct lu_buf *buf;
320         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321         struct mdd_object *obj;
322         struct lu_name *lname = &mdd_env_info(env)->mti_name;
323         char *name;
324         int rc = 0;
325         ENTRY;
326
327         /* temp buffer for path element */
328         buf = mdd_buf_alloc(env, PATH_MAX);
329         if (buf->lb_buf == NULL)
330                 RETURN(-ENOMEM);
331
332         lname->ln_name = name = buf->lb_buf;
333         lname->ln_namelen = 0;
334         *f = mdd->mdd_root_fid;
335
336         while(1) {
337                 while (*path == '/')
338                         path++;
339                 if (*path == '\0')
340                         break;
341                 while (*path != '/' && *path != '\0') {
342                         *name = *path;
343                         path++;
344                         name++;
345                         lname->ln_namelen++;
346                 }
347
348                 *name = '\0';
349                 /* find obj corresponding to fid */
350                 obj = mdd_object_find(env, mdd, f);
351                 if (obj == NULL)
352                         GOTO(out, rc = -EREMOTE);
353                 if (IS_ERR(obj))
354                         GOTO(out, rc = PTR_ERR(obj));
355                 /* get child fid from parent and name */
356                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357                 mdd_object_put(env, obj);
358                 if (rc)
359                         break;
360
361                 name = buf->lb_buf;
362                 lname->ln_namelen = 0;
363         }
364
365         if (!rc)
366                 *fid = *f;
367 out:
368         RETURN(rc);
369 }
370
371 /** The maximum depth that fid2path() will search.
372  * This is limited only because we want to store the fids for
373  * historical path lookup purposes.
374  */
375 #define MAX_PATH_DEPTH 100
376
377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379         __u64                pli_recno;        /**< history point */
380         __u64                pli_currec;       /**< current record */
381         struct lu_fid        pli_fid;
382         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383         struct mdd_object   *pli_mdd_obj;
384         char                *pli_path;         /**< full path */
385         int                  pli_pathlen;
386         int                  pli_linkno;       /**< which hardlink to follow */
387         int                  pli_fidcount;     /**< number of \a pli_fids */
388 };
389
390 static int mdd_path_current(const struct lu_env *env,
391                             struct path_lookup_info *pli)
392 {
393         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394         struct mdd_object *mdd_obj;
395         struct lu_buf     *buf = NULL;
396         struct link_ea_header *leh;
397         struct link_ea_entry  *lee;
398         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
400         char *ptr;
401         int reclen;
402         int rc;
403         ENTRY;
404
405         ptr = pli->pli_path + pli->pli_pathlen - 1;
406         *ptr = 0;
407         --ptr;
408         pli->pli_fidcount = 0;
409         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
410
411         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412                 mdd_obj = mdd_object_find(env, mdd,
413                                           &pli->pli_fids[pli->pli_fidcount]);
414                 if (mdd_obj == NULL)
415                         GOTO(out, rc = -EREMOTE);
416                 if (IS_ERR(mdd_obj))
417                         GOTO(out, rc = PTR_ERR(mdd_obj));
418                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
419                 if (rc <= 0) {
420                         mdd_object_put(env, mdd_obj);
421                         if (rc == -1)
422                                 rc = -EREMOTE;
423                         else if (rc == 0)
424                                 /* Do I need to error out here? */
425                                 rc = -ENOENT;
426                         GOTO(out, rc);
427                 }
428
429                 /* Get parent fid and object name */
430                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431                 buf = mdd_links_get(env, mdd_obj);
432                 mdd_read_unlock(env, mdd_obj);
433                 mdd_object_put(env, mdd_obj);
434                 if (IS_ERR(buf))
435                         GOTO(out, rc = PTR_ERR(buf));
436
437                 leh = buf->lb_buf;
438                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
440
441                 /* If set, use link #linkno for path lookup, otherwise use
442                    link #0.  Only do this for the final path element. */
443                 if ((pli->pli_fidcount == 0) &&
444                     (pli->pli_linkno < leh->leh_reccount)) {
445                         int count;
446                         for (count = 0; count < pli->pli_linkno; count++) {
447                                 lee = (struct link_ea_entry *)
448                                      ((char *)lee + reclen);
449                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
450                         }
451                         if (pli->pli_linkno < leh->leh_reccount - 1)
452                                 /* indicate to user there are more links */
453                                 pli->pli_linkno++;
454                 }
455
456                 /* Pack the name in the end of the buffer */
457                 ptr -= tmpname->ln_namelen;
458                 if (ptr - 1 <= pli->pli_path)
459                         GOTO(out, rc = -EOVERFLOW);
460                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
461                 *(--ptr) = '/';
462
463                 /* Store the parent fid for historic lookup */
464                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465                         GOTO(out, rc = -EOVERFLOW);
466                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
467         }
468
469         /* Verify that our path hasn't changed since we started the lookup.
470            Record the current index, and verify the path resolves to the
471            same fid. If it does, then the path is correct as of this index. */
472         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473         pli->pli_currec = mdd->mdd_cl.mc_index;
474         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
476         if (rc) {
477                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478                 GOTO (out, rc = -EAGAIN);
479         }
480         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483                        PFID(&pli->pli_fid));
484                 GOTO(out, rc = -EAGAIN);
485         }
486         ptr++; /* skip leading / */
487         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
488
489         EXIT;
490 out:
491         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492                 /* if we vmalloced a large buffer drop it */
493                 mdd_buf_put(buf);
494
495         return rc;
496 }
497
498 static int mdd_path_historic(const struct lu_env *env,
499                              struct path_lookup_info *pli)
500 {
501         return 0;
502 }
503
504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506                     char *path, int pathlen, __u64 *recno, int *linkno)
507 {
508         struct path_lookup_info *pli;
509         int tries = 3;
510         int rc = -EAGAIN;
511         ENTRY;
512
513         if (pathlen < 3)
514                 RETURN(-EOVERFLOW);
515
516         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
517                 path[0] = '\0';
518                 RETURN(0);
519         }
520
521         OBD_ALLOC_PTR(pli);
522         if (pli == NULL)
523                 RETURN(-ENOMEM);
524
525         pli->pli_mdd_obj = md2mdd_obj(obj);
526         pli->pli_recno = *recno;
527         pli->pli_path = path;
528         pli->pli_pathlen = pathlen;
529         pli->pli_linkno = *linkno;
530
531         /* Retry multiple times in case file is being moved */
532         while (tries-- && rc == -EAGAIN)
533                 rc = mdd_path_current(env, pli);
534
535         /* For historical path lookup, the current links may not have existed
536          * at "recno" time.  We must switch over to earlier links/parents
537          * by using the changelog records.  If the earlier parent doesn't
538          * exist, we must search back through the changelog to reconstruct
539          * its parents, then check if it exists, etc.
540          * We may ignore this problem for the initial implementation and
541          * state that an "original" hardlink must still exist for us to find
542          * historic path name. */
543         if (pli->pli_recno != -1) {
544                 rc = mdd_path_historic(env, pli);
545         } else {
546                 *recno = pli->pli_currec;
547                 /* Return next link index to caller */
548                 *linkno = pli->pli_linkno;
549         }
550
551         OBD_FREE_PTR(pli);
552
553         RETURN (rc);
554 }
555
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
557 {
558         struct lu_attr *la = &mdd_env_info(env)->mti_la;
559         int rc;
560
561         ENTRY;
562         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
563         if (rc == 0) {
564                 mdd_flags_xlate(obj, la->la_flags);
565                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
566                         obj->mod_flags |= MNLINK_OBJ;
567         }
568         RETURN(rc);
569 }
570
571 /* get only inode attributes */
572 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
573                   struct md_attr *ma)
574 {
575         int rc = 0;
576         ENTRY;
577
578         if (ma->ma_valid & MA_INODE)
579                 RETURN(0);
580
581         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
582                           mdd_object_capa(env, mdd_obj));
583         if (rc == 0)
584                 ma->ma_valid |= MA_INODE;
585         RETURN(rc);
586 }
587
588 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
589 {
590         struct lov_desc *ldesc;
591         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
592         struct lov_user_md *lum = (struct lov_user_md*)lmm;
593         ENTRY;
594
595         if (!lum)
596                 RETURN(0);
597
598         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
599         LASSERT(ldesc != NULL);
600
601         lum->lmm_magic = LOV_MAGIC_V1;
602         lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
603         lum->lmm_pattern = ldesc->ld_pattern;
604         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
605         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
606         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
607
608         RETURN(sizeof(*lum));
609 }
610
611 /* get lov EA only */
612 static int __mdd_lmm_get(const struct lu_env *env,
613                          struct mdd_object *mdd_obj, struct md_attr *ma)
614 {
615         int rc;
616         ENTRY;
617
618         if (ma->ma_valid & MA_LOV)
619                 RETURN(0);
620
621         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
622                         XATTR_NAME_LOV);
623         if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
624                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
625         if (rc > 0) {
626                 ma->ma_lmm_size = rc;
627                 ma->ma_valid |= MA_LOV;
628                 rc = 0;
629         }
630         RETURN(rc);
631 }
632
633 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
634                        struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
640         rc = __mdd_lmm_get(env, mdd_obj, ma);
641         mdd_read_unlock(env, mdd_obj);
642         RETURN(rc);
643 }
644
645 /* get lmv EA only*/
646 static int __mdd_lmv_get(const struct lu_env *env,
647                          struct mdd_object *mdd_obj, struct md_attr *ma)
648 {
649         int rc;
650         ENTRY;
651
652         if (ma->ma_valid & MA_LMV)
653                 RETURN(0);
654
655         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
656                         XATTR_NAME_LMV);
657         if (rc > 0) {
658                 ma->ma_valid |= MA_LMV;
659                 rc = 0;
660         }
661         RETURN(rc);
662 }
663
664 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
665                          struct md_attr *ma)
666 {
667         struct mdd_thread_info *info = mdd_env_info(env);
668         struct lustre_mdt_attrs *lma =
669                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
670         int lma_size;
671         int rc;
672         ENTRY;
673
674         /* If all needed data are already valid, nothing to do */
675         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
676             (ma->ma_need & (MA_HSM | MA_SOM)))
677                 RETURN(0);
678
679         /* Read LMA from disk EA */
680         lma_size = sizeof(info->mti_xattr_buf);
681         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
682         if (rc <= 0)
683                 RETURN(rc);
684
685         /* Useless to check LMA incompatibility because this is already done in
686          * osd_ea_fid_get(), and this will fail long before this code is
687          * called.
688          * So, if we are here, LMA is compatible.
689          */
690
691         lustre_lma_swab(lma);
692
693         /* Swab and copy LMA */
694         if (ma->ma_need & MA_HSM) {
695                 if (lma->lma_compat & LMAC_HSM)
696                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
697                 else
698                         ma->ma_hsm.mh_flags = 0;
699                 ma->ma_valid |= MA_HSM;
700         }
701
702         /* Copy SOM */
703         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
704                 LASSERT(ma->ma_som != NULL);
705                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
706                 ma->ma_som->msd_size    = lma->lma_som_size;
707                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
708                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
709                 ma->ma_valid |= MA_SOM;
710         }
711
712         RETURN(0);
713 }
714
715 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
716                                  struct md_attr *ma)
717 {
718         int rc = 0;
719         ENTRY;
720
721         if (ma->ma_need & MA_INODE)
722                 rc = mdd_iattr_get(env, mdd_obj, ma);
723
724         if (rc == 0 && ma->ma_need & MA_LOV) {
725                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
726                     S_ISDIR(mdd_object_type(mdd_obj)))
727                         rc = __mdd_lmm_get(env, mdd_obj, ma);
728         }
729         if (rc == 0 && ma->ma_need & MA_LMV) {
730                 if (S_ISDIR(mdd_object_type(mdd_obj)))
731                         rc = __mdd_lmv_get(env, mdd_obj, ma);
732         }
733         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
734                 if (S_ISREG(mdd_object_type(mdd_obj)))
735                         rc = __mdd_lma_get(env, mdd_obj, ma);
736         }
737 #ifdef CONFIG_FS_POSIX_ACL
738         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
739                 if (S_ISDIR(mdd_object_type(mdd_obj)))
740                         rc = mdd_def_acl_get(env, mdd_obj, ma);
741         }
742 #endif
743         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
744                rc, ma->ma_valid, ma->ma_lmm);
745         RETURN(rc);
746 }
747
748 int mdd_attr_get_internal_locked(const struct lu_env *env,
749                                  struct mdd_object *mdd_obj, struct md_attr *ma)
750 {
751         int rc;
752         int needlock = ma->ma_need &
753                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
754
755         if (needlock)
756                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
757         rc = mdd_attr_get_internal(env, mdd_obj, ma);
758         if (needlock)
759                 mdd_read_unlock(env, mdd_obj);
760         return rc;
761 }
762
763 /*
764  * No permission check is needed.
765  */
766 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
767                         struct md_attr *ma)
768 {
769         struct mdd_object *mdd_obj = md2mdd_obj(obj);
770         int                rc;
771
772         ENTRY;
773         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
774         RETURN(rc);
775 }
776
777 /*
778  * No permission check is needed.
779  */
780 static int mdd_xattr_get(const struct lu_env *env,
781                          struct md_object *obj, struct lu_buf *buf,
782                          const char *name)
783 {
784         struct mdd_object *mdd_obj = md2mdd_obj(obj);
785         int rc;
786
787         ENTRY;
788
789         LASSERT(mdd_object_exists(mdd_obj));
790
791         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
792         rc = mdo_xattr_get(env, mdd_obj, buf, name,
793                            mdd_object_capa(env, mdd_obj));
794         mdd_read_unlock(env, mdd_obj);
795
796         RETURN(rc);
797 }
798
799 /*
800  * Permission check is done when open,
801  * no need check again.
802  */
803 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
804                         struct lu_buf *buf)
805 {
806         struct mdd_object *mdd_obj = md2mdd_obj(obj);
807         struct dt_object  *next;
808         loff_t             pos = 0;
809         int                rc;
810         ENTRY;
811
812         LASSERT(mdd_object_exists(mdd_obj));
813
814         next = mdd_object_child(mdd_obj);
815         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
816         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
817                                          mdd_object_capa(env, mdd_obj));
818         mdd_read_unlock(env, mdd_obj);
819         RETURN(rc);
820 }
821
822 /*
823  * No permission check is needed.
824  */
825 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
826                           struct lu_buf *buf)
827 {
828         struct mdd_object *mdd_obj = md2mdd_obj(obj);
829         int rc;
830
831         ENTRY;
832
833         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
834         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
835         mdd_read_unlock(env, mdd_obj);
836
837         RETURN(rc);
838 }
839
840 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
841                                struct mdd_object *c, struct md_attr *ma,
842                                struct thandle *handle,
843                                const struct md_op_spec *spec)
844 {
845         struct lu_attr *attr = &ma->ma_attr;
846         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
847         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
848         const struct dt_index_features *feat = spec->sp_feat;
849         int rc;
850         ENTRY;
851
852         if (!mdd_object_exists(c)) {
853                 struct dt_object *next = mdd_object_child(c);
854                 LASSERT(next);
855
856                 if (feat != &dt_directory_features && feat != NULL)
857                         dof->dof_type = DFT_INDEX;
858                 else
859                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
860
861                 dof->u.dof_idx.di_feat = feat;
862
863                 /* @hint will be initialized by underlying device. */
864                 next->do_ops->do_ah_init(env, hint,
865                                          p ? mdd_object_child(p) : NULL,
866                                          attr->la_mode & S_IFMT);
867
868                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
869                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
870         } else
871                 rc = -EEXIST;
872
873         RETURN(rc);
874 }
875
876 /**
877  * Make sure the ctime is increased only.
878  */
879 static inline int mdd_attr_check(const struct lu_env *env,
880                                  struct mdd_object *obj,
881                                  struct lu_attr *attr)
882 {
883         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
884         int rc;
885         ENTRY;
886
887         if (attr->la_valid & LA_CTIME) {
888                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
889                 if (rc)
890                         RETURN(rc);
891
892                 if (attr->la_ctime < tmp_la->la_ctime)
893                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
894                 else if (attr->la_valid == LA_CTIME &&
895                          attr->la_ctime == tmp_la->la_ctime)
896                         attr->la_valid &= ~LA_CTIME;
897         }
898         RETURN(0);
899 }
900
901 int mdd_attr_set_internal(const struct lu_env *env,
902                           struct mdd_object *obj,
903                           struct lu_attr *attr,
904                           struct thandle *handle,
905                           int needacl)
906 {
907         int rc;
908         ENTRY;
909
910         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
911 #ifdef CONFIG_FS_POSIX_ACL
912         if (!rc && (attr->la_valid & LA_MODE) && needacl)
913                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
914 #endif
915         RETURN(rc);
916 }
917
918 int mdd_attr_check_set_internal(const struct lu_env *env,
919                                 struct mdd_object *obj,
920                                 struct lu_attr *attr,
921                                 struct thandle *handle,
922                                 int needacl)
923 {
924         int rc;
925         ENTRY;
926
927         rc = mdd_attr_check(env, obj, attr);
928         if (rc)
929                 RETURN(rc);
930
931         if (attr->la_valid)
932                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
933         RETURN(rc);
934 }
935
936 static int mdd_attr_set_internal_locked(const struct lu_env *env,
937                                         struct mdd_object *obj,
938                                         struct lu_attr *attr,
939                                         struct thandle *handle,
940                                         int needacl)
941 {
942         int rc;
943         ENTRY;
944
945         needacl = needacl && (attr->la_valid & LA_MODE);
946         if (needacl)
947                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
948         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
949         if (needacl)
950                 mdd_write_unlock(env, obj);
951         RETURN(rc);
952 }
953
954 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
955                                        struct mdd_object *obj,
956                                        struct lu_attr *attr,
957                                        struct thandle *handle,
958                                        int needacl)
959 {
960         int rc;
961         ENTRY;
962
963         needacl = needacl && (attr->la_valid & LA_MODE);
964         if (needacl)
965                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
966         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
967         if (needacl)
968                 mdd_write_unlock(env, obj);
969         RETURN(rc);
970 }
971
972 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
973                     const struct lu_buf *buf, const char *name,
974                     int fl, struct thandle *handle)
975 {
976         struct lustre_capa *capa = mdd_object_capa(env, obj);
977         int rc = -EINVAL;
978         ENTRY;
979
980         if (buf->lb_buf && buf->lb_len > 0)
981                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
982         else if (buf->lb_buf == NULL && buf->lb_len == 0)
983                 rc = mdo_xattr_del(env, obj, name, handle, capa);
984
985         RETURN(rc);
986 }
987
988 /*
989  * This gives the same functionality as the code between
990  * sys_chmod and inode_setattr
991  * chown_common and inode_setattr
992  * utimes and inode_setattr
993  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
994  */
995 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
996                         struct lu_attr *la, const struct md_attr *ma)
997 {
998         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
999         struct md_ucred  *uc;
1000         int               rc;
1001         ENTRY;
1002
1003         if (!la->la_valid)
1004                 RETURN(0);
1005
1006         /* Do not permit change file type */
1007         if (la->la_valid & LA_TYPE)
1008                 RETURN(-EPERM);
1009
1010         /* They should not be processed by setattr */
1011         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1012                 RETURN(-EPERM);
1013
1014         /* export destroy does not have ->le_ses, but we may want
1015          * to drop LUSTRE_SOM_FL. */
1016         if (!env->le_ses)
1017                 RETURN(0);
1018
1019         uc = md_ucred(env);
1020
1021         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1022         if (rc)
1023                 RETURN(rc);
1024
1025         if (la->la_valid == LA_CTIME) {
1026                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1027                         /* This is only for set ctime when rename's source is
1028                          * on remote MDS. */
1029                         rc = mdd_may_delete(env, NULL, obj,
1030                                             (struct md_attr *)ma, 1, 0);
1031                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1032                         la->la_valid &= ~LA_CTIME;
1033                 RETURN(rc);
1034         }
1035
1036         if (la->la_valid == LA_ATIME) {
1037                 /* This is atime only set for read atime update on close. */
1038                 if (la->la_atime > tmp_la->la_atime &&
1039                     la->la_atime <= (tmp_la->la_atime +
1040                                      mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1041                         la->la_valid &= ~LA_ATIME;
1042                 RETURN(0);
1043         }
1044
1045         /* Check if flags change. */
1046         if (la->la_valid & LA_FLAGS) {
1047                 unsigned int oldflags = 0;
1048                 unsigned int newflags = la->la_flags &
1049                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1050
1051                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1052                     !mdd_capable(uc, CFS_CAP_FOWNER))
1053                         RETURN(-EPERM);
1054
1055                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1056                  * only be changed by the relevant capability. */
1057                 if (mdd_is_immutable(obj))
1058                         oldflags |= LUSTRE_IMMUTABLE_FL;
1059                 if (mdd_is_append(obj))
1060                         oldflags |= LUSTRE_APPEND_FL;
1061                 if ((oldflags ^ newflags) &&
1062                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1063                         RETURN(-EPERM);
1064
1065                 if (!S_ISDIR(tmp_la->la_mode))
1066                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1067         }
1068
1069         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1070             (la->la_valid & ~LA_FLAGS) &&
1071             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1072                 RETURN(-EPERM);
1073
1074         /* Check for setting the obj time. */
1075         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1076             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1077                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1078                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1079                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1080                                                             MAY_WRITE,
1081                                                             MOR_TGT_CHILD);
1082                         if (rc)
1083                                 RETURN(rc);
1084                 }
1085         }
1086
1087         /* Make sure a caller can chmod. */
1088         if (la->la_valid & LA_MODE) {
1089                 /* Bypass la_vaild == LA_MODE,
1090                  * this is for changing file with SUID or SGID. */
1091                 if ((la->la_valid & ~LA_MODE) &&
1092                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1093                     (uc->mu_fsuid != tmp_la->la_uid) &&
1094                     !mdd_capable(uc, CFS_CAP_FOWNER))
1095                         RETURN(-EPERM);
1096
1097                 if (la->la_mode == (cfs_umode_t) -1)
1098                         la->la_mode = tmp_la->la_mode;
1099                 else
1100                         la->la_mode = (la->la_mode & S_IALLUGO) |
1101                                       (tmp_la->la_mode & ~S_IALLUGO);
1102
1103                 /* Also check the setgid bit! */
1104                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1105                                        la->la_gid : tmp_la->la_gid) &&
1106                     !mdd_capable(uc, CFS_CAP_FSETID))
1107                         la->la_mode &= ~S_ISGID;
1108         } else {
1109                la->la_mode = tmp_la->la_mode;
1110         }
1111
1112         /* Make sure a caller can chown. */
1113         if (la->la_valid & LA_UID) {
1114                 if (la->la_uid == (uid_t) -1)
1115                         la->la_uid = tmp_la->la_uid;
1116                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1117                     (la->la_uid != tmp_la->la_uid)) &&
1118                     !mdd_capable(uc, CFS_CAP_CHOWN))
1119                         RETURN(-EPERM);
1120
1121                 /* If the user or group of a non-directory has been
1122                  * changed by a non-root user, remove the setuid bit.
1123                  * 19981026 David C Niemi <niemi@tux.org>
1124                  *
1125                  * Changed this to apply to all users, including root,
1126                  * to avoid some races. This is the behavior we had in
1127                  * 2.0. The check for non-root was definitely wrong
1128                  * for 2.2 anyway, as it should have been using
1129                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1130                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1131                     !S_ISDIR(tmp_la->la_mode)) {
1132                         la->la_mode &= ~S_ISUID;
1133                         la->la_valid |= LA_MODE;
1134                 }
1135         }
1136
1137         /* Make sure caller can chgrp. */
1138         if (la->la_valid & LA_GID) {
1139                 if (la->la_gid == (gid_t) -1)
1140                         la->la_gid = tmp_la->la_gid;
1141                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1142                     ((la->la_gid != tmp_la->la_gid) &&
1143                     !lustre_in_group_p(uc, la->la_gid))) &&
1144                     !mdd_capable(uc, CFS_CAP_CHOWN))
1145                         RETURN(-EPERM);
1146
1147                 /* Likewise, if the user or group of a non-directory
1148                  * has been changed by a non-root user, remove the
1149                  * setgid bit UNLESS there is no group execute bit
1150                  * (this would be a file marked for mandatory
1151                  * locking).  19981026 David C Niemi <niemi@tux.org>
1152                  *
1153                  * Removed the fsuid check (see the comment above) --
1154                  * 19990830 SD. */
1155                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1156                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1157                         la->la_mode &= ~S_ISGID;
1158                         la->la_valid |= LA_MODE;
1159                 }
1160         }
1161
1162         /* For both Size-on-MDS case and truncate case,
1163          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1164          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1165          * For SOM case, it is true, the MAY_WRITE perm has been checked
1166          * when open, no need check again. For truncate case, it is false,
1167          * the MAY_WRITE perm should be checked here. */
1168         if (ma->ma_attr_flags & MDS_SOM) {
1169                 /* For the "Size-on-MDS" setattr update, merge coming
1170                  * attributes with the set in the inode. BUG 10641 */
1171                 if ((la->la_valid & LA_ATIME) &&
1172                     (la->la_atime <= tmp_la->la_atime))
1173                         la->la_valid &= ~LA_ATIME;
1174
1175                 /* OST attributes do not have a priority over MDS attributes,
1176                  * so drop times if ctime is equal. */
1177                 if ((la->la_valid & LA_CTIME) &&
1178                     (la->la_ctime <= tmp_la->la_ctime))
1179                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1180         } else {
1181                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1182                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1183                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1184                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1185                                 rc = mdd_permission_internal_locked(env, obj,
1186                                                             tmp_la, MAY_WRITE,
1187                                                             MOR_TGT_CHILD);
1188                                 if (rc)
1189                                         RETURN(rc);
1190                         }
1191                 }
1192                 if (la->la_valid & LA_CTIME) {
1193                         /* The pure setattr, it has the priority over what is
1194                          * already set, do not drop it if ctime is equal. */
1195                         if (la->la_ctime < tmp_la->la_ctime)
1196                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1197                                                   LA_CTIME);
1198                 }
1199         }
1200
1201         RETURN(0);
1202 }
1203
1204 /** Store a data change changelog record
1205  * If this fails, we must fail the whole transaction; we don't
1206  * want the change to commit without the log entry.
1207  * \param mdd_obj - mdd_object of change
1208  * \param handle - transacion handle
1209  */
1210 static int mdd_changelog_data_store(const struct lu_env     *env,
1211                                     struct mdd_device       *mdd,
1212                                     enum changelog_rec_type type,
1213                                     int                     flags,
1214                                     struct mdd_object       *mdd_obj,
1215                                     struct thandle          *handle)
1216 {
1217         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1218         struct llog_changelog_rec *rec;
1219         struct lu_buf *buf;
1220         int reclen;
1221         int rc;
1222
1223         /* Not recording */
1224         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1225                 RETURN(0);
1226         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1227                 RETURN(0);
1228
1229         LASSERT(handle != NULL);
1230         LASSERT(mdd_obj != NULL);
1231
1232         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1233             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1234                 /* Don't need multiple updates in this log */
1235                 /* Don't check under lock - no big deal if we get an extra
1236                    entry */
1237                 RETURN(0);
1238         }
1239
1240         reclen = llog_data_len(sizeof(*rec));
1241         buf = mdd_buf_alloc(env, reclen);
1242         if (buf->lb_buf == NULL)
1243                 RETURN(-ENOMEM);
1244         rec = (struct llog_changelog_rec *)buf->lb_buf;
1245
1246         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1247         rec->cr.cr_type = (__u32)type;
1248         rec->cr.cr_tfid = *tfid;
1249         rec->cr.cr_namelen = 0;
1250         mdd_obj->mod_cltime = cfs_time_current_64();
1251
1252         rc = mdd_changelog_llog_write(mdd, rec, handle);
1253         if (rc < 0) {
1254                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1255                        rc, type, PFID(tfid));
1256                 return -EFAULT;
1257         }
1258
1259         return 0;
1260 }
1261
1262 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1263                   int flags, struct md_object *obj)
1264 {
1265         struct thandle *handle;
1266         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1267         struct mdd_device *mdd = mdo2mdd(obj);
1268         int rc;
1269         ENTRY;
1270
1271         handle = mdd_trans_start(env, mdd);
1272
1273         if (IS_ERR(handle))
1274                 return(PTR_ERR(handle));
1275
1276         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1277                                       handle);
1278
1279         mdd_trans_stop(env, mdd, rc, handle);
1280
1281         RETURN(rc);
1282 }
1283
1284 /**
1285  * Should be called with write lock held.
1286  *
1287  * \see mdd_lma_set_locked().
1288  */
1289 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1290                        const struct md_attr *ma, struct thandle *handle)
1291 {
1292         struct mdd_thread_info *info = mdd_env_info(env);
1293         struct lu_buf *buf;
1294         struct lustre_mdt_attrs *lma =
1295                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1296         int lmasize = sizeof(struct lustre_mdt_attrs);
1297         int rc = 0;
1298
1299         ENTRY;
1300
1301         /* Either HSM or SOM part is not valid, we need to read it before */
1302         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1303                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1304                 if (rc <= 0)
1305                         RETURN(rc);
1306
1307                 lustre_lma_swab(lma);
1308         } else {
1309                 memset(lma, 0, lmasize);
1310         }
1311
1312         /* Copy HSM data */
1313         if (ma->ma_valid & MA_HSM) {
1314                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1315                 lma->lma_compat |= LMAC_HSM;
1316         }
1317
1318         /* Copy SOM data */
1319         if (ma->ma_valid & MA_SOM) {
1320                 LASSERT(ma->ma_som != NULL);
1321                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1322                         lma->lma_compat     &= ~LMAC_SOM;
1323                 } else {
1324                         lma->lma_compat     |= LMAC_SOM;
1325                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1326                         lma->lma_som_size    = ma->ma_som->msd_size;
1327                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1328                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1329                 }
1330         }
1331
1332         /* Copy FID */
1333         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1334
1335         lustre_lma_swab(lma);
1336         buf = mdd_buf_get(env, lma, lmasize);
1337         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1338
1339         RETURN(rc);
1340 }
1341
1342 /**
1343  * Save LMA extended attributes with data from \a ma.
1344  *
1345  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1346  * not, LMA EA will be first read from disk, modified and write back.
1347  *
1348  */
1349 static int mdd_lma_set_locked(const struct lu_env *env,
1350                               struct mdd_object *mdd_obj,
1351                               const struct md_attr *ma, struct thandle *handle)
1352 {
1353         int rc;
1354
1355         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1356         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1357         mdd_write_unlock(env, mdd_obj);
1358         return rc;
1359 }
1360
1361 /* Precedence for choosing record type when multiple
1362  * attributes change: setattr > mtime > ctime > atime
1363  * (ctime changes when mtime does, plus chmod/chown.
1364  * atime and ctime are independent.) */
1365 static int mdd_attr_set_changelog(const struct lu_env *env,
1366                                   struct md_object *obj, struct thandle *handle,
1367                                   __u64 valid)
1368 {
1369         struct mdd_device *mdd = mdo2mdd(obj);
1370         int bits, type = 0;
1371
1372         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1373         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1374         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1375         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1376         bits = bits & mdd->mdd_cl.mc_mask;
1377         if (bits == 0)
1378                 return 0;
1379
1380         /* The record type is the lowest non-masked set bit */
1381         while (bits && ((bits & 1) == 0)) {
1382                 bits = bits >> 1;
1383                 type++;
1384         }
1385
1386         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1387         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1388                                         md2mdd_obj(obj), handle);
1389 }
1390
1391 /* set attr and LOV EA at once, return updated attr */
1392 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1393                         const struct md_attr *ma)
1394 {
1395         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1396         struct mdd_device *mdd = mdo2mdd(obj);
1397         struct thandle *handle;
1398         struct lov_mds_md *lmm = NULL;
1399         struct llog_cookie *logcookies = NULL;
1400         int  rc, lmm_size = 0, cookie_size = 0;
1401         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1402 #ifdef HAVE_QUOTA_SUPPORT
1403         struct obd_device *obd = mdd->mdd_obd_dev;
1404         struct mds_obd *mds = &obd->u.mds;
1405         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1406         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1407         int quota_opc = 0, block_count = 0;
1408         int inode_pending[MAXQUOTAS] = { 0, 0 };
1409         int block_pending[MAXQUOTAS] = { 0, 0 };
1410 #endif
1411         ENTRY;
1412
1413         *la_copy = ma->ma_attr;
1414         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1415         if (rc != 0)
1416                 RETURN(rc);
1417
1418         /* setattr on "close" only change atime, or do nothing */
1419         if (ma->ma_valid == MA_INODE &&
1420             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1421                 RETURN(0);
1422
1423         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1424                                     MDD_TXN_ATTR_SET_OP);
1425         handle = mdd_trans_start(env, mdd);
1426         if (IS_ERR(handle))
1427                 RETURN(PTR_ERR(handle));
1428         /*TODO: add lock here*/
1429         /* start a log jounal handle if needed */
1430         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1431             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1432                 lmm_size = mdd_lov_mdsize(env, mdd);
1433                 lmm = mdd_max_lmm_get(env, mdd);
1434                 if (lmm == NULL)
1435                         GOTO(cleanup, rc = -ENOMEM);
1436
1437                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1438                                 XATTR_NAME_LOV);
1439
1440                 if (rc < 0)
1441                         GOTO(cleanup, rc);
1442         }
1443
1444         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1445                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1446                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1447
1448 #ifdef HAVE_QUOTA_SUPPORT
1449         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1450                 struct obd_export *exp = md_quota(env)->mq_exp;
1451                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1452
1453                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1454                 if (!rc) {
1455                         quota_opc = FSFILT_OP_SETATTR;
1456                         mdd_quota_wrapper(la_copy, qnids);
1457                         mdd_quota_wrapper(la_tmp, qoids);
1458                         /* get file quota for new owner */
1459                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1460                                         qnids, inode_pending, 1, NULL, 0,
1461                                         NULL, 0);
1462                         block_count = (la_tmp->la_blocks + 7) >> 3;
1463                         if (block_count) {
1464                                 void *data = NULL;
1465                                 mdd_data_get(env, mdd_obj, &data);
1466                                 /* get block quota for new owner */
1467                                 lquota_chkquota(mds_quota_interface_ref, obd,
1468                                                 exp, qnids, block_pending,
1469                                                 block_count, NULL,
1470                                                 LQUOTA_FLAGS_BLK, data, 1);
1471                         }
1472                 }
1473         }
1474 #endif
1475
1476         if (la_copy->la_valid & LA_FLAGS) {
1477                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1478                                                   handle, 1);
1479                 if (rc == 0)
1480                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1481         } else if (la_copy->la_valid) {            /* setattr */
1482                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1483                                                   handle, 1);
1484                 /* journal chown/chgrp in llog, just like unlink */
1485                 if (rc == 0 && lmm_size){
1486                         cookie_size = mdd_lov_cookiesize(env, mdd);
1487                         logcookies = mdd_max_cookie_get(env, mdd);
1488                         if (logcookies == NULL)
1489                                 GOTO(cleanup, rc = -ENOMEM);
1490
1491                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1492                                             logcookies, cookie_size) <= 0)
1493                                 logcookies = NULL;
1494                 }
1495         }
1496
1497         if (rc == 0 && ma->ma_valid & MA_LOV) {
1498                 cfs_umode_t mode;
1499
1500                 mode = mdd_object_type(mdd_obj);
1501                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1502                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1503                         if (rc)
1504                                 GOTO(cleanup, rc);
1505
1506                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1507                                             ma->ma_lmm_size, handle, 1);
1508                 }
1509
1510         }
1511         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1512                 cfs_umode_t mode;
1513
1514                 mode = mdd_object_type(mdd_obj);
1515                 if (S_ISREG(mode))
1516                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1517
1518         }
1519 cleanup:
1520         if (rc == 0)
1521                 rc = mdd_attr_set_changelog(env, obj, handle,
1522                                             ma->ma_attr.la_valid);
1523         mdd_trans_stop(env, mdd, rc, handle);
1524         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1525                 /*set obd attr, if needed*/
1526                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1527                                            logcookies);
1528         }
1529 #ifdef HAVE_QUOTA_SUPPORT
1530         if (quota_opc) {
1531                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1532                                       inode_pending, 0);
1533                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1534                                       block_pending, 1);
1535                 /* Trigger dqrel/dqacq for original owner and new owner.
1536                  * If failed, the next call for lquota_chkquota will
1537                  * process it. */
1538                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1539                               quota_opc);
1540         }
1541 #endif
1542         RETURN(rc);
1543 }
1544
1545 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1546                       const struct lu_buf *buf, const char *name, int fl,
1547                       struct thandle *handle)
1548 {
1549         int  rc;
1550         ENTRY;
1551
1552         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1553         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1554         mdd_write_unlock(env, obj);
1555
1556         RETURN(rc);
1557 }
1558
1559 static int mdd_xattr_sanity_check(const struct lu_env *env,
1560                                   struct mdd_object *obj)
1561 {
1562         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1563         struct md_ucred *uc     = md_ucred(env);
1564         int rc;
1565         ENTRY;
1566
1567         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1568                 RETURN(-EPERM);
1569
1570         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1571         if (rc)
1572                 RETURN(rc);
1573
1574         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1575             !mdd_capable(uc, CFS_CAP_FOWNER))
1576                 RETURN(-EPERM);
1577
1578         RETURN(rc);
1579 }
1580
1581 /**
1582  * The caller should guarantee to update the object ctime
1583  * after xattr_set if needed.
1584  */
1585 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1586                          const struct lu_buf *buf, const char *name,
1587                          int fl)
1588 {
1589         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1590         struct mdd_device *mdd = mdo2mdd(obj);
1591         struct thandle *handle;
1592         int  rc;
1593         ENTRY;
1594
1595         rc = mdd_xattr_sanity_check(env, mdd_obj);
1596         if (rc)
1597                 RETURN(rc);
1598
1599         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1600         /* security-replated changes may require sync */
1601         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1602             mdd->mdd_sync_permission == 1)
1603                 txn_param_sync(&mdd_env_info(env)->mti_param);
1604
1605         handle = mdd_trans_start(env, mdd);
1606         if (IS_ERR(handle))
1607                 RETURN(PTR_ERR(handle));
1608
1609         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1610
1611         /* Only record user xattr changes */
1612         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1613                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1614                                               handle);
1615         mdd_trans_stop(env, mdd, rc, handle);
1616
1617         RETURN(rc);
1618 }
1619
1620 /**
1621  * The caller should guarantee to update the object ctime
1622  * after xattr_set if needed.
1623  */
1624 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1625                   const char *name)
1626 {
1627         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1628         struct mdd_device *mdd = mdo2mdd(obj);
1629         struct thandle *handle;
1630         int  rc;
1631         ENTRY;
1632
1633         rc = mdd_xattr_sanity_check(env, mdd_obj);
1634         if (rc)
1635                 RETURN(rc);
1636
1637         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1638         handle = mdd_trans_start(env, mdd);
1639         if (IS_ERR(handle))
1640                 RETURN(PTR_ERR(handle));
1641
1642         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1643         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1644                            mdd_object_capa(env, mdd_obj));
1645         mdd_write_unlock(env, mdd_obj);
1646
1647         /* Only record user xattr changes */
1648         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1649                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1650                                               handle);
1651
1652         mdd_trans_stop(env, mdd, rc, handle);
1653
1654         RETURN(rc);
1655 }
1656
1657 /* partial unlink */
1658 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1659                        struct md_attr *ma)
1660 {
1661         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1662         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1663         struct mdd_device *mdd = mdo2mdd(obj);
1664         struct thandle *handle;
1665 #ifdef HAVE_QUOTA_SUPPORT
1666         struct obd_device *obd = mdd->mdd_obd_dev;
1667         struct mds_obd *mds = &obd->u.mds;
1668         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1669         int quota_opc = 0;
1670 #endif
1671         int rc;
1672         ENTRY;
1673
1674         /*
1675          * Check -ENOENT early here because we need to get object type
1676          * to calculate credits before transaction start
1677          */
1678         if (!mdd_object_exists(mdd_obj))
1679                 RETURN(-ENOENT);
1680
1681         LASSERT(mdd_object_exists(mdd_obj) > 0);
1682
1683         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1684         if (rc)
1685                 RETURN(rc);
1686
1687         handle = mdd_trans_start(env, mdd);
1688         if (IS_ERR(handle))
1689                 RETURN(-ENOMEM);
1690
1691         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1692
1693         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1694         if (rc)
1695                 GOTO(cleanup, rc);
1696
1697         __mdd_ref_del(env, mdd_obj, handle, 0);
1698
1699         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1700                 /* unlink dot */
1701                 __mdd_ref_del(env, mdd_obj, handle, 1);
1702         }
1703
1704         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1705         la_copy->la_ctime = ma->ma_attr.la_ctime;
1706
1707         la_copy->la_valid = LA_CTIME;
1708         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1709         if (rc)
1710                 GOTO(cleanup, rc);
1711
1712         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1713 #ifdef HAVE_QUOTA_SUPPORT
1714         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1715             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1716                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1717                 mdd_quota_wrapper(&ma->ma_attr, qids);
1718         }
1719 #endif
1720
1721
1722         EXIT;
1723 cleanup:
1724         mdd_write_unlock(env, mdd_obj);
1725         mdd_trans_stop(env, mdd, rc, handle);
1726 #ifdef HAVE_QUOTA_SUPPORT
1727         if (quota_opc)
1728                 /* Trigger dqrel on the owner of child. If failed,
1729                  * the next call for lquota_chkquota will process it */
1730                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1731                               quota_opc);
1732 #endif
1733         return rc;
1734 }
1735
1736 /* partial operation */
1737 static int mdd_oc_sanity_check(const struct lu_env *env,
1738                                struct mdd_object *obj,
1739                                struct md_attr *ma)
1740 {
1741         int rc;
1742         ENTRY;
1743
1744         switch (ma->ma_attr.la_mode & S_IFMT) {
1745         case S_IFREG:
1746         case S_IFDIR:
1747         case S_IFLNK:
1748         case S_IFCHR:
1749         case S_IFBLK:
1750         case S_IFIFO:
1751         case S_IFSOCK:
1752                 rc = 0;
1753                 break;
1754         default:
1755                 rc = -EINVAL;
1756                 break;
1757         }
1758         RETURN(rc);
1759 }
1760
1761 static int mdd_object_create(const struct lu_env *env,
1762                              struct md_object *obj,
1763                              const struct md_op_spec *spec,
1764                              struct md_attr *ma)
1765 {
1766
1767         struct mdd_device *mdd = mdo2mdd(obj);
1768         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1769         const struct lu_fid *pfid = spec->u.sp_pfid;
1770         struct thandle *handle;
1771 #ifdef HAVE_QUOTA_SUPPORT
1772         struct obd_device *obd = mdd->mdd_obd_dev;
1773         struct obd_export *exp = md_quota(env)->mq_exp;
1774         struct mds_obd *mds = &obd->u.mds;
1775         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1776         int quota_opc = 0, block_count = 0;
1777         int inode_pending[MAXQUOTAS] = { 0, 0 };
1778         int block_pending[MAXQUOTAS] = { 0, 0 };
1779 #endif
1780         int rc = 0;
1781         ENTRY;
1782
1783 #ifdef HAVE_QUOTA_SUPPORT
1784         if (mds->mds_quota) {
1785                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1786                 mdd_quota_wrapper(&ma->ma_attr, qids);
1787                 /* get file quota for child */
1788                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1789                                 qids, inode_pending, 1, NULL, 0,
1790                                 NULL, 0);
1791                 switch (ma->ma_attr.la_mode & S_IFMT) {
1792                 case S_IFLNK:
1793                 case S_IFDIR:
1794                         block_count = 2;
1795                         break;
1796                 case S_IFREG:
1797                         block_count = 1;
1798                         break;
1799                 }
1800                 /* get block quota for child */
1801                 if (block_count)
1802                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1803                                         qids, block_pending, block_count,
1804                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1805         }
1806 #endif
1807
1808         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1809         handle = mdd_trans_start(env, mdd);
1810         if (IS_ERR(handle))
1811                 GOTO(out_pending, rc = PTR_ERR(handle));
1812
1813         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1814         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1815         if (rc)
1816                 GOTO(unlock, rc);
1817
1818         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1819         if (rc)
1820                 GOTO(unlock, rc);
1821
1822         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1823                 /* If creating the slave object, set slave EA here. */
1824                 int lmv_size = spec->u.sp_ea.eadatalen;
1825                 struct lmv_stripe_md *lmv;
1826
1827                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1828                 LASSERT(lmv != NULL && lmv_size > 0);
1829
1830                 rc = __mdd_xattr_set(env, mdd_obj,
1831                                      mdd_buf_get_const(env, lmv, lmv_size),
1832                                      XATTR_NAME_LMV, 0, handle);
1833                 if (rc)
1834                         GOTO(unlock, rc);
1835
1836                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1837                                            handle, 0);
1838         } else {
1839 #ifdef CONFIG_FS_POSIX_ACL
1840                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1841                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1842
1843                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1844                         buf->lb_len = spec->u.sp_ea.eadatalen;
1845                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1846                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1847                                                     &ma->ma_attr.la_mode,
1848                                                     handle);
1849                                 if (rc)
1850                                         GOTO(unlock, rc);
1851                                 else
1852                                         ma->ma_attr.la_valid |= LA_MODE;
1853                         }
1854
1855                         pfid = spec->u.sp_ea.fid;
1856                 }
1857 #endif
1858                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1859                                            spec);
1860         }
1861         EXIT;
1862 unlock:
1863         if (rc == 0)
1864                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1865         mdd_write_unlock(env, mdd_obj);
1866
1867         mdd_trans_stop(env, mdd, rc, handle);
1868 out_pending:
1869 #ifdef HAVE_QUOTA_SUPPORT
1870         if (quota_opc) {
1871                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1872                                       inode_pending, 0);
1873                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1874                                       block_pending, 1);
1875                 /* Trigger dqacq on the owner of child. If failed,
1876                  * the next call for lquota_chkquota will process it. */
1877                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1878                               quota_opc);
1879         }
1880 #endif
1881         return rc;
1882 }
1883
1884 /* partial link */
1885 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1886                        const struct md_attr *ma)
1887 {
1888         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1889         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1890         struct mdd_device *mdd = mdo2mdd(obj);
1891         struct thandle *handle;
1892         int rc;
1893         ENTRY;
1894
1895         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1896         handle = mdd_trans_start(env, mdd);
1897         if (IS_ERR(handle))
1898                 RETURN(-ENOMEM);
1899
1900         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1901         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1902         if (rc == 0)
1903                 __mdd_ref_add(env, mdd_obj, handle);
1904         mdd_write_unlock(env, mdd_obj);
1905         if (rc == 0) {
1906                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1907                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1908
1909                 la_copy->la_valid = LA_CTIME;
1910                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1911                                                         handle, 0);
1912         }
1913         mdd_trans_stop(env, mdd, 0, handle);
1914
1915         RETURN(rc);
1916 }
1917
1918 /*
1919  * do NOT or the MAY_*'s, you'll get the weakest
1920  */
1921 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1922 {
1923         int res = 0;
1924
1925         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1926          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1927          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1928          * owner can write to a file even if it is marked readonly to hide
1929          * its brokenness. (bug 5781) */
1930         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1931                 struct md_ucred *uc = md_ucred(env);
1932
1933                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1934                     (la->la_uid == uc->mu_fsuid))
1935                         return 0;
1936         }
1937
1938         if (flags & FMODE_READ)
1939                 res |= MAY_READ;
1940         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1941                 res |= MAY_WRITE;
1942         if (flags & MDS_FMODE_EXEC)
1943                 res = MAY_EXEC;
1944         return res;
1945 }
1946
1947 static int mdd_open_sanity_check(const struct lu_env *env,
1948                                  struct mdd_object *obj, int flag)
1949 {
1950         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1951         int mode, rc;
1952         ENTRY;
1953
1954         /* EEXIST check */
1955         if (mdd_is_dead_obj(obj))
1956                 RETURN(-ENOENT);
1957
1958         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1959         if (rc)
1960                RETURN(rc);
1961
1962         if (S_ISLNK(tmp_la->la_mode))
1963                 RETURN(-ELOOP);
1964
1965         mode = accmode(env, tmp_la, flag);
1966
1967         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1968                 RETURN(-EISDIR);
1969
1970         if (!(flag & MDS_OPEN_CREATED)) {
1971                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1972                 if (rc)
1973                         RETURN(rc);
1974         }
1975
1976         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1977             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1978                 flag &= ~MDS_OPEN_TRUNC;
1979
1980         /* For writing append-only file must open it with append mode. */
1981         if (mdd_is_append(obj)) {
1982                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1983                         RETURN(-EPERM);
1984                 if (flag & MDS_OPEN_TRUNC)
1985                         RETURN(-EPERM);
1986         }
1987
1988 #if 0
1989         /*
1990          * Now, flag -- O_NOATIME does not be packed by client.
1991          */
1992         if (flag & O_NOATIME) {
1993                 struct md_ucred *uc = md_ucred(env);
1994
1995                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1996                     (uc->mu_valid == UCRED_NEW)) &&
1997                     (uc->mu_fsuid != tmp_la->la_uid) &&
1998                     !mdd_capable(uc, CFS_CAP_FOWNER))
1999                         RETURN(-EPERM);
2000         }
2001 #endif
2002
2003         RETURN(0);
2004 }
2005
2006 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2007                     int flags)
2008 {
2009         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2010         int rc = 0;
2011
2012         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2013
2014         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2015         if (rc == 0)
2016                 mdd_obj->mod_count++;
2017
2018         mdd_write_unlock(env, mdd_obj);
2019         return rc;
2020 }
2021
2022 /* return md_attr back,
2023  * if it is last unlink then return lov ea + llog cookie*/
2024 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2025                     struct md_attr *ma)
2026 {
2027         int rc = 0;
2028         ENTRY;
2029
2030         if (S_ISREG(mdd_object_type(obj))) {
2031                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2032                  * Caller must be ready for that. */
2033
2034                 rc = __mdd_lmm_get(env, obj, ma);
2035                 if ((ma->ma_valid & MA_LOV))
2036                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2037                                             obj, ma);
2038         }
2039         RETURN(rc);
2040 }
2041
2042 /*
2043  * No permission check is needed.
2044  */
2045 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2046                      struct md_attr *ma)
2047 {
2048         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2049         struct mdd_device *mdd = mdo2mdd(obj);
2050         struct thandle    *handle = NULL;
2051         int rc;
2052         int reset = 1;
2053
2054 #ifdef HAVE_QUOTA_SUPPORT
2055         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2056         struct mds_obd *mds = &obd->u.mds;
2057         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2058         int quota_opc = 0;
2059 #endif
2060         ENTRY;
2061
2062         /* check without any lock */
2063         if (mdd_obj->mod_count == 1 &&
2064             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2065  again:
2066                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2067                 if (rc)
2068                         RETURN(rc);
2069                 handle = mdd_trans_start(env, mdo2mdd(obj));
2070                 if (IS_ERR(handle))
2071                         RETURN(PTR_ERR(handle));
2072         }
2073
2074         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2075         if (handle == NULL &&
2076             mdd_obj->mod_count == 1 &&
2077             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2078                 mdd_write_unlock(env, mdd_obj);
2079                 goto again;
2080         }
2081
2082         /* release open count */
2083         mdd_obj->mod_count --;
2084
2085         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2086                 /* remove link to object from orphan index */
2087                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2088                 if (rc == 0) {
2089                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2090                                "list, OSS objects to be destroyed.\n",
2091                                PFID(mdd_object_fid(mdd_obj)));
2092                 } else {
2093                         CERROR("Object "DFID" can not be deleted from orphan "
2094                                 "list, maybe cause OST objects can not be "
2095                                 "destroyed (err: %d).\n",
2096                                 PFID(mdd_object_fid(mdd_obj)), rc);
2097                         /* If object was not deleted from orphan list, do not
2098                          * destroy OSS objects, which will be done when next
2099                          * recovery. */
2100                         GOTO(out, rc);
2101                 }
2102         }
2103
2104         rc = mdd_iattr_get(env, mdd_obj, ma);
2105         /* Object maybe not in orphan list originally, it is rare case for
2106          * mdd_finish_unlink() failure. */
2107         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2108 #ifdef HAVE_QUOTA_SUPPORT
2109                 if (mds->mds_quota) {
2110                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2111                         mdd_quota_wrapper(&ma->ma_attr, qids);
2112                 }
2113 #endif
2114                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2115                 if (ma->ma_valid & MA_FLAGS &&
2116                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2117                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2118                 } else {
2119                         rc = mdd_object_kill(env, mdd_obj, ma);
2120                                 if (rc == 0)
2121                                         reset = 0;
2122                 }
2123
2124                 if (rc != 0)
2125                         CERROR("Error when prepare to delete Object "DFID" , "
2126                                "which will cause OST objects can not be "
2127                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2128         }
2129         EXIT;
2130
2131 out:
2132         if (reset)
2133                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2134
2135         mdd_write_unlock(env, mdd_obj);
2136         if (handle != NULL)
2137                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2138 #ifdef HAVE_QUOTA_SUPPORT
2139         if (quota_opc)
2140                 /* Trigger dqrel on the owner of child. If failed,
2141                  * the next call for lquota_chkquota will process it */
2142                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2143                               quota_opc);
2144 #endif
2145         return rc;
2146 }
2147
2148 /*
2149  * Permission check is done when open,
2150  * no need check again.
2151  */
2152 static int mdd_readpage_sanity_check(const struct lu_env *env,
2153                                      struct mdd_object *obj)
2154 {
2155         struct dt_object *next = mdd_object_child(obj);
2156         int rc;
2157         ENTRY;
2158
2159         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2160                 rc = 0;
2161         else
2162                 rc = -ENOTDIR;
2163
2164         RETURN(rc);
2165 }
2166
2167 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2168                               int first, void *area, int nob,
2169                               const struct dt_it_ops *iops, struct dt_it *it,
2170                               __u64 *start, __u64 *end,
2171                               struct lu_dirent **last, __u32 attr)
2172 {
2173         int                     result;
2174         __u64                   hash = 0;
2175         struct lu_dirent       *ent;
2176
2177         if (first) {
2178                 memset(area, 0, sizeof (struct lu_dirpage));
2179                 area += sizeof (struct lu_dirpage);
2180                 nob  -= sizeof (struct lu_dirpage);
2181         }
2182
2183         ent  = area;
2184         do {
2185                 int    len;
2186                 int    recsize;
2187
2188                 len  = iops->key_size(env, it);
2189
2190                 /* IAM iterator can return record with zero len. */
2191                 if (len == 0)
2192                         goto next;
2193
2194                 hash = iops->store(env, it);
2195                 if (unlikely(first)) {
2196                         first = 0;
2197                         *start = hash;
2198                 }
2199
2200                 /* calculate max space required for lu_dirent */
2201                 recsize = lu_dirent_calc_size(len, attr);
2202
2203                 if (nob >= recsize) {
2204                         result = iops->rec(env, it, ent, attr);
2205                         if (result == -ESTALE)
2206                                 goto next;
2207                         if (result != 0)
2208                                 goto out;
2209
2210                         /* osd might not able to pack all attributes,
2211                          * so recheck rec length */
2212                         recsize = le16_to_cpu(ent->lde_reclen);
2213                 } else {
2214                         /*
2215                          * record doesn't fit into page, enlarge previous one.
2216                          */
2217                         if (*last) {
2218                                 (*last)->lde_reclen =
2219                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2220                                                         nob);
2221                                 result = 0;
2222                         } else
2223                                 result = -EINVAL;
2224
2225                         goto out;
2226                 }
2227                 *last = ent;
2228                 ent = (void *)ent + recsize;
2229                 nob -= recsize;
2230
2231 next:
2232                 result = iops->next(env, it);
2233                 if (result == -ESTALE)
2234                         goto next;
2235         } while (result == 0);
2236
2237 out:
2238         *end = hash;
2239         return result;
2240 }
2241
2242 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2243                           const struct lu_rdpg *rdpg)
2244 {
2245         struct dt_it      *it;
2246         struct dt_object  *next = mdd_object_child(obj);
2247         const struct dt_it_ops  *iops;
2248         struct page       *pg;
2249         struct lu_dirent  *last = NULL;
2250         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2251         int i;
2252         int rc;
2253         int nob;
2254         __u64 hash_start;
2255         __u64 hash_end = 0;
2256
2257         LASSERT(rdpg->rp_pages != NULL);
2258         LASSERT(next->do_index_ops != NULL);
2259
2260         if (rdpg->rp_count <= 0)
2261                 return -EFAULT;
2262
2263         /*
2264          * iterate through directory and fill pages from @rdpg
2265          */
2266         iops = &next->do_index_ops->dio_it;
2267         it = iops->init(env, next, mdd_object_capa(env, obj));
2268         if (IS_ERR(it))
2269                 return PTR_ERR(it);
2270
2271         rc = iops->load(env, it, rdpg->rp_hash);
2272
2273         if (rc == 0){
2274                 /*
2275                  * Iterator didn't find record with exactly the key requested.
2276                  *
2277                  * It is currently either
2278                  *
2279                  *     - positioned above record with key less than
2280                  *     requested---skip it.
2281                  *
2282                  *     - or not positioned at all (is in IAM_IT_SKEWED
2283                  *     state)---position it on the next item.
2284                  */
2285                 rc = iops->next(env, it);
2286         } else if (rc > 0)
2287                 rc = 0;
2288
2289         /*
2290          * At this point and across for-loop:
2291          *
2292          *  rc == 0 -> ok, proceed.
2293          *  rc >  0 -> end of directory.
2294          *  rc <  0 -> error.
2295          */
2296         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2297              i++, nob -= CFS_PAGE_SIZE) {
2298                 LASSERT(i < rdpg->rp_npages);
2299                 pg = rdpg->rp_pages[i];
2300                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2301                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2302                                         it, &hash_start, &hash_end, &last,
2303                                         rdpg->rp_attrs);
2304                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2305                         if (last)
2306                                 last->lde_reclen = 0;
2307                 }
2308                 cfs_kunmap(pg);
2309         }
2310         if (rc > 0) {
2311                 /*
2312                  * end of directory.
2313                  */
2314                 hash_end = DIR_END_OFF;
2315                 rc = 0;
2316         }
2317         if (rc == 0) {
2318                 struct lu_dirpage *dp;
2319
2320                 dp = cfs_kmap(rdpg->rp_pages[0]);
2321                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2322                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2323                 if (i == 0)
2324                         /*
2325                          * No pages were processed, mark this.
2326                          */
2327                         dp->ldp_flags |= LDF_EMPTY;
2328
2329                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2330                 cfs_kunmap(rdpg->rp_pages[0]);
2331         }
2332         iops->put(env, it);
2333         iops->fini(env, it);
2334
2335         return rc;
2336 }
2337
2338 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2339                  const struct lu_rdpg *rdpg)
2340 {
2341         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2342         int rc;
2343         ENTRY;
2344
2345         LASSERT(mdd_object_exists(mdd_obj));
2346
2347         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2348         rc = mdd_readpage_sanity_check(env, mdd_obj);
2349         if (rc)
2350                 GOTO(out_unlock, rc);
2351
2352         if (mdd_is_dead_obj(mdd_obj)) {
2353                 struct page *pg;
2354                 struct lu_dirpage *dp;
2355
2356                 /*
2357                  * According to POSIX, please do not return any entry to client:
2358                  * even dot and dotdot should not be returned.
2359                  */
2360                 CWARN("readdir from dead object: "DFID"\n",
2361                         PFID(mdd_object_fid(mdd_obj)));
2362
2363                 if (rdpg->rp_count <= 0)
2364                         GOTO(out_unlock, rc = -EFAULT);
2365                 LASSERT(rdpg->rp_pages != NULL);
2366
2367                 pg = rdpg->rp_pages[0];
2368                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2369                 memset(dp, 0 , sizeof(struct lu_dirpage));
2370                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2371                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2372                 dp->ldp_flags |= LDF_EMPTY;
2373                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2374                 cfs_kunmap(pg);
2375                 GOTO(out_unlock, rc = 0);
2376         }
2377
2378         rc = __mdd_readpage(env, mdd_obj, rdpg);
2379
2380         EXIT;
2381 out_unlock:
2382         mdd_read_unlock(env, mdd_obj);
2383         return rc;
2384 }
2385
2386 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2387 {
2388         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2389         struct dt_object *next;
2390
2391         LASSERT(mdd_object_exists(mdd_obj));
2392         next = mdd_object_child(mdd_obj);
2393         return next->do_ops->do_object_sync(env, next);
2394 }
2395
2396 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2397                                         struct md_object *obj)
2398 {
2399         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2400
2401         LASSERT(mdd_object_exists(mdd_obj));
2402         return do_version_get(env, mdd_object_child(mdd_obj));
2403 }
2404
2405 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2406                             dt_obj_version_t version)
2407 {
2408         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2409
2410         LASSERT(mdd_object_exists(mdd_obj));
2411         do_version_set(env, mdd_object_child(mdd_obj), version);
2412 }
2413
2414 const struct md_object_operations mdd_obj_ops = {
2415         .moo_permission    = mdd_permission,
2416         .moo_attr_get      = mdd_attr_get,
2417         .moo_attr_set      = mdd_attr_set,
2418         .moo_xattr_get     = mdd_xattr_get,
2419         .moo_xattr_set     = mdd_xattr_set,
2420         .moo_xattr_list    = mdd_xattr_list,
2421         .moo_xattr_del     = mdd_xattr_del,
2422         .moo_object_create = mdd_object_create,
2423         .moo_ref_add       = mdd_ref_add,
2424         .moo_ref_del       = mdd_ref_del,
2425         .moo_open          = mdd_open,
2426         .moo_close         = mdd_close,
2427         .moo_readpage      = mdd_readpage,
2428         .moo_readlink      = mdd_readlink,
2429         .moo_changelog     = mdd_changelog,
2430         .moo_capa_get      = mdd_capa_get,
2431         .moo_object_sync   = mdd_object_sync,
2432         .moo_version_get   = mdd_version_get,
2433         .moo_version_set   = mdd_version_set,
2434         .moo_path          = mdd_path,
2435         .moo_file_lock     = mdd_file_lock,
2436         .moo_file_unlock   = mdd_file_unlock,
2437 };