Whamcloud - gitweb
10fea55ae184e5f4b6a978ed222473b486311495
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
130         buf->lb_buf = NULL;
131         buf->lb_len = 0;
132 }
133
134 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
135                                        const void *area, ssize_t len)
136 {
137         struct lu_buf *buf;
138
139         buf = &mdd_env_info(env)->mti_buf;
140         buf->lb_buf = (void *)area;
141         buf->lb_len = len;
142         return buf;
143 }
144
145 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
146 {
147         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
148
149         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
150                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
151                 buf->lb_buf = NULL;
152         }
153         if (buf->lb_buf == NULL) {
154                 buf->lb_len = len;
155                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
156                 if (buf->lb_buf == NULL)
157                         buf->lb_len = 0;
158         }
159         return buf;
160 }
161
162 /** Increase the size of the \a mti_big_buf.
163  * preserves old data in buffer
164  * old buffer remains unchanged on error
165  * \retval 0 or -ENOMEM
166  */
167 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
168 {
169         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
170         struct lu_buf buf;
171
172         LASSERT(len >= oldbuf->lb_len);
173         OBD_ALLOC_LARGE(buf.lb_buf, len);
174
175         if (buf.lb_buf == NULL)
176                 return -ENOMEM;
177
178         buf.lb_len = len;
179         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
180
181         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
182
183         memcpy(oldbuf, &buf, sizeof(buf));
184
185         return 0;
186 }
187
188 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
189                                        struct mdd_device *mdd)
190 {
191         struct mdd_thread_info *mti = mdd_env_info(env);
192         int                     max_cookie_size;
193
194         max_cookie_size = mdd_lov_cookiesize(env, mdd);
195         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
196                 if (mti->mti_max_cookie)
197                         OBD_FREE_LARGE(mti->mti_max_cookie,
198                                        mti->mti_max_cookie_size);
199                 mti->mti_max_cookie = NULL;
200                 mti->mti_max_cookie_size = 0;
201         }
202         if (unlikely(mti->mti_max_cookie == NULL)) {
203                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
204                 if (likely(mti->mti_max_cookie != NULL))
205                         mti->mti_max_cookie_size = max_cookie_size;
206         }
207         if (likely(mti->mti_max_cookie != NULL))
208                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
209         return mti->mti_max_cookie;
210 }
211
212 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
213                                    struct mdd_device *mdd)
214 {
215         struct mdd_thread_info *mti = mdd_env_info(env);
216         int                     max_lmm_size;
217
218         max_lmm_size = mdd_lov_mdsize(env, mdd);
219         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
220                 if (mti->mti_max_lmm)
221                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
222                 mti->mti_max_lmm = NULL;
223                 mti->mti_max_lmm_size = 0;
224         }
225         if (unlikely(mti->mti_max_lmm == NULL)) {
226                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
227                 if (likely(mti->mti_max_lmm != NULL))
228                         mti->mti_max_lmm_size = max_lmm_size;
229         }
230         return mti->mti_max_lmm;
231 }
232
233 struct lu_object *mdd_object_alloc(const struct lu_env *env,
234                                    const struct lu_object_header *hdr,
235                                    struct lu_device *d)
236 {
237         struct mdd_object *mdd_obj;
238
239         OBD_ALLOC_PTR(mdd_obj);
240         if (mdd_obj != NULL) {
241                 struct lu_object *o;
242
243                 o = mdd2lu_obj(mdd_obj);
244                 lu_object_init(o, NULL, d);
245                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
246                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
247                 mdd_obj->mod_count = 0;
248                 o->lo_ops = &mdd_lu_obj_ops;
249                 return o;
250         } else {
251                 return NULL;
252         }
253 }
254
255 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
256                            const struct lu_object_conf *unused)
257 {
258         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
259         struct mdd_object *mdd_obj = lu2mdd_obj(o);
260         struct lu_object  *below;
261         struct lu_device  *under;
262         ENTRY;
263
264         mdd_obj->mod_cltime = 0;
265         under = &d->mdd_child->dd_lu_dev;
266         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
267         mdd_pdlock_init(mdd_obj);
268         if (below == NULL)
269                 RETURN(-ENOMEM);
270
271         lu_object_add(o, below);
272
273         RETURN(0);
274 }
275
276 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
277 {
278         if (lu_object_exists(o))
279                 return mdd_get_flags(env, lu2mdd_obj(o));
280         else
281                 return 0;
282 }
283
284 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
285 {
286         struct mdd_object *mdd = lu2mdd_obj(o);
287
288         lu_object_fini(o);
289         OBD_FREE_PTR(mdd);
290 }
291
292 static int mdd_object_print(const struct lu_env *env, void *cookie,
293                             lu_printer_t p, const struct lu_object *o)
294 {
295         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
296         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
297                     "valid=%x, cltime="LPU64", flags=%lx)",
298                     mdd, mdd->mod_count, mdd->mod_valid,
299                     mdd->mod_cltime, mdd->mod_flags);
300 }
301
302 static const struct lu_object_operations mdd_lu_obj_ops = {
303         .loo_object_init    = mdd_object_init,
304         .loo_object_start   = mdd_object_start,
305         .loo_object_free    = mdd_object_free,
306         .loo_object_print   = mdd_object_print,
307 };
308
309 struct mdd_object *mdd_object_find(const struct lu_env *env,
310                                    struct mdd_device *d,
311                                    const struct lu_fid *f)
312 {
313         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
314 }
315
316 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
317                         const char *path, struct lu_fid *fid)
318 {
319         struct lu_buf *buf;
320         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
321         struct mdd_object *obj;
322         struct lu_name *lname = &mdd_env_info(env)->mti_name;
323         char *name;
324         int rc = 0;
325         ENTRY;
326
327         /* temp buffer for path element */
328         buf = mdd_buf_alloc(env, PATH_MAX);
329         if (buf->lb_buf == NULL)
330                 RETURN(-ENOMEM);
331
332         lname->ln_name = name = buf->lb_buf;
333         lname->ln_namelen = 0;
334         *f = mdd->mdd_root_fid;
335
336         while(1) {
337                 while (*path == '/')
338                         path++;
339                 if (*path == '\0')
340                         break;
341                 while (*path != '/' && *path != '\0') {
342                         *name = *path;
343                         path++;
344                         name++;
345                         lname->ln_namelen++;
346                 }
347
348                 *name = '\0';
349                 /* find obj corresponding to fid */
350                 obj = mdd_object_find(env, mdd, f);
351                 if (obj == NULL)
352                         GOTO(out, rc = -EREMOTE);
353                 if (IS_ERR(obj))
354                         GOTO(out, rc = PTR_ERR(obj));
355                 /* get child fid from parent and name */
356                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
357                 mdd_object_put(env, obj);
358                 if (rc)
359                         break;
360
361                 name = buf->lb_buf;
362                 lname->ln_namelen = 0;
363         }
364
365         if (!rc)
366                 *fid = *f;
367 out:
368         RETURN(rc);
369 }
370
371 /** The maximum depth that fid2path() will search.
372  * This is limited only because we want to store the fids for
373  * historical path lookup purposes.
374  */
375 #define MAX_PATH_DEPTH 100
376
377 /** mdd_path() lookup structure. */
378 struct path_lookup_info {
379         __u64                pli_recno;        /**< history point */
380         __u64                pli_currec;       /**< current record */
381         struct lu_fid        pli_fid;
382         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
383         struct mdd_object   *pli_mdd_obj;
384         char                *pli_path;         /**< full path */
385         int                  pli_pathlen;
386         int                  pli_linkno;       /**< which hardlink to follow */
387         int                  pli_fidcount;     /**< number of \a pli_fids */
388 };
389
390 static int mdd_path_current(const struct lu_env *env,
391                             struct path_lookup_info *pli)
392 {
393         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
394         struct mdd_object *mdd_obj;
395         struct lu_buf     *buf = NULL;
396         struct link_ea_header *leh;
397         struct link_ea_entry  *lee;
398         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
399         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
400         char *ptr;
401         int reclen;
402         int rc;
403         ENTRY;
404
405         ptr = pli->pli_path + pli->pli_pathlen - 1;
406         *ptr = 0;
407         --ptr;
408         pli->pli_fidcount = 0;
409         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
410
411         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
412                 mdd_obj = mdd_object_find(env, mdd,
413                                           &pli->pli_fids[pli->pli_fidcount]);
414                 if (mdd_obj == NULL)
415                         GOTO(out, rc = -EREMOTE);
416                 if (IS_ERR(mdd_obj))
417                         GOTO(out, rc = PTR_ERR(mdd_obj));
418                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
419                 if (rc <= 0) {
420                         mdd_object_put(env, mdd_obj);
421                         if (rc == -1)
422                                 rc = -EREMOTE;
423                         else if (rc == 0)
424                                 /* Do I need to error out here? */
425                                 rc = -ENOENT;
426                         GOTO(out, rc);
427                 }
428
429                 /* Get parent fid and object name */
430                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
431                 buf = mdd_links_get(env, mdd_obj);
432                 mdd_read_unlock(env, mdd_obj);
433                 mdd_object_put(env, mdd_obj);
434                 if (IS_ERR(buf))
435                         GOTO(out, rc = PTR_ERR(buf));
436
437                 leh = buf->lb_buf;
438                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
439                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
440
441                 /* If set, use link #linkno for path lookup, otherwise use
442                    link #0.  Only do this for the final path element. */
443                 if ((pli->pli_fidcount == 0) &&
444                     (pli->pli_linkno < leh->leh_reccount)) {
445                         int count;
446                         for (count = 0; count < pli->pli_linkno; count++) {
447                                 lee = (struct link_ea_entry *)
448                                      ((char *)lee + reclen);
449                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
450                         }
451                         if (pli->pli_linkno < leh->leh_reccount - 1)
452                                 /* indicate to user there are more links */
453                                 pli->pli_linkno++;
454                 }
455
456                 /* Pack the name in the end of the buffer */
457                 ptr -= tmpname->ln_namelen;
458                 if (ptr - 1 <= pli->pli_path)
459                         GOTO(out, rc = -EOVERFLOW);
460                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
461                 *(--ptr) = '/';
462
463                 /* Store the parent fid for historic lookup */
464                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
465                         GOTO(out, rc = -EOVERFLOW);
466                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
467         }
468
469         /* Verify that our path hasn't changed since we started the lookup.
470            Record the current index, and verify the path resolves to the
471            same fid. If it does, then the path is correct as of this index. */
472         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
473         pli->pli_currec = mdd->mdd_cl.mc_index;
474         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
475         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
476         if (rc) {
477                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
478                 GOTO (out, rc = -EAGAIN);
479         }
480         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
481                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
482                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
483                        PFID(&pli->pli_fid));
484                 GOTO(out, rc = -EAGAIN);
485         }
486         ptr++; /* skip leading / */
487         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
488
489         EXIT;
490 out:
491         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
492                 /* if we vmalloced a large buffer drop it */
493                 mdd_buf_put(buf);
494
495         return rc;
496 }
497
498 static int mdd_path_historic(const struct lu_env *env,
499                              struct path_lookup_info *pli)
500 {
501         return 0;
502 }
503
504 /* Returns the full path to this fid, as of changelog record recno. */
505 static int mdd_path(const struct lu_env *env, struct md_object *obj,
506                     char *path, int pathlen, __u64 *recno, int *linkno)
507 {
508         struct path_lookup_info *pli;
509         int tries = 3;
510         int rc = -EAGAIN;
511         ENTRY;
512
513         if (pathlen < 3)
514                 RETURN(-EOVERFLOW);
515
516         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
517                 path[0] = '\0';
518                 RETURN(0);
519         }
520
521         OBD_ALLOC_PTR(pli);
522         if (pli == NULL)
523                 RETURN(-ENOMEM);
524
525         pli->pli_mdd_obj = md2mdd_obj(obj);
526         pli->pli_recno = *recno;
527         pli->pli_path = path;
528         pli->pli_pathlen = pathlen;
529         pli->pli_linkno = *linkno;
530
531         /* Retry multiple times in case file is being moved */
532         while (tries-- && rc == -EAGAIN)
533                 rc = mdd_path_current(env, pli);
534
535         /* For historical path lookup, the current links may not have existed
536          * at "recno" time.  We must switch over to earlier links/parents
537          * by using the changelog records.  If the earlier parent doesn't
538          * exist, we must search back through the changelog to reconstruct
539          * its parents, then check if it exists, etc.
540          * We may ignore this problem for the initial implementation and
541          * state that an "original" hardlink must still exist for us to find
542          * historic path name. */
543         if (pli->pli_recno != -1) {
544                 rc = mdd_path_historic(env, pli);
545         } else {
546                 *recno = pli->pli_currec;
547                 /* Return next link index to caller */
548                 *linkno = pli->pli_linkno;
549         }
550
551         OBD_FREE_PTR(pli);
552
553         RETURN (rc);
554 }
555
556 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
557 {
558         struct lu_attr *la = &mdd_env_info(env)->mti_la;
559         int rc;
560
561         ENTRY;
562         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
563         if (rc == 0) {
564                 mdd_flags_xlate(obj, la->la_flags);
565                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
566                         obj->mod_flags |= MNLINK_OBJ;
567         }
568         RETURN(rc);
569 }
570
571 /* get only inode attributes */
572 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
573                   struct md_attr *ma)
574 {
575         int rc = 0;
576         ENTRY;
577
578         if (ma->ma_valid & MA_INODE)
579                 RETURN(0);
580
581         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
582                           mdd_object_capa(env, mdd_obj));
583         if (rc == 0)
584                 ma->ma_valid |= MA_INODE;
585         RETURN(rc);
586 }
587
588 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
589 {
590         struct lov_desc *ldesc;
591         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
592         struct lov_user_md *lum = (struct lov_user_md*)lmm;
593         ENTRY;
594
595         if (!lum)
596                 RETURN(0);
597
598         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
599         LASSERT(ldesc != NULL);
600
601         lum->lmm_magic = LOV_MAGIC_V1;
602         lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
603         lum->lmm_pattern = ldesc->ld_pattern;
604         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
605         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
606         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
607
608         RETURN(sizeof(*lum));
609 }
610
611 static int is_rootdir(struct mdd_object *mdd_obj)
612 {
613         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
614         const struct lu_fid *fid = mdo2fid(mdd_obj);
615
616         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
617 }
618
619 /* get lov EA only */
620 static int __mdd_lmm_get(const struct lu_env *env,
621                          struct mdd_object *mdd_obj, struct md_attr *ma)
622 {
623         int rc;
624         ENTRY;
625
626         if (ma->ma_valid & MA_LOV)
627                 RETURN(0);
628
629         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
630                         XATTR_NAME_LOV);
631         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
632                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
633         if (rc > 0) {
634                 ma->ma_lmm_size = rc;
635                 ma->ma_valid |= MA_LOV;
636                 rc = 0;
637         }
638         RETURN(rc);
639 }
640
641 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
642                        struct md_attr *ma)
643 {
644         int rc;
645         ENTRY;
646
647         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
648         rc = __mdd_lmm_get(env, mdd_obj, ma);
649         mdd_read_unlock(env, mdd_obj);
650         RETURN(rc);
651 }
652
653 /* get lmv EA only*/
654 static int __mdd_lmv_get(const struct lu_env *env,
655                          struct mdd_object *mdd_obj, struct md_attr *ma)
656 {
657         int rc;
658         ENTRY;
659
660         if (ma->ma_valid & MA_LMV)
661                 RETURN(0);
662
663         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
664                         XATTR_NAME_LMV);
665         if (rc > 0) {
666                 ma->ma_valid |= MA_LMV;
667                 rc = 0;
668         }
669         RETURN(rc);
670 }
671
672 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
673                          struct md_attr *ma)
674 {
675         struct mdd_thread_info *info = mdd_env_info(env);
676         struct lustre_mdt_attrs *lma =
677                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
678         int lma_size;
679         int rc;
680         ENTRY;
681
682         /* If all needed data are already valid, nothing to do */
683         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
684             (ma->ma_need & (MA_HSM | MA_SOM)))
685                 RETURN(0);
686
687         /* Read LMA from disk EA */
688         lma_size = sizeof(info->mti_xattr_buf);
689         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
690         if (rc <= 0)
691                 RETURN(rc);
692
693         /* Useless to check LMA incompatibility because this is already done in
694          * osd_ea_fid_get(), and this will fail long before this code is
695          * called.
696          * So, if we are here, LMA is compatible.
697          */
698
699         lustre_lma_swab(lma);
700
701         /* Swab and copy LMA */
702         if (ma->ma_need & MA_HSM) {
703                 if (lma->lma_compat & LMAC_HSM)
704                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
705                 else
706                         ma->ma_hsm.mh_flags = 0;
707                 ma->ma_valid |= MA_HSM;
708         }
709
710         /* Copy SOM */
711         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
712                 LASSERT(ma->ma_som != NULL);
713                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
714                 ma->ma_som->msd_size    = lma->lma_som_size;
715                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
716                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
717                 ma->ma_valid |= MA_SOM;
718         }
719
720         RETURN(0);
721 }
722
723 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
724                                  struct md_attr *ma)
725 {
726         int rc = 0;
727         ENTRY;
728
729         if (ma->ma_need & MA_INODE)
730                 rc = mdd_iattr_get(env, mdd_obj, ma);
731
732         if (rc == 0 && ma->ma_need & MA_LOV) {
733                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
734                     S_ISDIR(mdd_object_type(mdd_obj)))
735                         rc = __mdd_lmm_get(env, mdd_obj, ma);
736         }
737         if (rc == 0 && ma->ma_need & MA_LMV) {
738                 if (S_ISDIR(mdd_object_type(mdd_obj)))
739                         rc = __mdd_lmv_get(env, mdd_obj, ma);
740         }
741         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
742                 if (S_ISREG(mdd_object_type(mdd_obj)))
743                         rc = __mdd_lma_get(env, mdd_obj, ma);
744         }
745 #ifdef CONFIG_FS_POSIX_ACL
746         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
747                 if (S_ISDIR(mdd_object_type(mdd_obj)))
748                         rc = mdd_def_acl_get(env, mdd_obj, ma);
749         }
750 #endif
751         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
752                rc, ma->ma_valid, ma->ma_lmm);
753         RETURN(rc);
754 }
755
756 int mdd_attr_get_internal_locked(const struct lu_env *env,
757                                  struct mdd_object *mdd_obj, struct md_attr *ma)
758 {
759         int rc;
760         int needlock = ma->ma_need &
761                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
762
763         if (needlock)
764                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
765         rc = mdd_attr_get_internal(env, mdd_obj, ma);
766         if (needlock)
767                 mdd_read_unlock(env, mdd_obj);
768         return rc;
769 }
770
771 /*
772  * No permission check is needed.
773  */
774 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
775                         struct md_attr *ma)
776 {
777         struct mdd_object *mdd_obj = md2mdd_obj(obj);
778         int                rc;
779
780         ENTRY;
781         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
782         RETURN(rc);
783 }
784
785 /*
786  * No permission check is needed.
787  */
788 static int mdd_xattr_get(const struct lu_env *env,
789                          struct md_object *obj, struct lu_buf *buf,
790                          const char *name)
791 {
792         struct mdd_object *mdd_obj = md2mdd_obj(obj);
793         int rc;
794
795         ENTRY;
796
797         LASSERT(mdd_object_exists(mdd_obj));
798
799         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
800         rc = mdo_xattr_get(env, mdd_obj, buf, name,
801                            mdd_object_capa(env, mdd_obj));
802         mdd_read_unlock(env, mdd_obj);
803
804         RETURN(rc);
805 }
806
807 /*
808  * Permission check is done when open,
809  * no need check again.
810  */
811 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
812                         struct lu_buf *buf)
813 {
814         struct mdd_object *mdd_obj = md2mdd_obj(obj);
815         struct dt_object  *next;
816         loff_t             pos = 0;
817         int                rc;
818         ENTRY;
819
820         LASSERT(mdd_object_exists(mdd_obj));
821
822         next = mdd_object_child(mdd_obj);
823         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
824         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
825                                          mdd_object_capa(env, mdd_obj));
826         mdd_read_unlock(env, mdd_obj);
827         RETURN(rc);
828 }
829
830 /*
831  * No permission check is needed.
832  */
833 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
834                           struct lu_buf *buf)
835 {
836         struct mdd_object *mdd_obj = md2mdd_obj(obj);
837         int rc;
838
839         ENTRY;
840
841         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
842         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
843         mdd_read_unlock(env, mdd_obj);
844
845         RETURN(rc);
846 }
847
848 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
849                                struct mdd_object *c, struct md_attr *ma,
850                                struct thandle *handle,
851                                const struct md_op_spec *spec)
852 {
853         struct lu_attr *attr = &ma->ma_attr;
854         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
855         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
856         const struct dt_index_features *feat = spec->sp_feat;
857         int rc;
858         ENTRY;
859
860         if (!mdd_object_exists(c)) {
861                 struct dt_object *next = mdd_object_child(c);
862                 LASSERT(next);
863
864                 if (feat != &dt_directory_features && feat != NULL)
865                         dof->dof_type = DFT_INDEX;
866                 else
867                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
868
869                 dof->u.dof_idx.di_feat = feat;
870
871                 /* @hint will be initialized by underlying device. */
872                 next->do_ops->do_ah_init(env, hint,
873                                          p ? mdd_object_child(p) : NULL,
874                                          attr->la_mode & S_IFMT);
875
876                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
877                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
878         } else
879                 rc = -EEXIST;
880
881         RETURN(rc);
882 }
883
884 /**
885  * Make sure the ctime is increased only.
886  */
887 static inline int mdd_attr_check(const struct lu_env *env,
888                                  struct mdd_object *obj,
889                                  struct lu_attr *attr)
890 {
891         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
892         int rc;
893         ENTRY;
894
895         if (attr->la_valid & LA_CTIME) {
896                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
897                 if (rc)
898                         RETURN(rc);
899
900                 if (attr->la_ctime < tmp_la->la_ctime)
901                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
902                 else if (attr->la_valid == LA_CTIME &&
903                          attr->la_ctime == tmp_la->la_ctime)
904                         attr->la_valid &= ~LA_CTIME;
905         }
906         RETURN(0);
907 }
908
909 int mdd_attr_set_internal(const struct lu_env *env,
910                           struct mdd_object *obj,
911                           struct lu_attr *attr,
912                           struct thandle *handle,
913                           int needacl)
914 {
915         int rc;
916         ENTRY;
917
918         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
919 #ifdef CONFIG_FS_POSIX_ACL
920         if (!rc && (attr->la_valid & LA_MODE) && needacl)
921                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
922 #endif
923         RETURN(rc);
924 }
925
926 int mdd_attr_check_set_internal(const struct lu_env *env,
927                                 struct mdd_object *obj,
928                                 struct lu_attr *attr,
929                                 struct thandle *handle,
930                                 int needacl)
931 {
932         int rc;
933         ENTRY;
934
935         rc = mdd_attr_check(env, obj, attr);
936         if (rc)
937                 RETURN(rc);
938
939         if (attr->la_valid)
940                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
941         RETURN(rc);
942 }
943
944 static int mdd_attr_set_internal_locked(const struct lu_env *env,
945                                         struct mdd_object *obj,
946                                         struct lu_attr *attr,
947                                         struct thandle *handle,
948                                         int needacl)
949 {
950         int rc;
951         ENTRY;
952
953         needacl = needacl && (attr->la_valid & LA_MODE);
954         if (needacl)
955                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
956         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
957         if (needacl)
958                 mdd_write_unlock(env, obj);
959         RETURN(rc);
960 }
961
962 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
963                                        struct mdd_object *obj,
964                                        struct lu_attr *attr,
965                                        struct thandle *handle,
966                                        int needacl)
967 {
968         int rc;
969         ENTRY;
970
971         needacl = needacl && (attr->la_valid & LA_MODE);
972         if (needacl)
973                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
974         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
975         if (needacl)
976                 mdd_write_unlock(env, obj);
977         RETURN(rc);
978 }
979
980 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
981                     const struct lu_buf *buf, const char *name,
982                     int fl, struct thandle *handle)
983 {
984         struct lustre_capa *capa = mdd_object_capa(env, obj);
985         int rc = -EINVAL;
986         ENTRY;
987
988         if (buf->lb_buf && buf->lb_len > 0)
989                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
990         else if (buf->lb_buf == NULL && buf->lb_len == 0)
991                 rc = mdo_xattr_del(env, obj, name, handle, capa);
992
993         RETURN(rc);
994 }
995
996 /*
997  * This gives the same functionality as the code between
998  * sys_chmod and inode_setattr
999  * chown_common and inode_setattr
1000  * utimes and inode_setattr
1001  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1002  */
1003 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1004                         struct lu_attr *la, const struct md_attr *ma)
1005 {
1006         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1007         struct md_ucred  *uc;
1008         int               rc;
1009         ENTRY;
1010
1011         if (!la->la_valid)
1012                 RETURN(0);
1013
1014         /* Do not permit change file type */
1015         if (la->la_valid & LA_TYPE)
1016                 RETURN(-EPERM);
1017
1018         /* They should not be processed by setattr */
1019         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1020                 RETURN(-EPERM);
1021
1022         /* export destroy does not have ->le_ses, but we may want
1023          * to drop LUSTRE_SOM_FL. */
1024         if (!env->le_ses)
1025                 RETURN(0);
1026
1027         uc = md_ucred(env);
1028
1029         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1030         if (rc)
1031                 RETURN(rc);
1032
1033         if (la->la_valid == LA_CTIME) {
1034                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1035                         /* This is only for set ctime when rename's source is
1036                          * on remote MDS. */
1037                         rc = mdd_may_delete(env, NULL, obj,
1038                                             (struct md_attr *)ma, 1, 0);
1039                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1040                         la->la_valid &= ~LA_CTIME;
1041                 RETURN(rc);
1042         }
1043
1044         if (la->la_valid == LA_ATIME) {
1045                 /* This is atime only set for read atime update on close. */
1046                 if (la->la_atime >= tmp_la->la_atime &&
1047                     la->la_atime < (tmp_la->la_atime +
1048                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1049                         la->la_valid &= ~LA_ATIME;
1050                 RETURN(0);
1051         }
1052
1053         /* Check if flags change. */
1054         if (la->la_valid & LA_FLAGS) {
1055                 unsigned int oldflags = 0;
1056                 unsigned int newflags = la->la_flags &
1057                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1058
1059                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1060                     !mdd_capable(uc, CFS_CAP_FOWNER))
1061                         RETURN(-EPERM);
1062
1063                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1064                  * only be changed by the relevant capability. */
1065                 if (mdd_is_immutable(obj))
1066                         oldflags |= LUSTRE_IMMUTABLE_FL;
1067                 if (mdd_is_append(obj))
1068                         oldflags |= LUSTRE_APPEND_FL;
1069                 if ((oldflags ^ newflags) &&
1070                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1071                         RETURN(-EPERM);
1072
1073                 if (!S_ISDIR(tmp_la->la_mode))
1074                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1075         }
1076
1077         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1078             (la->la_valid & ~LA_FLAGS) &&
1079             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1080                 RETURN(-EPERM);
1081
1082         /* Check for setting the obj time. */
1083         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1084             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1085                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1086                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1087                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1088                                                             MAY_WRITE,
1089                                                             MOR_TGT_CHILD);
1090                         if (rc)
1091                                 RETURN(rc);
1092                 }
1093         }
1094
1095         if (la->la_valid & LA_KILL_SUID) {
1096                 la->la_valid &= ~LA_KILL_SUID;
1097                 if ((tmp_la->la_mode & S_ISUID) &&
1098                     !(la->la_valid & LA_MODE)) {
1099                         la->la_mode = tmp_la->la_mode;
1100                         la->la_valid |= LA_MODE;
1101                 }
1102                 la->la_mode &= ~S_ISUID;
1103         }
1104
1105         if (la->la_valid & LA_KILL_SGID) {
1106                 la->la_valid &= ~LA_KILL_SGID;
1107                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1108                                         (S_ISGID | S_IXGRP)) &&
1109                     !(la->la_valid & LA_MODE)) {
1110                         la->la_mode = tmp_la->la_mode;
1111                         la->la_valid |= LA_MODE;
1112                 }
1113                 la->la_mode &= ~S_ISGID;
1114         }
1115
1116         /* Make sure a caller can chmod. */
1117         if (la->la_valid & LA_MODE) {
1118                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1119                     (uc->mu_fsuid != tmp_la->la_uid) &&
1120                     !mdd_capable(uc, CFS_CAP_FOWNER))
1121                         RETURN(-EPERM);
1122
1123                 if (la->la_mode == (cfs_umode_t) -1)
1124                         la->la_mode = tmp_la->la_mode;
1125                 else
1126                         la->la_mode = (la->la_mode & S_IALLUGO) |
1127                                       (tmp_la->la_mode & ~S_IALLUGO);
1128
1129                 /* Also check the setgid bit! */
1130                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1131                                        la->la_gid : tmp_la->la_gid) &&
1132                     !mdd_capable(uc, CFS_CAP_FSETID))
1133                         la->la_mode &= ~S_ISGID;
1134         } else {
1135                la->la_mode = tmp_la->la_mode;
1136         }
1137
1138         /* Make sure a caller can chown. */
1139         if (la->la_valid & LA_UID) {
1140                 if (la->la_uid == (uid_t) -1)
1141                         la->la_uid = tmp_la->la_uid;
1142                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1143                     (la->la_uid != tmp_la->la_uid)) &&
1144                     !mdd_capable(uc, CFS_CAP_CHOWN))
1145                         RETURN(-EPERM);
1146
1147                 /* If the user or group of a non-directory has been
1148                  * changed by a non-root user, remove the setuid bit.
1149                  * 19981026 David C Niemi <niemi@tux.org>
1150                  *
1151                  * Changed this to apply to all users, including root,
1152                  * to avoid some races. This is the behavior we had in
1153                  * 2.0. The check for non-root was definitely wrong
1154                  * for 2.2 anyway, as it should have been using
1155                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1156                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1157                     !S_ISDIR(tmp_la->la_mode)) {
1158                         la->la_mode &= ~S_ISUID;
1159                         la->la_valid |= LA_MODE;
1160                 }
1161         }
1162
1163         /* Make sure caller can chgrp. */
1164         if (la->la_valid & LA_GID) {
1165                 if (la->la_gid == (gid_t) -1)
1166                         la->la_gid = tmp_la->la_gid;
1167                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1168                     ((la->la_gid != tmp_la->la_gid) &&
1169                     !lustre_in_group_p(uc, la->la_gid))) &&
1170                     !mdd_capable(uc, CFS_CAP_CHOWN))
1171                         RETURN(-EPERM);
1172
1173                 /* Likewise, if the user or group of a non-directory
1174                  * has been changed by a non-root user, remove the
1175                  * setgid bit UNLESS there is no group execute bit
1176                  * (this would be a file marked for mandatory
1177                  * locking).  19981026 David C Niemi <niemi@tux.org>
1178                  *
1179                  * Removed the fsuid check (see the comment above) --
1180                  * 19990830 SD. */
1181                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1182                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1183                         la->la_mode &= ~S_ISGID;
1184                         la->la_valid |= LA_MODE;
1185                 }
1186         }
1187
1188         /* For both Size-on-MDS case and truncate case,
1189          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1190          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1191          * For SOM case, it is true, the MAY_WRITE perm has been checked
1192          * when open, no need check again. For truncate case, it is false,
1193          * the MAY_WRITE perm should be checked here. */
1194         if (ma->ma_attr_flags & MDS_SOM) {
1195                 /* For the "Size-on-MDS" setattr update, merge coming
1196                  * attributes with the set in the inode. BUG 10641 */
1197                 if ((la->la_valid & LA_ATIME) &&
1198                     (la->la_atime <= tmp_la->la_atime))
1199                         la->la_valid &= ~LA_ATIME;
1200
1201                 /* OST attributes do not have a priority over MDS attributes,
1202                  * so drop times if ctime is equal. */
1203                 if ((la->la_valid & LA_CTIME) &&
1204                     (la->la_ctime <= tmp_la->la_ctime))
1205                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1206         } else {
1207                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1208                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1209                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1210                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1211                                 rc = mdd_permission_internal_locked(env, obj,
1212                                                             tmp_la, MAY_WRITE,
1213                                                             MOR_TGT_CHILD);
1214                                 if (rc)
1215                                         RETURN(rc);
1216                         }
1217                 }
1218                 if (la->la_valid & LA_CTIME) {
1219                         /* The pure setattr, it has the priority over what is
1220                          * already set, do not drop it if ctime is equal. */
1221                         if (la->la_ctime < tmp_la->la_ctime)
1222                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1223                                                   LA_CTIME);
1224                 }
1225         }
1226
1227         RETURN(0);
1228 }
1229
1230 /** Store a data change changelog record
1231  * If this fails, we must fail the whole transaction; we don't
1232  * want the change to commit without the log entry.
1233  * \param mdd_obj - mdd_object of change
1234  * \param handle - transacion handle
1235  */
1236 static int mdd_changelog_data_store(const struct lu_env     *env,
1237                                     struct mdd_device       *mdd,
1238                                     enum changelog_rec_type type,
1239                                     int                     flags,
1240                                     struct mdd_object       *mdd_obj,
1241                                     struct thandle          *handle)
1242 {
1243         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1244         struct llog_changelog_rec *rec;
1245         struct lu_buf *buf;
1246         int reclen;
1247         int rc;
1248
1249         /* Not recording */
1250         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1251                 RETURN(0);
1252         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1253                 RETURN(0);
1254
1255         LASSERT(handle != NULL);
1256         LASSERT(mdd_obj != NULL);
1257
1258         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1259             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1260                 /* Don't need multiple updates in this log */
1261                 /* Don't check under lock - no big deal if we get an extra
1262                    entry */
1263                 RETURN(0);
1264         }
1265
1266         reclen = llog_data_len(sizeof(*rec));
1267         buf = mdd_buf_alloc(env, reclen);
1268         if (buf->lb_buf == NULL)
1269                 RETURN(-ENOMEM);
1270         rec = (struct llog_changelog_rec *)buf->lb_buf;
1271
1272         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1273         rec->cr.cr_type = (__u32)type;
1274         rec->cr.cr_tfid = *tfid;
1275         rec->cr.cr_namelen = 0;
1276         mdd_obj->mod_cltime = cfs_time_current_64();
1277
1278         rc = mdd_changelog_llog_write(mdd, rec, handle);
1279         if (rc < 0) {
1280                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1281                        rc, type, PFID(tfid));
1282                 return -EFAULT;
1283         }
1284
1285         return 0;
1286 }
1287
1288 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1289                   int flags, struct md_object *obj)
1290 {
1291         struct thandle *handle;
1292         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1293         struct mdd_device *mdd = mdo2mdd(obj);
1294         int rc;
1295         ENTRY;
1296
1297         handle = mdd_trans_start(env, mdd);
1298
1299         if (IS_ERR(handle))
1300                 return(PTR_ERR(handle));
1301
1302         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1303                                       handle);
1304
1305         mdd_trans_stop(env, mdd, rc, handle);
1306
1307         RETURN(rc);
1308 }
1309
1310 /**
1311  * Should be called with write lock held.
1312  *
1313  * \see mdd_lma_set_locked().
1314  */
1315 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1316                        const struct md_attr *ma, struct thandle *handle)
1317 {
1318         struct mdd_thread_info *info = mdd_env_info(env);
1319         struct lu_buf *buf;
1320         struct lustre_mdt_attrs *lma =
1321                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1322         int lmasize = sizeof(struct lustre_mdt_attrs);
1323         int rc = 0;
1324
1325         ENTRY;
1326
1327         /* Either HSM or SOM part is not valid, we need to read it before */
1328         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1329                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1330                 if (rc <= 0)
1331                         RETURN(rc);
1332
1333                 lustre_lma_swab(lma);
1334         } else {
1335                 memset(lma, 0, lmasize);
1336         }
1337
1338         /* Copy HSM data */
1339         if (ma->ma_valid & MA_HSM) {
1340                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1341                 lma->lma_compat |= LMAC_HSM;
1342         }
1343
1344         /* Copy SOM data */
1345         if (ma->ma_valid & MA_SOM) {
1346                 LASSERT(ma->ma_som != NULL);
1347                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1348                         lma->lma_compat     &= ~LMAC_SOM;
1349                 } else {
1350                         lma->lma_compat     |= LMAC_SOM;
1351                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1352                         lma->lma_som_size    = ma->ma_som->msd_size;
1353                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1354                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1355                 }
1356         }
1357
1358         /* Copy FID */
1359         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1360
1361         lustre_lma_swab(lma);
1362         buf = mdd_buf_get(env, lma, lmasize);
1363         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1364
1365         RETURN(rc);
1366 }
1367
1368 /**
1369  * Save LMA extended attributes with data from \a ma.
1370  *
1371  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1372  * not, LMA EA will be first read from disk, modified and write back.
1373  *
1374  */
1375 static int mdd_lma_set_locked(const struct lu_env *env,
1376                               struct mdd_object *mdd_obj,
1377                               const struct md_attr *ma, struct thandle *handle)
1378 {
1379         int rc;
1380
1381         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1382         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1383         mdd_write_unlock(env, mdd_obj);
1384         return rc;
1385 }
1386
1387 /* Precedence for choosing record type when multiple
1388  * attributes change: setattr > mtime > ctime > atime
1389  * (ctime changes when mtime does, plus chmod/chown.
1390  * atime and ctime are independent.) */
1391 static int mdd_attr_set_changelog(const struct lu_env *env,
1392                                   struct md_object *obj, struct thandle *handle,
1393                                   __u64 valid)
1394 {
1395         struct mdd_device *mdd = mdo2mdd(obj);
1396         int bits, type = 0;
1397
1398         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1399         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1400         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1401         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1402         bits = bits & mdd->mdd_cl.mc_mask;
1403         if (bits == 0)
1404                 return 0;
1405
1406         /* The record type is the lowest non-masked set bit */
1407         while (bits && ((bits & 1) == 0)) {
1408                 bits = bits >> 1;
1409                 type++;
1410         }
1411
1412         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1413         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1414                                         md2mdd_obj(obj), handle);
1415 }
1416
1417 /* set attr and LOV EA at once, return updated attr */
1418 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1419                         const struct md_attr *ma)
1420 {
1421         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1422         struct mdd_device *mdd = mdo2mdd(obj);
1423         struct thandle *handle;
1424         struct lov_mds_md *lmm = NULL;
1425         struct llog_cookie *logcookies = NULL;
1426         int  rc, lmm_size = 0, cookie_size = 0;
1427         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1428 #ifdef HAVE_QUOTA_SUPPORT
1429         struct obd_device *obd = mdd->mdd_obd_dev;
1430         struct mds_obd *mds = &obd->u.mds;
1431         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1432         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1433         int quota_opc = 0, block_count = 0;
1434         int inode_pending[MAXQUOTAS] = { 0, 0 };
1435         int block_pending[MAXQUOTAS] = { 0, 0 };
1436 #endif
1437         ENTRY;
1438
1439         *la_copy = ma->ma_attr;
1440         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1441         if (rc != 0)
1442                 RETURN(rc);
1443
1444         /* setattr on "close" only change atime, or do nothing */
1445         if (ma->ma_valid == MA_INODE &&
1446             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1447                 RETURN(0);
1448
1449         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1450                                     MDD_TXN_ATTR_SET_OP);
1451         handle = mdd_trans_start(env, mdd);
1452         if (IS_ERR(handle))
1453                 RETURN(PTR_ERR(handle));
1454         /*TODO: add lock here*/
1455         /* start a log jounal handle if needed */
1456         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1457             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1458                 lmm_size = mdd_lov_mdsize(env, mdd);
1459                 lmm = mdd_max_lmm_get(env, mdd);
1460                 if (lmm == NULL)
1461                         GOTO(cleanup, rc = -ENOMEM);
1462
1463                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1464                                 XATTR_NAME_LOV);
1465
1466                 if (rc < 0)
1467                         GOTO(cleanup, rc);
1468         }
1469
1470         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1471                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1472                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1473
1474 #ifdef HAVE_QUOTA_SUPPORT
1475         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1476                 struct obd_export *exp = md_quota(env)->mq_exp;
1477                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1478
1479                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1480                 if (!rc) {
1481                         quota_opc = FSFILT_OP_SETATTR;
1482                         mdd_quota_wrapper(la_copy, qnids);
1483                         mdd_quota_wrapper(la_tmp, qoids);
1484                         /* get file quota for new owner */
1485                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1486                                         qnids, inode_pending, 1, NULL, 0,
1487                                         NULL, 0);
1488                         block_count = (la_tmp->la_blocks + 7) >> 3;
1489                         if (block_count) {
1490                                 void *data = NULL;
1491                                 mdd_data_get(env, mdd_obj, &data);
1492                                 /* get block quota for new owner */
1493                                 lquota_chkquota(mds_quota_interface_ref, obd,
1494                                                 exp, qnids, block_pending,
1495                                                 block_count, NULL,
1496                                                 LQUOTA_FLAGS_BLK, data, 1);
1497                         }
1498                 }
1499         }
1500 #endif
1501
1502         if (la_copy->la_valid & LA_FLAGS) {
1503                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1504                                                   handle, 1);
1505                 if (rc == 0)
1506                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1507         } else if (la_copy->la_valid) {            /* setattr */
1508                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1509                                                   handle, 1);
1510                 /* journal chown/chgrp in llog, just like unlink */
1511                 if (rc == 0 && lmm_size){
1512                         cookie_size = mdd_lov_cookiesize(env, mdd);
1513                         logcookies = mdd_max_cookie_get(env, mdd);
1514                         if (logcookies == NULL)
1515                                 GOTO(cleanup, rc = -ENOMEM);
1516
1517                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1518                                             logcookies, cookie_size) <= 0)
1519                                 logcookies = NULL;
1520                 }
1521         }
1522
1523         if (rc == 0 && ma->ma_valid & MA_LOV) {
1524                 cfs_umode_t mode;
1525
1526                 mode = mdd_object_type(mdd_obj);
1527                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1528                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1529                         if (rc)
1530                                 GOTO(cleanup, rc);
1531
1532                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1533                                             ma->ma_lmm_size, handle, 1);
1534                 }
1535
1536         }
1537         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1538                 cfs_umode_t mode;
1539
1540                 mode = mdd_object_type(mdd_obj);
1541                 if (S_ISREG(mode))
1542                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1543
1544         }
1545 cleanup:
1546         if (rc == 0)
1547                 rc = mdd_attr_set_changelog(env, obj, handle,
1548                                             ma->ma_attr.la_valid);
1549         mdd_trans_stop(env, mdd, rc, handle);
1550         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1551                 /*set obd attr, if needed*/
1552                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1553                                            logcookies);
1554         }
1555 #ifdef HAVE_QUOTA_SUPPORT
1556         if (quota_opc) {
1557                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1558                                       inode_pending, 0);
1559                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1560                                       block_pending, 1);
1561                 /* Trigger dqrel/dqacq for original owner and new owner.
1562                  * If failed, the next call for lquota_chkquota will
1563                  * process it. */
1564                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1565                               quota_opc);
1566         }
1567 #endif
1568         RETURN(rc);
1569 }
1570
1571 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1572                       const struct lu_buf *buf, const char *name, int fl,
1573                       struct thandle *handle)
1574 {
1575         int  rc;
1576         ENTRY;
1577
1578         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1579         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1580         mdd_write_unlock(env, obj);
1581
1582         RETURN(rc);
1583 }
1584
1585 static int mdd_xattr_sanity_check(const struct lu_env *env,
1586                                   struct mdd_object *obj)
1587 {
1588         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1589         struct md_ucred *uc     = md_ucred(env);
1590         int rc;
1591         ENTRY;
1592
1593         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1594                 RETURN(-EPERM);
1595
1596         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1597         if (rc)
1598                 RETURN(rc);
1599
1600         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1601             !mdd_capable(uc, CFS_CAP_FOWNER))
1602                 RETURN(-EPERM);
1603
1604         RETURN(rc);
1605 }
1606
1607 /**
1608  * The caller should guarantee to update the object ctime
1609  * after xattr_set if needed.
1610  */
1611 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1612                          const struct lu_buf *buf, const char *name,
1613                          int fl)
1614 {
1615         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1616         struct mdd_device *mdd = mdo2mdd(obj);
1617         struct thandle *handle;
1618         int  rc;
1619         ENTRY;
1620
1621         rc = mdd_xattr_sanity_check(env, mdd_obj);
1622         if (rc)
1623                 RETURN(rc);
1624
1625         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1626         /* security-replated changes may require sync */
1627         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1628             mdd->mdd_sync_permission == 1)
1629                 txn_param_sync(&mdd_env_info(env)->mti_param);
1630
1631         handle = mdd_trans_start(env, mdd);
1632         if (IS_ERR(handle))
1633                 RETURN(PTR_ERR(handle));
1634
1635         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1636
1637         /* Only record user xattr changes */
1638         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1639                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1640                                               handle);
1641         mdd_trans_stop(env, mdd, rc, handle);
1642
1643         RETURN(rc);
1644 }
1645
1646 /**
1647  * The caller should guarantee to update the object ctime
1648  * after xattr_set if needed.
1649  */
1650 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1651                   const char *name)
1652 {
1653         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1654         struct mdd_device *mdd = mdo2mdd(obj);
1655         struct thandle *handle;
1656         int  rc;
1657         ENTRY;
1658
1659         rc = mdd_xattr_sanity_check(env, mdd_obj);
1660         if (rc)
1661                 RETURN(rc);
1662
1663         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1664         handle = mdd_trans_start(env, mdd);
1665         if (IS_ERR(handle))
1666                 RETURN(PTR_ERR(handle));
1667
1668         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1669         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1670                            mdd_object_capa(env, mdd_obj));
1671         mdd_write_unlock(env, mdd_obj);
1672
1673         /* Only record user xattr changes */
1674         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1675                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1676                                               handle);
1677
1678         mdd_trans_stop(env, mdd, rc, handle);
1679
1680         RETURN(rc);
1681 }
1682
1683 /* partial unlink */
1684 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1685                        struct md_attr *ma)
1686 {
1687         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1688         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1689         struct mdd_device *mdd = mdo2mdd(obj);
1690         struct thandle *handle;
1691 #ifdef HAVE_QUOTA_SUPPORT
1692         struct obd_device *obd = mdd->mdd_obd_dev;
1693         struct mds_obd *mds = &obd->u.mds;
1694         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1695         int quota_opc = 0;
1696 #endif
1697         int rc;
1698         ENTRY;
1699
1700         /*
1701          * Check -ENOENT early here because we need to get object type
1702          * to calculate credits before transaction start
1703          */
1704         if (!mdd_object_exists(mdd_obj))
1705                 RETURN(-ENOENT);
1706
1707         LASSERT(mdd_object_exists(mdd_obj) > 0);
1708
1709         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1710         if (rc)
1711                 RETURN(rc);
1712
1713         handle = mdd_trans_start(env, mdd);
1714         if (IS_ERR(handle))
1715                 RETURN(-ENOMEM);
1716
1717         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1718
1719         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1720         if (rc)
1721                 GOTO(cleanup, rc);
1722
1723         __mdd_ref_del(env, mdd_obj, handle, 0);
1724
1725         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1726                 /* unlink dot */
1727                 __mdd_ref_del(env, mdd_obj, handle, 1);
1728         }
1729
1730         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1731         la_copy->la_ctime = ma->ma_attr.la_ctime;
1732
1733         la_copy->la_valid = LA_CTIME;
1734         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1735         if (rc)
1736                 GOTO(cleanup, rc);
1737
1738         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1739 #ifdef HAVE_QUOTA_SUPPORT
1740         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1741             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1742                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1743                 mdd_quota_wrapper(&ma->ma_attr, qids);
1744         }
1745 #endif
1746
1747
1748         EXIT;
1749 cleanup:
1750         mdd_write_unlock(env, mdd_obj);
1751         mdd_trans_stop(env, mdd, rc, handle);
1752 #ifdef HAVE_QUOTA_SUPPORT
1753         if (quota_opc)
1754                 /* Trigger dqrel on the owner of child. If failed,
1755                  * the next call for lquota_chkquota will process it */
1756                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1757                               quota_opc);
1758 #endif
1759         return rc;
1760 }
1761
1762 /* partial operation */
1763 static int mdd_oc_sanity_check(const struct lu_env *env,
1764                                struct mdd_object *obj,
1765                                struct md_attr *ma)
1766 {
1767         int rc;
1768         ENTRY;
1769
1770         switch (ma->ma_attr.la_mode & S_IFMT) {
1771         case S_IFREG:
1772         case S_IFDIR:
1773         case S_IFLNK:
1774         case S_IFCHR:
1775         case S_IFBLK:
1776         case S_IFIFO:
1777         case S_IFSOCK:
1778                 rc = 0;
1779                 break;
1780         default:
1781                 rc = -EINVAL;
1782                 break;
1783         }
1784         RETURN(rc);
1785 }
1786
1787 static int mdd_object_create(const struct lu_env *env,
1788                              struct md_object *obj,
1789                              const struct md_op_spec *spec,
1790                              struct md_attr *ma)
1791 {
1792
1793         struct mdd_device *mdd = mdo2mdd(obj);
1794         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1795         const struct lu_fid *pfid = spec->u.sp_pfid;
1796         struct thandle *handle;
1797 #ifdef HAVE_QUOTA_SUPPORT
1798         struct obd_device *obd = mdd->mdd_obd_dev;
1799         struct obd_export *exp = md_quota(env)->mq_exp;
1800         struct mds_obd *mds = &obd->u.mds;
1801         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1802         int quota_opc = 0, block_count = 0;
1803         int inode_pending[MAXQUOTAS] = { 0, 0 };
1804         int block_pending[MAXQUOTAS] = { 0, 0 };
1805 #endif
1806         int rc = 0;
1807         ENTRY;
1808
1809 #ifdef HAVE_QUOTA_SUPPORT
1810         if (mds->mds_quota) {
1811                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1812                 mdd_quota_wrapper(&ma->ma_attr, qids);
1813                 /* get file quota for child */
1814                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1815                                 qids, inode_pending, 1, NULL, 0,
1816                                 NULL, 0);
1817                 switch (ma->ma_attr.la_mode & S_IFMT) {
1818                 case S_IFLNK:
1819                 case S_IFDIR:
1820                         block_count = 2;
1821                         break;
1822                 case S_IFREG:
1823                         block_count = 1;
1824                         break;
1825                 }
1826                 /* get block quota for child */
1827                 if (block_count)
1828                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1829                                         qids, block_pending, block_count,
1830                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1831         }
1832 #endif
1833
1834         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1835         handle = mdd_trans_start(env, mdd);
1836         if (IS_ERR(handle))
1837                 GOTO(out_pending, rc = PTR_ERR(handle));
1838
1839         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1840         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1841         if (rc)
1842                 GOTO(unlock, rc);
1843
1844         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1845         if (rc)
1846                 GOTO(unlock, rc);
1847
1848         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1849                 /* If creating the slave object, set slave EA here. */
1850                 int lmv_size = spec->u.sp_ea.eadatalen;
1851                 struct lmv_stripe_md *lmv;
1852
1853                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1854                 LASSERT(lmv != NULL && lmv_size > 0);
1855
1856                 rc = __mdd_xattr_set(env, mdd_obj,
1857                                      mdd_buf_get_const(env, lmv, lmv_size),
1858                                      XATTR_NAME_LMV, 0, handle);
1859                 if (rc)
1860                         GOTO(unlock, rc);
1861
1862                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1863                                            handle, 0);
1864         } else {
1865 #ifdef CONFIG_FS_POSIX_ACL
1866                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1867                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1868
1869                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1870                         buf->lb_len = spec->u.sp_ea.eadatalen;
1871                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1872                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1873                                                     &ma->ma_attr.la_mode,
1874                                                     handle);
1875                                 if (rc)
1876                                         GOTO(unlock, rc);
1877                                 else
1878                                         ma->ma_attr.la_valid |= LA_MODE;
1879                         }
1880
1881                         pfid = spec->u.sp_ea.fid;
1882                 }
1883 #endif
1884                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1885                                            spec);
1886         }
1887         EXIT;
1888 unlock:
1889         if (rc == 0)
1890                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1891         mdd_write_unlock(env, mdd_obj);
1892
1893         mdd_trans_stop(env, mdd, rc, handle);
1894 out_pending:
1895 #ifdef HAVE_QUOTA_SUPPORT
1896         if (quota_opc) {
1897                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1898                                       inode_pending, 0);
1899                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1900                                       block_pending, 1);
1901                 /* Trigger dqacq on the owner of child. If failed,
1902                  * the next call for lquota_chkquota will process it. */
1903                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1904                               quota_opc);
1905         }
1906 #endif
1907         return rc;
1908 }
1909
1910 /* partial link */
1911 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1912                        const struct md_attr *ma)
1913 {
1914         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1915         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1916         struct mdd_device *mdd = mdo2mdd(obj);
1917         struct thandle *handle;
1918         int rc;
1919         ENTRY;
1920
1921         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1922         handle = mdd_trans_start(env, mdd);
1923         if (IS_ERR(handle))
1924                 RETURN(-ENOMEM);
1925
1926         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1927         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1928         if (rc == 0)
1929                 __mdd_ref_add(env, mdd_obj, handle);
1930         mdd_write_unlock(env, mdd_obj);
1931         if (rc == 0) {
1932                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1933                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1934
1935                 la_copy->la_valid = LA_CTIME;
1936                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1937                                                         handle, 0);
1938         }
1939         mdd_trans_stop(env, mdd, 0, handle);
1940
1941         RETURN(rc);
1942 }
1943
1944 /*
1945  * do NOT or the MAY_*'s, you'll get the weakest
1946  */
1947 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1948 {
1949         int res = 0;
1950
1951         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1952          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1953          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1954          * owner can write to a file even if it is marked readonly to hide
1955          * its brokenness. (bug 5781) */
1956         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1957                 struct md_ucred *uc = md_ucred(env);
1958
1959                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1960                     (la->la_uid == uc->mu_fsuid))
1961                         return 0;
1962         }
1963
1964         if (flags & FMODE_READ)
1965                 res |= MAY_READ;
1966         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1967                 res |= MAY_WRITE;
1968         if (flags & MDS_FMODE_EXEC)
1969                 res = MAY_EXEC;
1970         return res;
1971 }
1972
1973 static int mdd_open_sanity_check(const struct lu_env *env,
1974                                  struct mdd_object *obj, int flag)
1975 {
1976         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1977         int mode, rc;
1978         ENTRY;
1979
1980         /* EEXIST check */
1981         if (mdd_is_dead_obj(obj))
1982                 RETURN(-ENOENT);
1983
1984         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1985         if (rc)
1986                RETURN(rc);
1987
1988         if (S_ISLNK(tmp_la->la_mode))
1989                 RETURN(-ELOOP);
1990
1991         mode = accmode(env, tmp_la, flag);
1992
1993         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1994                 RETURN(-EISDIR);
1995
1996         if (!(flag & MDS_OPEN_CREATED)) {
1997                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1998                 if (rc)
1999                         RETURN(rc);
2000         }
2001
2002         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2003             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2004                 flag &= ~MDS_OPEN_TRUNC;
2005
2006         /* For writing append-only file must open it with append mode. */
2007         if (mdd_is_append(obj)) {
2008                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2009                         RETURN(-EPERM);
2010                 if (flag & MDS_OPEN_TRUNC)
2011                         RETURN(-EPERM);
2012         }
2013
2014 #if 0
2015         /*
2016          * Now, flag -- O_NOATIME does not be packed by client.
2017          */
2018         if (flag & O_NOATIME) {
2019                 struct md_ucred *uc = md_ucred(env);
2020
2021                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2022                     (uc->mu_valid == UCRED_NEW)) &&
2023                     (uc->mu_fsuid != tmp_la->la_uid) &&
2024                     !mdd_capable(uc, CFS_CAP_FOWNER))
2025                         RETURN(-EPERM);
2026         }
2027 #endif
2028
2029         RETURN(0);
2030 }
2031
2032 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2033                     int flags)
2034 {
2035         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2036         int rc = 0;
2037
2038         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2039
2040         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2041         if (rc == 0)
2042                 mdd_obj->mod_count++;
2043
2044         mdd_write_unlock(env, mdd_obj);
2045         return rc;
2046 }
2047
2048 /* return md_attr back,
2049  * if it is last unlink then return lov ea + llog cookie*/
2050 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2051                     struct md_attr *ma)
2052 {
2053         int rc = 0;
2054         ENTRY;
2055
2056         if (S_ISREG(mdd_object_type(obj))) {
2057                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2058                  * Caller must be ready for that. */
2059
2060                 rc = __mdd_lmm_get(env, obj, ma);
2061                 if ((ma->ma_valid & MA_LOV))
2062                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2063                                             obj, ma);
2064         }
2065         RETURN(rc);
2066 }
2067
2068 /*
2069  * No permission check is needed.
2070  */
2071 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2072                      struct md_attr *ma)
2073 {
2074         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2075         struct mdd_device *mdd = mdo2mdd(obj);
2076         struct thandle    *handle = NULL;
2077         int rc;
2078         int reset = 1;
2079
2080 #ifdef HAVE_QUOTA_SUPPORT
2081         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2082         struct mds_obd *mds = &obd->u.mds;
2083         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2084         int quota_opc = 0;
2085 #endif
2086         ENTRY;
2087
2088         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2089                 mdd_obj->mod_count--;
2090
2091                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2092                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2093                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2094                 RETURN(0);
2095         }
2096
2097         /* check without any lock */
2098         if (mdd_obj->mod_count == 1 &&
2099             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2100  again:
2101                 rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2102                 if (rc)
2103                         RETURN(rc);
2104                 handle = mdd_trans_start(env, mdo2mdd(obj));
2105                 if (IS_ERR(handle))
2106                         RETURN(PTR_ERR(handle));
2107         }
2108
2109         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2110         if (handle == NULL && mdd_obj->mod_count == 1 &&
2111             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2112                 mdd_write_unlock(env, mdd_obj);
2113                 goto again;
2114         }
2115
2116         /* release open count */
2117         mdd_obj->mod_count --;
2118
2119         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2120                 /* remove link to object from orphan index */
2121                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2122                 if (rc == 0) {
2123                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2124                                "list, OSS objects to be destroyed.\n",
2125                                PFID(mdd_object_fid(mdd_obj)));
2126                 } else {
2127                         CERROR("Object "DFID" can not be deleted from orphan "
2128                                 "list, maybe cause OST objects can not be "
2129                                 "destroyed (err: %d).\n",
2130                                 PFID(mdd_object_fid(mdd_obj)), rc);
2131                         /* If object was not deleted from orphan list, do not
2132                          * destroy OSS objects, which will be done when next
2133                          * recovery. */
2134                         GOTO(out, rc);
2135                 }
2136         }
2137
2138         rc = mdd_iattr_get(env, mdd_obj, ma);
2139         /* Object maybe not in orphan list originally, it is rare case for
2140          * mdd_finish_unlink() failure. */
2141         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2142 #ifdef HAVE_QUOTA_SUPPORT
2143                 if (mds->mds_quota) {
2144                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2145                         mdd_quota_wrapper(&ma->ma_attr, qids);
2146                 }
2147 #endif
2148                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2149                 if (ma->ma_valid & MA_FLAGS &&
2150                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2151                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2152                 } else {
2153                         rc = mdd_object_kill(env, mdd_obj, ma);
2154                         if (rc == 0)
2155                                 reset = 0;
2156                 }
2157
2158                 if (rc != 0)
2159                         CERROR("Error when prepare to delete Object "DFID" , "
2160                                "which will cause OST objects can not be "
2161                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2162         }
2163         EXIT;
2164
2165 out:
2166         if (reset)
2167                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2168
2169         mdd_write_unlock(env, mdd_obj);
2170         if (handle != NULL)
2171                 mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2172 #ifdef HAVE_QUOTA_SUPPORT
2173         if (quota_opc)
2174                 /* Trigger dqrel on the owner of child. If failed,
2175                  * the next call for lquota_chkquota will process it */
2176                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2177                               quota_opc);
2178 #endif
2179         return rc;
2180 }
2181
2182 /*
2183  * Permission check is done when open,
2184  * no need check again.
2185  */
2186 static int mdd_readpage_sanity_check(const struct lu_env *env,
2187                                      struct mdd_object *obj)
2188 {
2189         struct dt_object *next = mdd_object_child(obj);
2190         int rc;
2191         ENTRY;
2192
2193         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2194                 rc = 0;
2195         else
2196                 rc = -ENOTDIR;
2197
2198         RETURN(rc);
2199 }
2200
2201 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2202                               int first, void *area, int nob,
2203                               const struct dt_it_ops *iops, struct dt_it *it,
2204                               __u64 *start, __u64 *end,
2205                               struct lu_dirent **last, __u32 attr)
2206 {
2207         int                     result;
2208         __u64                   hash = 0;
2209         struct lu_dirent       *ent;
2210
2211         if (first) {
2212                 memset(area, 0, sizeof (struct lu_dirpage));
2213                 area += sizeof (struct lu_dirpage);
2214                 nob  -= sizeof (struct lu_dirpage);
2215         }
2216
2217         ent  = area;
2218         do {
2219                 int    len;
2220                 int    recsize;
2221
2222                 len  = iops->key_size(env, it);
2223
2224                 /* IAM iterator can return record with zero len. */
2225                 if (len == 0)
2226                         goto next;
2227
2228                 hash = iops->store(env, it);
2229                 if (unlikely(first)) {
2230                         first = 0;
2231                         *start = hash;
2232                 }
2233
2234                 /* calculate max space required for lu_dirent */
2235                 recsize = lu_dirent_calc_size(len, attr);
2236
2237                 if (nob >= recsize) {
2238                         result = iops->rec(env, it, ent, attr);
2239                         if (result == -ESTALE)
2240                                 goto next;
2241                         if (result != 0)
2242                                 goto out;
2243
2244                         /* osd might not able to pack all attributes,
2245                          * so recheck rec length */
2246                         recsize = le16_to_cpu(ent->lde_reclen);
2247                 } else {
2248                         /*
2249                          * record doesn't fit into page, enlarge previous one.
2250                          */
2251                         if (*last) {
2252                                 (*last)->lde_reclen =
2253                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2254                                                         nob);
2255                                 result = 0;
2256                         } else
2257                                 result = -EINVAL;
2258
2259                         goto out;
2260                 }
2261                 *last = ent;
2262                 ent = (void *)ent + recsize;
2263                 nob -= recsize;
2264
2265 next:
2266                 result = iops->next(env, it);
2267                 if (result == -ESTALE)
2268                         goto next;
2269         } while (result == 0);
2270
2271 out:
2272         *end = hash;
2273         return result;
2274 }
2275
2276 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2277                           const struct lu_rdpg *rdpg)
2278 {
2279         struct dt_it      *it;
2280         struct dt_object  *next = mdd_object_child(obj);
2281         const struct dt_it_ops  *iops;
2282         struct page       *pg;
2283         struct lu_dirent  *last = NULL;
2284         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2285         int i;
2286         int rc;
2287         int nob;
2288         __u64 hash_start;
2289         __u64 hash_end = 0;
2290
2291         LASSERT(rdpg->rp_pages != NULL);
2292         LASSERT(next->do_index_ops != NULL);
2293
2294         if (rdpg->rp_count <= 0)
2295                 return -EFAULT;
2296
2297         /*
2298          * iterate through directory and fill pages from @rdpg
2299          */
2300         iops = &next->do_index_ops->dio_it;
2301         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2302         if (IS_ERR(it))
2303                 return PTR_ERR(it);
2304
2305         rc = iops->load(env, it, rdpg->rp_hash);
2306
2307         if (rc == 0){
2308                 /*
2309                  * Iterator didn't find record with exactly the key requested.
2310                  *
2311                  * It is currently either
2312                  *
2313                  *     - positioned above record with key less than
2314                  *     requested---skip it.
2315                  *
2316                  *     - or not positioned at all (is in IAM_IT_SKEWED
2317                  *     state)---position it on the next item.
2318                  */
2319                 rc = iops->next(env, it);
2320         } else if (rc > 0)
2321                 rc = 0;
2322
2323         /*
2324          * At this point and across for-loop:
2325          *
2326          *  rc == 0 -> ok, proceed.
2327          *  rc >  0 -> end of directory.
2328          *  rc <  0 -> error.
2329          */
2330         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2331              i++, nob -= CFS_PAGE_SIZE) {
2332                 LASSERT(i < rdpg->rp_npages);
2333                 pg = rdpg->rp_pages[i];
2334                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2335                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2336                                         it, &hash_start, &hash_end, &last,
2337                                         rdpg->rp_attrs);
2338                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2339                         if (last)
2340                                 last->lde_reclen = 0;
2341                 }
2342                 cfs_kunmap(pg);
2343         }
2344         if (rc > 0) {
2345                 /*
2346                  * end of directory.
2347                  */
2348                 hash_end = DIR_END_OFF;
2349                 rc = 0;
2350         }
2351         if (rc == 0) {
2352                 struct lu_dirpage *dp;
2353
2354                 dp = cfs_kmap(rdpg->rp_pages[0]);
2355                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2356                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2357                 if (i == 0)
2358                         /*
2359                          * No pages were processed, mark this.
2360                          */
2361                         dp->ldp_flags |= LDF_EMPTY;
2362
2363                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2364                 cfs_kunmap(rdpg->rp_pages[0]);
2365         }
2366         iops->put(env, it);
2367         iops->fini(env, it);
2368
2369         return rc;
2370 }
2371
2372 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2373                  const struct lu_rdpg *rdpg)
2374 {
2375         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2376         int rc;
2377         ENTRY;
2378
2379         LASSERT(mdd_object_exists(mdd_obj));
2380
2381         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2382         rc = mdd_readpage_sanity_check(env, mdd_obj);
2383         if (rc)
2384                 GOTO(out_unlock, rc);
2385
2386         if (mdd_is_dead_obj(mdd_obj)) {
2387                 struct page *pg;
2388                 struct lu_dirpage *dp;
2389
2390                 /*
2391                  * According to POSIX, please do not return any entry to client:
2392                  * even dot and dotdot should not be returned.
2393                  */
2394                 CWARN("readdir from dead object: "DFID"\n",
2395                         PFID(mdd_object_fid(mdd_obj)));
2396
2397                 if (rdpg->rp_count <= 0)
2398                         GOTO(out_unlock, rc = -EFAULT);
2399                 LASSERT(rdpg->rp_pages != NULL);
2400
2401                 pg = rdpg->rp_pages[0];
2402                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2403                 memset(dp, 0 , sizeof(struct lu_dirpage));
2404                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2405                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2406                 dp->ldp_flags |= LDF_EMPTY;
2407                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2408                 cfs_kunmap(pg);
2409                 GOTO(out_unlock, rc = 0);
2410         }
2411
2412         rc = __mdd_readpage(env, mdd_obj, rdpg);
2413
2414         EXIT;
2415 out_unlock:
2416         mdd_read_unlock(env, mdd_obj);
2417         return rc;
2418 }
2419
2420 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2421 {
2422         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2423         struct dt_object *next;
2424
2425         LASSERT(mdd_object_exists(mdd_obj));
2426         next = mdd_object_child(mdd_obj);
2427         return next->do_ops->do_object_sync(env, next);
2428 }
2429
2430 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2431                                         struct md_object *obj)
2432 {
2433         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2434
2435         LASSERT(mdd_object_exists(mdd_obj));
2436         return do_version_get(env, mdd_object_child(mdd_obj));
2437 }
2438
2439 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2440                             dt_obj_version_t version)
2441 {
2442         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2443
2444         LASSERT(mdd_object_exists(mdd_obj));
2445         do_version_set(env, mdd_object_child(mdd_obj), version);
2446 }
2447
2448 const struct md_object_operations mdd_obj_ops = {
2449         .moo_permission    = mdd_permission,
2450         .moo_attr_get      = mdd_attr_get,
2451         .moo_attr_set      = mdd_attr_set,
2452         .moo_xattr_get     = mdd_xattr_get,
2453         .moo_xattr_set     = mdd_xattr_set,
2454         .moo_xattr_list    = mdd_xattr_list,
2455         .moo_xattr_del     = mdd_xattr_del,
2456         .moo_object_create = mdd_object_create,
2457         .moo_ref_add       = mdd_ref_add,
2458         .moo_ref_del       = mdd_ref_del,
2459         .moo_open          = mdd_open,
2460         .moo_close         = mdd_close,
2461         .moo_readpage      = mdd_readpage,
2462         .moo_readlink      = mdd_readlink,
2463         .moo_changelog     = mdd_changelog,
2464         .moo_capa_get      = mdd_capa_get,
2465         .moo_object_sync   = mdd_object_sync,
2466         .moo_version_get   = mdd_version_get,
2467         .moo_version_set   = mdd_version_set,
2468         .moo_path          = mdd_path,
2469         .moo_file_lock     = mdd_file_lock,
2470         .moo_file_unlock   = mdd_file_unlock,
2471 };