Whamcloud - gitweb
6f57ea80200376d18cdbb2f3dea639b5e4539e97
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
149 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 {
151         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152
153         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
154                 if (buf->lb_vmalloc)
155                         OBD_VFREE(buf->lb_buf, buf->lb_len);
156                 else
157                         OBD_FREE(buf->lb_buf, buf->lb_len);
158                 buf->lb_buf = NULL;
159         }
160         if (buf->lb_buf == NULL) {
161                 buf->lb_len = len;
162                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
163                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
164                         buf->lb_vmalloc = 0;
165                 }
166                 if (buf->lb_buf == NULL) {
167                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
168                         buf->lb_vmalloc = 1;
169                 }
170                 if (buf->lb_buf == NULL)
171                         buf->lb_len = 0;
172         }
173         return buf;
174 }
175
176 /** Increase the size of the \a mti_big_buf.
177  * preserves old data in buffer
178  * old buffer remains unchanged on error
179  * \retval 0 or -ENOMEM
180  */
181 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
182 {
183         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
184         struct lu_buf buf;
185
186         LASSERT(len >= oldbuf->lb_len);
187         if (len > BUF_VMALLOC_SIZE) {
188                 OBD_VMALLOC(buf.lb_buf, len);
189                 buf.lb_vmalloc = 1;
190         } else {
191                 OBD_ALLOC(buf.lb_buf, len);
192                 buf.lb_vmalloc = 0;
193         }
194         if (buf.lb_buf == NULL)
195                 return -ENOMEM;
196
197         buf.lb_len = len;
198         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
199
200         if (oldbuf->lb_vmalloc)
201                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
202         else
203                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
204
205         memcpy(oldbuf, &buf, sizeof(buf));
206
207         return 0;
208 }
209
210 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
211                                        struct mdd_device *mdd)
212 {
213         struct mdd_thread_info *mti = mdd_env_info(env);
214         int                     max_cookie_size;
215
216         max_cookie_size = mdd_lov_cookiesize(env, mdd);
217         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
218                 if (mti->mti_max_cookie)
219                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
220                 mti->mti_max_cookie = NULL;
221                 mti->mti_max_cookie_size = 0;
222         }
223         if (unlikely(mti->mti_max_cookie == NULL)) {
224                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
225                 if (likely(mti->mti_max_cookie != NULL))
226                         mti->mti_max_cookie_size = max_cookie_size;
227         }
228         if (likely(mti->mti_max_cookie != NULL))
229                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
230         return mti->mti_max_cookie;
231 }
232
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234                                    struct mdd_device *mdd)
235 {
236         struct mdd_thread_info *mti = mdd_env_info(env);
237         int                     max_lmm_size;
238
239         max_lmm_size = mdd_lov_mdsize(env, mdd);
240         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
241                 if (mti->mti_max_lmm)
242                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
243                 mti->mti_max_lmm = NULL;
244                 mti->mti_max_lmm_size = 0;
245         }
246         if (unlikely(mti->mti_max_lmm == NULL)) {
247                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
248                 if (unlikely(mti->mti_max_lmm != NULL))
249                         mti->mti_max_lmm_size = max_lmm_size;
250         }
251         return mti->mti_max_lmm;
252 }
253
254 struct lu_object *mdd_object_alloc(const struct lu_env *env,
255                                    const struct lu_object_header *hdr,
256                                    struct lu_device *d)
257 {
258         struct mdd_object *mdd_obj;
259
260         OBD_ALLOC_PTR(mdd_obj);
261         if (mdd_obj != NULL) {
262                 struct lu_object *o;
263
264                 o = mdd2lu_obj(mdd_obj);
265                 lu_object_init(o, NULL, d);
266                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
267                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
268                 mdd_obj->mod_count = 0;
269                 o->lo_ops = &mdd_lu_obj_ops;
270                 return o;
271         } else {
272                 return NULL;
273         }
274 }
275
276 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
277                            const struct lu_object_conf *unused)
278 {
279         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
280         struct mdd_object *mdd_obj = lu2mdd_obj(o);
281         struct lu_object  *below;
282         struct lu_device  *under;
283         ENTRY;
284
285         mdd_obj->mod_cltime = 0;
286         under = &d->mdd_child->dd_lu_dev;
287         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
288         mdd_pdlock_init(mdd_obj);
289         if (below == NULL)
290                 RETURN(-ENOMEM);
291
292         lu_object_add(o, below);
293
294         RETURN(0);
295 }
296
297 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
298 {
299         if (lu_object_exists(o))
300                 return mdd_get_flags(env, lu2mdd_obj(o));
301         else
302                 return 0;
303 }
304
305 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
306 {
307         struct mdd_object *mdd = lu2mdd_obj(o);
308
309         lu_object_fini(o);
310         OBD_FREE_PTR(mdd);
311 }
312
313 static int mdd_object_print(const struct lu_env *env, void *cookie,
314                             lu_printer_t p, const struct lu_object *o)
315 {
316         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
317         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
318                     "valid=%x, cltime="LPU64", flags=%lx)",
319                     mdd, mdd->mod_count, mdd->mod_valid,
320                     mdd->mod_cltime, mdd->mod_flags);
321 }
322
323 static const struct lu_object_operations mdd_lu_obj_ops = {
324         .loo_object_init    = mdd_object_init,
325         .loo_object_start   = mdd_object_start,
326         .loo_object_free    = mdd_object_free,
327         .loo_object_print   = mdd_object_print,
328 };
329
330 struct mdd_object *mdd_object_find(const struct lu_env *env,
331                                    struct mdd_device *d,
332                                    const struct lu_fid *f)
333 {
334         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
335 }
336
337 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
338                         const char *path, struct lu_fid *fid)
339 {
340         struct lu_buf *buf;
341         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
342         struct mdd_object *obj;
343         struct lu_name *lname = &mdd_env_info(env)->mti_name;
344         char *name;
345         int rc = 0;
346         ENTRY;
347
348         /* temp buffer for path element */
349         buf = mdd_buf_alloc(env, PATH_MAX);
350         if (buf->lb_buf == NULL)
351                 RETURN(-ENOMEM);
352
353         lname->ln_name = name = buf->lb_buf;
354         lname->ln_namelen = 0;
355         *f = mdd->mdd_root_fid;
356
357         while(1) {
358                 while (*path == '/')
359                         path++;
360                 if (*path == '\0')
361                         break;
362                 while (*path != '/' && *path != '\0') {
363                         *name = *path;
364                         path++;
365                         name++;
366                         lname->ln_namelen++;
367                 }
368
369                 *name = '\0';
370                 /* find obj corresponding to fid */
371                 obj = mdd_object_find(env, mdd, f);
372                 if (obj == NULL)
373                         GOTO(out, rc = -EREMOTE);
374                 if (IS_ERR(obj))
375                         GOTO(out, rc = -PTR_ERR(obj));
376                 /* get child fid from parent and name */
377                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
378                 mdd_object_put(env, obj);
379                 if (rc)
380                         break;
381
382                 name = buf->lb_buf;
383                 lname->ln_namelen = 0;
384         }
385
386         if (!rc)
387                 *fid = *f;
388 out:
389         RETURN(rc);
390 }
391
392 /** The maximum depth that fid2path() will search.
393  * This is limited only because we want to store the fids for
394  * historical path lookup purposes.
395  */
396 #define MAX_PATH_DEPTH 100
397
398 /** mdd_path() lookup structure. */
399 struct path_lookup_info {
400         __u64                pli_recno;        /**< history point */
401         __u64                pli_currec;       /**< current record */
402         struct lu_fid        pli_fid;
403         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
404         struct mdd_object   *pli_mdd_obj;
405         char                *pli_path;         /**< full path */
406         int                  pli_pathlen;
407         int                  pli_linkno;       /**< which hardlink to follow */
408         int                  pli_fidcount;     /**< number of \a pli_fids */
409 };
410
411 static int mdd_path_current(const struct lu_env *env,
412                             struct path_lookup_info *pli)
413 {
414         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
415         struct mdd_object *mdd_obj;
416         struct lu_buf     *buf = NULL;
417         struct link_ea_header *leh;
418         struct link_ea_entry  *lee;
419         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
420         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
421         char *ptr;
422         int reclen;
423         int rc;
424         ENTRY;
425
426         ptr = pli->pli_path + pli->pli_pathlen - 1;
427         *ptr = 0;
428         --ptr;
429         pli->pli_fidcount = 0;
430         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
431
432         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
433                 mdd_obj = mdd_object_find(env, mdd,
434                                           &pli->pli_fids[pli->pli_fidcount]);
435                 if (mdd_obj == NULL)
436                         GOTO(out, rc = -EREMOTE);
437                 if (IS_ERR(mdd_obj))
438                         GOTO(out, rc = -PTR_ERR(mdd_obj));
439                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440                 if (rc <= 0) {
441                         mdd_object_put(env, mdd_obj);
442                         if (rc == -1)
443                                 rc = -EREMOTE;
444                         else if (rc == 0)
445                                 /* Do I need to error out here? */
446                                 rc = -ENOENT;
447                         GOTO(out, rc);
448                 }
449
450                 /* Get parent fid and object name */
451                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
452                 buf = mdd_links_get(env, mdd_obj);
453                 mdd_read_unlock(env, mdd_obj);
454                 mdd_object_put(env, mdd_obj);
455                 if (IS_ERR(buf))
456                         GOTO(out, rc = PTR_ERR(buf));
457
458                 leh = buf->lb_buf;
459                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
460                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461
462                 /* If set, use link #linkno for path lookup, otherwise use
463                    link #0.  Only do this for the final path element. */
464                 if ((pli->pli_fidcount == 0) &&
465                     (pli->pli_linkno < leh->leh_reccount)) {
466                         int count;
467                         for (count = 0; count < pli->pli_linkno; count++) {
468                                 lee = (struct link_ea_entry *)
469                                      ((char *)lee + reclen);
470                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471                         }
472                         if (pli->pli_linkno < leh->leh_reccount - 1)
473                                 /* indicate to user there are more links */
474                                 pli->pli_linkno++;
475                 }
476
477                 /* Pack the name in the end of the buffer */
478                 ptr -= tmpname->ln_namelen;
479                 if (ptr - 1 <= pli->pli_path)
480                         GOTO(out, rc = -EOVERFLOW);
481                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
482                 *(--ptr) = '/';
483
484                 /* Store the parent fid for historic lookup */
485                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
486                         GOTO(out, rc = -EOVERFLOW);
487                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
488         }
489
490         /* Verify that our path hasn't changed since we started the lookup.
491            Record the current index, and verify the path resolves to the
492            same fid. If it does, then the path is correct as of this index. */
493         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
494         pli->pli_currec = mdd->mdd_cl.mc_index;
495         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
496         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497         if (rc) {
498                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
499                 GOTO (out, rc = -EAGAIN);
500         }
501         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
502                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
503                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
504                        PFID(&pli->pli_fid));
505                 GOTO(out, rc = -EAGAIN);
506         }
507         ptr++; /* skip leading / */
508         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
509
510         EXIT;
511 out:
512         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
513                 /* if we vmalloced a large buffer drop it */
514                 mdd_buf_put(buf);
515
516         return rc;
517 }
518
519 static int mdd_path_historic(const struct lu_env *env,
520                              struct path_lookup_info *pli)
521 {
522         return 0;
523 }
524
525 /* Returns the full path to this fid, as of changelog record recno. */
526 static int mdd_path(const struct lu_env *env, struct md_object *obj,
527                     char *path, int pathlen, __u64 *recno, int *linkno)
528 {
529         struct path_lookup_info *pli;
530         int tries = 3;
531         int rc = -EAGAIN;
532         ENTRY;
533
534         if (pathlen < 3)
535                 RETURN(-EOVERFLOW);
536
537         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
538                 path[0] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
610                        int *size)
611 {
612         struct lov_desc *ldesc;
613         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
614         ENTRY;
615
616         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617         LASSERT(ldesc != NULL);
618
619         if (!lmm) {
620                 *size = 0;
621                 RETURN(0);
622         }
623
624         lmm->lmm_magic = LOV_MAGIC_V1;
625         lmm->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
626         lmm->lmm_pattern = ldesc->ld_pattern;
627         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
628         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
629         *size = sizeof(struct lov_mds_md);
630
631         RETURN(sizeof(struct lov_mds_md));
632 }
633
634 /* get lov EA only */
635 static int __mdd_lmm_get(const struct lu_env *env,
636                          struct mdd_object *mdd_obj, struct md_attr *ma)
637 {
638         int rc;
639         ENTRY;
640
641         if (ma->ma_valid & MA_LOV)
642                 RETURN(0);
643
644         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
645                         XATTR_NAME_LOV);
646         if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
647                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
648                                         &ma->ma_lmm_size);
649         if (rc > 0) {
650                 ma->ma_valid |= MA_LOV;
651                 rc = 0;
652         }
653         RETURN(rc);
654 }
655
656 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
657                        struct md_attr *ma)
658 {
659         int rc;
660         ENTRY;
661
662         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
663         rc = __mdd_lmm_get(env, mdd_obj, ma);
664         mdd_read_unlock(env, mdd_obj);
665         RETURN(rc);
666 }
667
668 /* get lmv EA only*/
669 static int __mdd_lmv_get(const struct lu_env *env,
670                          struct mdd_object *mdd_obj, struct md_attr *ma)
671 {
672         int rc;
673         ENTRY;
674
675         if (ma->ma_valid & MA_LMV)
676                 RETURN(0);
677
678         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
679                         XATTR_NAME_LMV);
680         if (rc > 0) {
681                 ma->ma_valid |= MA_LMV;
682                 rc = 0;
683         }
684         RETURN(rc);
685 }
686
687 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
688                          struct md_attr *ma)
689 {
690         struct mdd_thread_info *info = mdd_env_info(env);
691         struct lustre_mdt_attrs *lma =
692                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
693         int lma_size;
694         int rc;
695         ENTRY;
696
697         /* If all needed data are already valid, nothing to do */
698         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
699             (ma->ma_need & (MA_HSM | MA_SOM)))
700                 RETURN(0);
701
702         /* Read LMA from disk EA */
703         lma_size = sizeof(info->mti_xattr_buf);
704         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
705         if (rc <= 0)
706                 RETURN(rc);
707
708         /* Useless to check LMA incompatibility because this is already done in
709          * osd_ea_fid_get(), and this will fail long before this code is
710          * called.
711          * So, if we are here, LMA is compatible.
712          */
713
714         lustre_lma_swab(lma);
715
716         /* Swab and copy LMA */
717         if (ma->ma_need & MA_HSM) {
718                 if (lma->lma_compat & LMAC_HSM)
719                         ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
720                 else
721                         ma->ma_hsm_flags = 0;
722                 ma->ma_valid |= MA_HSM;
723         }
724
725         /* Copy SOM */
726         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
727                 LASSERT(ma->ma_som != NULL);
728                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
729                 ma->ma_som->msd_size    = lma->lma_som_size;
730                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
731                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
732                 ma->ma_valid |= MA_SOM;
733         }
734
735         RETURN(0);
736 }
737
738 static int mdd_attr_get_internal(const struct lu_env *env,
739                                  struct mdd_object *mdd_obj,
740                                  struct md_attr *ma)
741 {
742         int rc = 0;
743         ENTRY;
744
745         if (ma->ma_need & MA_INODE)
746                 rc = mdd_iattr_get(env, mdd_obj, ma);
747
748         if (rc == 0 && ma->ma_need & MA_LOV) {
749                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
750                     S_ISDIR(mdd_object_type(mdd_obj)))
751                         rc = __mdd_lmm_get(env, mdd_obj, ma);
752         }
753         if (rc == 0 && ma->ma_need & MA_LMV) {
754                 if (S_ISDIR(mdd_object_type(mdd_obj)))
755                         rc = __mdd_lmv_get(env, mdd_obj, ma);
756         }
757         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
758                 if (S_ISREG(mdd_object_type(mdd_obj)))
759                         rc = __mdd_lma_get(env, mdd_obj, ma);
760         }
761 #ifdef CONFIG_FS_POSIX_ACL
762         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
763                 if (S_ISDIR(mdd_object_type(mdd_obj)))
764                         rc = mdd_def_acl_get(env, mdd_obj, ma);
765         }
766 #endif
767         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
768                rc, ma->ma_valid);
769         RETURN(rc);
770 }
771
772 int mdd_attr_get_internal_locked(const struct lu_env *env,
773                                  struct mdd_object *mdd_obj, struct md_attr *ma)
774 {
775         int rc;
776         int needlock = ma->ma_need &
777                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
778
779         if (needlock)
780                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
781         rc = mdd_attr_get_internal(env, mdd_obj, ma);
782         if (needlock)
783                 mdd_read_unlock(env, mdd_obj);
784         return rc;
785 }
786
787 /*
788  * No permission check is needed.
789  */
790 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
791                         struct md_attr *ma)
792 {
793         struct mdd_object *mdd_obj = md2mdd_obj(obj);
794         int                rc;
795
796         ENTRY;
797         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
798         RETURN(rc);
799 }
800
801 /*
802  * No permission check is needed.
803  */
804 static int mdd_xattr_get(const struct lu_env *env,
805                          struct md_object *obj, struct lu_buf *buf,
806                          const char *name)
807 {
808         struct mdd_object *mdd_obj = md2mdd_obj(obj);
809         int rc;
810
811         ENTRY;
812
813         LASSERT(mdd_object_exists(mdd_obj));
814
815         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
816         rc = mdo_xattr_get(env, mdd_obj, buf, name,
817                            mdd_object_capa(env, mdd_obj));
818         mdd_read_unlock(env, mdd_obj);
819
820         RETURN(rc);
821 }
822
823 /*
824  * Permission check is done when open,
825  * no need check again.
826  */
827 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
828                         struct lu_buf *buf)
829 {
830         struct mdd_object *mdd_obj = md2mdd_obj(obj);
831         struct dt_object  *next;
832         loff_t             pos = 0;
833         int                rc;
834         ENTRY;
835
836         LASSERT(mdd_object_exists(mdd_obj));
837
838         next = mdd_object_child(mdd_obj);
839         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
840         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
841                                          mdd_object_capa(env, mdd_obj));
842         mdd_read_unlock(env, mdd_obj);
843         RETURN(rc);
844 }
845
846 /*
847  * No permission check is needed.
848  */
849 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
850                           struct lu_buf *buf)
851 {
852         struct mdd_object *mdd_obj = md2mdd_obj(obj);
853         int rc;
854
855         ENTRY;
856
857         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
858         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
859         mdd_read_unlock(env, mdd_obj);
860
861         RETURN(rc);
862 }
863
864 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
865                                struct mdd_object *c, struct md_attr *ma,
866                                struct thandle *handle,
867                                const struct md_op_spec *spec)
868 {
869         struct lu_attr *attr = &ma->ma_attr;
870         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
871         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
872         const struct dt_index_features *feat = spec->sp_feat;
873         int rc;
874         ENTRY;
875
876         if (!mdd_object_exists(c)) {
877                 struct dt_object *next = mdd_object_child(c);
878                 LASSERT(next);
879
880                 if (feat != &dt_directory_features && feat != NULL)
881                         dof->dof_type = DFT_INDEX;
882                 else
883                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
884
885                 dof->u.dof_idx.di_feat = feat;
886
887                 /* @hint will be initialized by underlying device. */
888                 next->do_ops->do_ah_init(env, hint,
889                                          p ? mdd_object_child(p) : NULL,
890                                          attr->la_mode & S_IFMT);
891
892                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
893                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
894         } else
895                 rc = -EEXIST;
896
897         RETURN(rc);
898 }
899
900 /**
901  * Make sure the ctime is increased only.
902  */
903 static inline int mdd_attr_check(const struct lu_env *env,
904                                  struct mdd_object *obj,
905                                  struct lu_attr *attr)
906 {
907         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
908         int rc;
909         ENTRY;
910
911         if (attr->la_valid & LA_CTIME) {
912                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
913                 if (rc)
914                         RETURN(rc);
915
916                 if (attr->la_ctime < tmp_la->la_ctime)
917                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
918                 else if (attr->la_valid == LA_CTIME &&
919                          attr->la_ctime == tmp_la->la_ctime)
920                         attr->la_valid &= ~LA_CTIME;
921         }
922         RETURN(0);
923 }
924
925 int mdd_attr_set_internal(const struct lu_env *env,
926                           struct mdd_object *obj,
927                           struct lu_attr *attr,
928                           struct thandle *handle,
929                           int needacl)
930 {
931         int rc;
932         ENTRY;
933
934         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
935 #ifdef CONFIG_FS_POSIX_ACL
936         if (!rc && (attr->la_valid & LA_MODE) && needacl)
937                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
938 #endif
939         RETURN(rc);
940 }
941
942 int mdd_attr_check_set_internal(const struct lu_env *env,
943                                 struct mdd_object *obj,
944                                 struct lu_attr *attr,
945                                 struct thandle *handle,
946                                 int needacl)
947 {
948         int rc;
949         ENTRY;
950
951         rc = mdd_attr_check(env, obj, attr);
952         if (rc)
953                 RETURN(rc);
954
955         if (attr->la_valid)
956                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
957         RETURN(rc);
958 }
959
960 static int mdd_attr_set_internal_locked(const struct lu_env *env,
961                                         struct mdd_object *obj,
962                                         struct lu_attr *attr,
963                                         struct thandle *handle,
964                                         int needacl)
965 {
966         int rc;
967         ENTRY;
968
969         needacl = needacl && (attr->la_valid & LA_MODE);
970         if (needacl)
971                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
972         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
973         if (needacl)
974                 mdd_write_unlock(env, obj);
975         RETURN(rc);
976 }
977
978 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
979                                        struct mdd_object *obj,
980                                        struct lu_attr *attr,
981                                        struct thandle *handle,
982                                        int needacl)
983 {
984         int rc;
985         ENTRY;
986
987         needacl = needacl && (attr->la_valid & LA_MODE);
988         if (needacl)
989                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
990         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
991         if (needacl)
992                 mdd_write_unlock(env, obj);
993         RETURN(rc);
994 }
995
996 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
997                     const struct lu_buf *buf, const char *name,
998                     int fl, struct thandle *handle)
999 {
1000         struct lustre_capa *capa = mdd_object_capa(env, obj);
1001         int rc = -EINVAL;
1002         ENTRY;
1003
1004         if (buf->lb_buf && buf->lb_len > 0)
1005                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1006         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1007                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1008
1009         RETURN(rc);
1010 }
1011
1012 /*
1013  * This gives the same functionality as the code between
1014  * sys_chmod and inode_setattr
1015  * chown_common and inode_setattr
1016  * utimes and inode_setattr
1017  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1018  */
1019 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1020                         struct lu_attr *la, const struct md_attr *ma)
1021 {
1022         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1023         struct md_ucred  *uc;
1024         int               rc;
1025         ENTRY;
1026
1027         if (!la->la_valid)
1028                 RETURN(0);
1029
1030         /* Do not permit change file type */
1031         if (la->la_valid & LA_TYPE)
1032                 RETURN(-EPERM);
1033
1034         /* They should not be processed by setattr */
1035         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1036                 RETURN(-EPERM);
1037
1038         /* export destroy does not have ->le_ses, but we may want
1039          * to drop LUSTRE_SOM_FL. */
1040         if (!env->le_ses)
1041                 RETURN(0);
1042
1043         uc = md_ucred(env);
1044
1045         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1046         if (rc)
1047                 RETURN(rc);
1048
1049         if (la->la_valid == LA_CTIME) {
1050                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1051                         /* This is only for set ctime when rename's source is
1052                          * on remote MDS. */
1053                         rc = mdd_may_delete(env, NULL, obj,
1054                                             (struct md_attr *)ma, 1, 0);
1055                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1056                         la->la_valid &= ~LA_CTIME;
1057                 RETURN(rc);
1058         }
1059
1060         if (la->la_valid == LA_ATIME) {
1061                 /* This is atime only set for read atime update on close. */
1062                 if (la->la_atime <= tmp_la->la_atime +
1063                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1064                         la->la_valid &= ~LA_ATIME;
1065                 RETURN(0);
1066         }
1067
1068         /* Check if flags change. */
1069         if (la->la_valid & LA_FLAGS) {
1070                 unsigned int oldflags = 0;
1071                 unsigned int newflags = la->la_flags &
1072                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1073
1074                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1075                     !mdd_capable(uc, CFS_CAP_FOWNER))
1076                         RETURN(-EPERM);
1077
1078                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1079                  * only be changed by the relevant capability. */
1080                 if (mdd_is_immutable(obj))
1081                         oldflags |= LUSTRE_IMMUTABLE_FL;
1082                 if (mdd_is_append(obj))
1083                         oldflags |= LUSTRE_APPEND_FL;
1084                 if ((oldflags ^ newflags) &&
1085                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1086                         RETURN(-EPERM);
1087
1088                 if (!S_ISDIR(tmp_la->la_mode))
1089                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1090         }
1091
1092         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1093             (la->la_valid & ~LA_FLAGS) &&
1094             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1095                 RETURN(-EPERM);
1096
1097         /* Check for setting the obj time. */
1098         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1099             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1100                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1101                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1102                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1103                                                             MAY_WRITE,
1104                                                             MOR_TGT_CHILD);
1105                         if (rc)
1106                                 RETURN(rc);
1107                 }
1108         }
1109
1110         /* Make sure a caller can chmod. */
1111         if (la->la_valid & LA_MODE) {
1112                 /* Bypass la_vaild == LA_MODE,
1113                  * this is for changing file with SUID or SGID. */
1114                 if ((la->la_valid & ~LA_MODE) &&
1115                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1116                     (uc->mu_fsuid != tmp_la->la_uid) &&
1117                     !mdd_capable(uc, CFS_CAP_FOWNER))
1118                         RETURN(-EPERM);
1119
1120                 if (la->la_mode == (cfs_umode_t) -1)
1121                         la->la_mode = tmp_la->la_mode;
1122                 else
1123                         la->la_mode = (la->la_mode & S_IALLUGO) |
1124                                       (tmp_la->la_mode & ~S_IALLUGO);
1125
1126                 /* Also check the setgid bit! */
1127                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1128                                        la->la_gid : tmp_la->la_gid) &&
1129                     !mdd_capable(uc, CFS_CAP_FSETID))
1130                         la->la_mode &= ~S_ISGID;
1131         } else {
1132                la->la_mode = tmp_la->la_mode;
1133         }
1134
1135         /* Make sure a caller can chown. */
1136         if (la->la_valid & LA_UID) {
1137                 if (la->la_uid == (uid_t) -1)
1138                         la->la_uid = tmp_la->la_uid;
1139                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1140                     (la->la_uid != tmp_la->la_uid)) &&
1141                     !mdd_capable(uc, CFS_CAP_CHOWN))
1142                         RETURN(-EPERM);
1143
1144                 /* If the user or group of a non-directory has been
1145                  * changed by a non-root user, remove the setuid bit.
1146                  * 19981026 David C Niemi <niemi@tux.org>
1147                  *
1148                  * Changed this to apply to all users, including root,
1149                  * to avoid some races. This is the behavior we had in
1150                  * 2.0. The check for non-root was definitely wrong
1151                  * for 2.2 anyway, as it should have been using
1152                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1153                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1154                     !S_ISDIR(tmp_la->la_mode)) {
1155                         la->la_mode &= ~S_ISUID;
1156                         la->la_valid |= LA_MODE;
1157                 }
1158         }
1159
1160         /* Make sure caller can chgrp. */
1161         if (la->la_valid & LA_GID) {
1162                 if (la->la_gid == (gid_t) -1)
1163                         la->la_gid = tmp_la->la_gid;
1164                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1165                     ((la->la_gid != tmp_la->la_gid) &&
1166                     !lustre_in_group_p(uc, la->la_gid))) &&
1167                     !mdd_capable(uc, CFS_CAP_CHOWN))
1168                         RETURN(-EPERM);
1169
1170                 /* Likewise, if the user or group of a non-directory
1171                  * has been changed by a non-root user, remove the
1172                  * setgid bit UNLESS there is no group execute bit
1173                  * (this would be a file marked for mandatory
1174                  * locking).  19981026 David C Niemi <niemi@tux.org>
1175                  *
1176                  * Removed the fsuid check (see the comment above) --
1177                  * 19990830 SD. */
1178                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1179                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1180                         la->la_mode &= ~S_ISGID;
1181                         la->la_valid |= LA_MODE;
1182                 }
1183         }
1184
1185         /* For both Size-on-MDS case and truncate case,
1186          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1187          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1188          * For SOM case, it is true, the MAY_WRITE perm has been checked
1189          * when open, no need check again. For truncate case, it is false,
1190          * the MAY_WRITE perm should be checked here. */
1191         if (ma->ma_attr_flags & MDS_SOM) {
1192                 /* For the "Size-on-MDS" setattr update, merge coming
1193                  * attributes with the set in the inode. BUG 10641 */
1194                 if ((la->la_valid & LA_ATIME) &&
1195                     (la->la_atime <= tmp_la->la_atime))
1196                         la->la_valid &= ~LA_ATIME;
1197
1198                 /* OST attributes do not have a priority over MDS attributes,
1199                  * so drop times if ctime is equal. */
1200                 if ((la->la_valid & LA_CTIME) &&
1201                     (la->la_ctime <= tmp_la->la_ctime))
1202                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1203         } else {
1204                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1205                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1206                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1207                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1208                                 rc = mdd_permission_internal_locked(env, obj,
1209                                                             tmp_la, MAY_WRITE,
1210                                                             MOR_TGT_CHILD);
1211                                 if (rc)
1212                                         RETURN(rc);
1213                         }
1214                 }
1215                 if (la->la_valid & LA_CTIME) {
1216                         /* The pure setattr, it has the priority over what is
1217                          * already set, do not drop it if ctime is equal. */
1218                         if (la->la_ctime < tmp_la->la_ctime)
1219                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1220                                                   LA_CTIME);
1221                 }
1222         }
1223
1224         RETURN(0);
1225 }
1226
1227 /** Store a data change changelog record
1228  * If this fails, we must fail the whole transaction; we don't
1229  * want the change to commit without the log entry.
1230  * \param mdd_obj - mdd_object of change
1231  * \param handle - transacion handle
1232  */
1233 static int mdd_changelog_data_store(const struct lu_env     *env,
1234                                     struct mdd_device       *mdd,
1235                                     enum changelog_rec_type type,
1236                                     struct mdd_object       *mdd_obj,
1237                                     struct thandle          *handle)
1238 {
1239         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1240         struct llog_changelog_rec *rec;
1241         struct lu_buf *buf;
1242         int reclen;
1243         int rc;
1244
1245         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1246                 RETURN(0);
1247
1248         LASSERT(handle != NULL);
1249         LASSERT(mdd_obj != NULL);
1250
1251         if ((type == CL_TIME) &&
1252             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1253                 /* Don't need multiple updates in this log */
1254                 /* Don't check under lock - no big deal if we get an extra
1255                    entry */
1256                 RETURN(0);
1257         }
1258
1259         reclen = llog_data_len(sizeof(*rec));
1260         buf = mdd_buf_alloc(env, reclen);
1261         if (buf->lb_buf == NULL)
1262                 RETURN(-ENOMEM);
1263         rec = (struct llog_changelog_rec *)buf->lb_buf;
1264
1265         rec->cr.cr_flags = CLF_VERSION;
1266         rec->cr.cr_type = (__u32)type;
1267         rec->cr.cr_tfid = *tfid;
1268         rec->cr.cr_namelen = 0;
1269         mdd_obj->mod_cltime = cfs_time_current_64();
1270
1271         rc = mdd_changelog_llog_write(mdd, rec, handle);
1272         if (rc < 0) {
1273                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1274                        rc, type, PFID(tfid));
1275                 return -EFAULT;
1276         }
1277
1278         return 0;
1279 }
1280
1281 /**
1282  * Should be called with write lock held.
1283  *
1284  * \see mdd_lma_set_locked().
1285  */
1286 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1287                        const struct md_attr *ma, struct thandle *handle)
1288 {
1289         struct mdd_thread_info *info = mdd_env_info(env);
1290         struct lu_buf *buf;
1291         struct lustre_mdt_attrs *lma =
1292                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1293         int lmasize = sizeof(struct lustre_mdt_attrs);
1294         int rc = 0;
1295
1296         ENTRY;
1297
1298         /* Either HSM or SOM part is not valid, we need to read it before */
1299         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1300                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1301                 if (rc <= 0)
1302                         RETURN(rc);
1303
1304                 lustre_lma_swab(lma);
1305         } else {
1306                 memset(lma, 0, lmasize);
1307         }
1308
1309         /* Copy HSM data */
1310         if (ma->ma_valid & MA_HSM) {
1311                 lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1312                 lma->lma_compat |= LMAC_HSM;
1313         }
1314
1315         /* Copy SOM data */
1316         if (ma->ma_valid & MA_SOM) {
1317                 LASSERT(ma->ma_som != NULL);
1318                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1319                         lma->lma_compat     &= ~LMAC_SOM;
1320                 } else {
1321                         lma->lma_compat     |= LMAC_SOM;
1322                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1323                         lma->lma_som_size    = ma->ma_som->msd_size;
1324                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1325                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1326                 }
1327         }
1328
1329         /* Copy FID */
1330         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1331
1332         lustre_lma_swab(lma);
1333         buf = mdd_buf_get(env, lma, lmasize);
1334         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1335
1336         RETURN(rc);
1337 }
1338
1339 /**
1340  * Save LMA extended attributes with data from \a ma.
1341  *
1342  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1343  * not, LMA EA will be first read from disk, modified and write back.
1344  *
1345  */
1346 static int mdd_lma_set_locked(const struct lu_env *env,
1347                               struct mdd_object *mdd_obj,
1348                               const struct md_attr *ma, struct thandle *handle)
1349 {
1350         int rc;
1351
1352         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1353         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1354         mdd_write_unlock(env, mdd_obj);
1355         return rc;
1356 }
1357
1358 /* set attr and LOV EA at once, return updated attr */
1359 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1360                         const struct md_attr *ma)
1361 {
1362         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1363         struct mdd_device *mdd = mdo2mdd(obj);
1364         struct thandle *handle;
1365         struct lov_mds_md *lmm = NULL;
1366         struct llog_cookie *logcookies = NULL;
1367         int  rc, lmm_size = 0, cookie_size = 0;
1368         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1369 #ifdef HAVE_QUOTA_SUPPORT
1370         struct obd_device *obd = mdd->mdd_obd_dev;
1371         struct mds_obd *mds = &obd->u.mds;
1372         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1373         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1374         int quota_opc = 0, block_count = 0;
1375         int inode_pending[MAXQUOTAS] = { 0, 0 };
1376         int block_pending[MAXQUOTAS] = { 0, 0 };
1377 #endif
1378         ENTRY;
1379
1380         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1381                                     MDD_TXN_ATTR_SET_OP);
1382         handle = mdd_trans_start(env, mdd);
1383         if (IS_ERR(handle))
1384                 RETURN(PTR_ERR(handle));
1385         /*TODO: add lock here*/
1386         /* start a log jounal handle if needed */
1387         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1388             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1389                 lmm_size = mdd_lov_mdsize(env, mdd);
1390                 lmm = mdd_max_lmm_get(env, mdd);
1391                 if (lmm == NULL)
1392                         GOTO(cleanup, rc = -ENOMEM);
1393
1394                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1395                                 XATTR_NAME_LOV);
1396
1397                 if (rc < 0)
1398                         GOTO(cleanup, rc);
1399         }
1400
1401         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1402                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1403                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1404
1405         *la_copy = ma->ma_attr;
1406         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1407         if (rc)
1408                 GOTO(cleanup, rc);
1409
1410 #ifdef HAVE_QUOTA_SUPPORT
1411         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1412                 struct obd_export *exp = md_quota(env)->mq_exp;
1413                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1414
1415                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1416                 if (!rc) {
1417                         quota_opc = FSFILT_OP_SETATTR;
1418                         mdd_quota_wrapper(la_copy, qnids);
1419                         mdd_quota_wrapper(la_tmp, qoids);
1420                         /* get file quota for new owner */
1421                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1422                                         qnids, inode_pending, 1, NULL, 0,
1423                                         NULL, 0);
1424                         block_count = (la_tmp->la_blocks + 7) >> 3;
1425                         if (block_count) {
1426                                 void *data = NULL;
1427                                 mdd_data_get(env, mdd_obj, &data);
1428                                 /* get block quota for new owner */
1429                                 lquota_chkquota(mds_quota_interface_ref, obd,
1430                                                 exp, qnids, block_pending,
1431                                                 block_count, NULL,
1432                                                 LQUOTA_FLAGS_BLK, data, 1);
1433                         }
1434                 }
1435         }
1436 #endif
1437
1438         if (la_copy->la_valid & LA_FLAGS) {
1439                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1440                                                   handle, 1);
1441                 if (rc == 0)
1442                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1443         } else if (la_copy->la_valid) {            /* setattr */
1444                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1445                                                   handle, 1);
1446                 /* journal chown/chgrp in llog, just like unlink */
1447                 if (rc == 0 && lmm_size){
1448                         cookie_size = mdd_lov_cookiesize(env, mdd);
1449                         logcookies = mdd_max_cookie_get(env, mdd);
1450                         if (logcookies == NULL)
1451                                 GOTO(cleanup, rc = -ENOMEM);
1452
1453                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1454                                             logcookies, cookie_size) <= 0)
1455                                 logcookies = NULL;
1456                 }
1457         }
1458
1459         if (rc == 0 && ma->ma_valid & MA_LOV) {
1460                 cfs_umode_t mode;
1461
1462                 mode = mdd_object_type(mdd_obj);
1463                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1464                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1465                         if (rc)
1466                                 GOTO(cleanup, rc);
1467
1468                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1469                                             ma->ma_lmm_size, handle, 1);
1470                 }
1471
1472         }
1473         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1474                 cfs_umode_t mode;
1475
1476                 mode = mdd_object_type(mdd_obj);
1477                 if (S_ISREG(mode))
1478                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1479
1480         }
1481 cleanup:
1482         if (rc == 0)
1483                 rc = mdd_changelog_data_store(env, mdd,
1484                                               (ma->ma_attr.la_valid &
1485                                                ~(LA_MTIME|LA_CTIME|LA_ATIME)) ?
1486                                               CL_SETATTR : CL_TIME,
1487                                               mdd_obj, handle);
1488         mdd_trans_stop(env, mdd, rc, handle);
1489         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1490                 /*set obd attr, if needed*/
1491                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1492                                            logcookies);
1493         }
1494 #ifdef HAVE_QUOTA_SUPPORT
1495         if (quota_opc) {
1496                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1497                                       inode_pending, 0);
1498                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1499                                       block_pending, 1);
1500                 /* Trigger dqrel/dqacq for original owner and new owner.
1501                  * If failed, the next call for lquota_chkquota will
1502                  * process it. */
1503                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1504                               quota_opc);
1505         }
1506 #endif
1507         RETURN(rc);
1508 }
1509
1510 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1511                       const struct lu_buf *buf, const char *name, int fl,
1512                       struct thandle *handle)
1513 {
1514         int  rc;
1515         ENTRY;
1516
1517         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1518         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1519         mdd_write_unlock(env, obj);
1520
1521         RETURN(rc);
1522 }
1523
1524 static int mdd_xattr_sanity_check(const struct lu_env *env,
1525                                   struct mdd_object *obj)
1526 {
1527         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1528         struct md_ucred *uc     = md_ucred(env);
1529         int rc;
1530         ENTRY;
1531
1532         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1533                 RETURN(-EPERM);
1534
1535         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1536         if (rc)
1537                 RETURN(rc);
1538
1539         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1540             !mdd_capable(uc, CFS_CAP_FOWNER))
1541                 RETURN(-EPERM);
1542
1543         RETURN(rc);
1544 }
1545
1546 /**
1547  * The caller should guarantee to update the object ctime
1548  * after xattr_set if needed.
1549  */
1550 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1551                          const struct lu_buf *buf, const char *name,
1552                          int fl)
1553 {
1554         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1555         struct mdd_device *mdd = mdo2mdd(obj);
1556         struct thandle *handle;
1557         int  rc;
1558         ENTRY;
1559
1560         rc = mdd_xattr_sanity_check(env, mdd_obj);
1561         if (rc)
1562                 RETURN(rc);
1563
1564         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1565         /* security-replated changes may require sync */
1566         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1567             mdd->mdd_sync_permission == 1)
1568                 txn_param_sync(&mdd_env_info(env)->mti_param);
1569
1570         handle = mdd_trans_start(env, mdd);
1571         if (IS_ERR(handle))
1572                 RETURN(PTR_ERR(handle));
1573
1574         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1575
1576         /* Only record user xattr changes */
1577         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1578             (strncmp("user.", name, 5) == 0))
1579                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1580                                               handle);
1581         mdd_trans_stop(env, mdd, rc, handle);
1582
1583         RETURN(rc);
1584 }
1585
1586 /**
1587  * The caller should guarantee to update the object ctime
1588  * after xattr_set if needed.
1589  */
1590 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1591                   const char *name)
1592 {
1593         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1594         struct mdd_device *mdd = mdo2mdd(obj);
1595         struct thandle *handle;
1596         int  rc;
1597         ENTRY;
1598
1599         rc = mdd_xattr_sanity_check(env, mdd_obj);
1600         if (rc)
1601                 RETURN(rc);
1602
1603         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1604         handle = mdd_trans_start(env, mdd);
1605         if (IS_ERR(handle))
1606                 RETURN(PTR_ERR(handle));
1607
1608         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1609         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1610                            mdd_object_capa(env, mdd_obj));
1611         mdd_write_unlock(env, mdd_obj);
1612
1613         /* Only record user xattr changes */
1614         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1615             (strncmp("user.", name, 5) != 0))
1616                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1617                                               handle);
1618
1619         mdd_trans_stop(env, mdd, rc, handle);
1620
1621         RETURN(rc);
1622 }
1623
1624 /* partial unlink */
1625 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1626                        struct md_attr *ma)
1627 {
1628         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1629         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1630         struct mdd_device *mdd = mdo2mdd(obj);
1631         struct thandle *handle;
1632 #ifdef HAVE_QUOTA_SUPPORT
1633         struct obd_device *obd = mdd->mdd_obd_dev;
1634         struct mds_obd *mds = &obd->u.mds;
1635         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1636         int quota_opc = 0;
1637 #endif
1638         int rc;
1639         ENTRY;
1640
1641         /*
1642          * Check -ENOENT early here because we need to get object type
1643          * to calculate credits before transaction start
1644          */
1645         if (!mdd_object_exists(mdd_obj))
1646                 RETURN(-ENOENT);
1647
1648         LASSERT(mdd_object_exists(mdd_obj) > 0);
1649
1650         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1651         if (rc)
1652                 RETURN(rc);
1653
1654         handle = mdd_trans_start(env, mdd);
1655         if (IS_ERR(handle))
1656                 RETURN(-ENOMEM);
1657
1658         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1659
1660         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1661         if (rc)
1662                 GOTO(cleanup, rc);
1663
1664         __mdd_ref_del(env, mdd_obj, handle, 0);
1665
1666         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1667                 /* unlink dot */
1668                 __mdd_ref_del(env, mdd_obj, handle, 1);
1669         }
1670
1671         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1672         la_copy->la_ctime = ma->ma_attr.la_ctime;
1673
1674         la_copy->la_valid = LA_CTIME;
1675         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1676         if (rc)
1677                 GOTO(cleanup, rc);
1678
1679         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1680 #ifdef HAVE_QUOTA_SUPPORT
1681         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1682             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1683                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1684                 mdd_quota_wrapper(&ma->ma_attr, qids);
1685         }
1686 #endif
1687
1688
1689         EXIT;
1690 cleanup:
1691         mdd_write_unlock(env, mdd_obj);
1692         mdd_trans_stop(env, mdd, rc, handle);
1693 #ifdef HAVE_QUOTA_SUPPORT
1694         if (quota_opc)
1695                 /* Trigger dqrel on the owner of child. If failed,
1696                  * the next call for lquota_chkquota will process it */
1697                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1698                               quota_opc);
1699 #endif
1700         return rc;
1701 }
1702
1703 /* partial operation */
1704 static int mdd_oc_sanity_check(const struct lu_env *env,
1705                                struct mdd_object *obj,
1706                                struct md_attr *ma)
1707 {
1708         int rc;
1709         ENTRY;
1710
1711         switch (ma->ma_attr.la_mode & S_IFMT) {
1712         case S_IFREG:
1713         case S_IFDIR:
1714         case S_IFLNK:
1715         case S_IFCHR:
1716         case S_IFBLK:
1717         case S_IFIFO:
1718         case S_IFSOCK:
1719                 rc = 0;
1720                 break;
1721         default:
1722                 rc = -EINVAL;
1723                 break;
1724         }
1725         RETURN(rc);
1726 }
1727
1728 static int mdd_object_create(const struct lu_env *env,
1729                              struct md_object *obj,
1730                              const struct md_op_spec *spec,
1731                              struct md_attr *ma)
1732 {
1733
1734         struct mdd_device *mdd = mdo2mdd(obj);
1735         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1736         const struct lu_fid *pfid = spec->u.sp_pfid;
1737         struct thandle *handle;
1738 #ifdef HAVE_QUOTA_SUPPORT
1739         struct obd_device *obd = mdd->mdd_obd_dev;
1740         struct obd_export *exp = md_quota(env)->mq_exp;
1741         struct mds_obd *mds = &obd->u.mds;
1742         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1743         int quota_opc = 0, block_count = 0;
1744         int inode_pending[MAXQUOTAS] = { 0, 0 };
1745         int block_pending[MAXQUOTAS] = { 0, 0 };
1746 #endif
1747         int rc = 0;
1748         ENTRY;
1749
1750 #ifdef HAVE_QUOTA_SUPPORT
1751         if (mds->mds_quota) {
1752                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1753                 mdd_quota_wrapper(&ma->ma_attr, qids);
1754                 /* get file quota for child */
1755                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1756                                 qids, inode_pending, 1, NULL, 0,
1757                                 NULL, 0);
1758                 switch (ma->ma_attr.la_mode & S_IFMT) {
1759                 case S_IFLNK:
1760                 case S_IFDIR:
1761                         block_count = 2;
1762                         break;
1763                 case S_IFREG:
1764                         block_count = 1;
1765                         break;
1766                 }
1767                 /* get block quota for child */
1768                 if (block_count)
1769                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1770                                         qids, block_pending, block_count,
1771                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1772         }
1773 #endif
1774
1775         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1776         handle = mdd_trans_start(env, mdd);
1777         if (IS_ERR(handle))
1778                 GOTO(out_pending, rc = PTR_ERR(handle));
1779
1780         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1781         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1782         if (rc)
1783                 GOTO(unlock, rc);
1784
1785         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1786         if (rc)
1787                 GOTO(unlock, rc);
1788
1789         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1790                 /* If creating the slave object, set slave EA here. */
1791                 int lmv_size = spec->u.sp_ea.eadatalen;
1792                 struct lmv_stripe_md *lmv;
1793
1794                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1795                 LASSERT(lmv != NULL && lmv_size > 0);
1796
1797                 rc = __mdd_xattr_set(env, mdd_obj,
1798                                      mdd_buf_get_const(env, lmv, lmv_size),
1799                                      XATTR_NAME_LMV, 0, handle);
1800                 if (rc)
1801                         GOTO(unlock, rc);
1802
1803                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1804                                            handle, 0);
1805         } else {
1806 #ifdef CONFIG_FS_POSIX_ACL
1807                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1808                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1809
1810                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1811                         buf->lb_len = spec->u.sp_ea.eadatalen;
1812                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1813                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1814                                                     &ma->ma_attr.la_mode,
1815                                                     handle);
1816                                 if (rc)
1817                                         GOTO(unlock, rc);
1818                                 else
1819                                         ma->ma_attr.la_valid |= LA_MODE;
1820                         }
1821
1822                         pfid = spec->u.sp_ea.fid;
1823                 }
1824 #endif
1825                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1826                                            spec);
1827         }
1828         EXIT;
1829 unlock:
1830         if (rc == 0)
1831                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1832         mdd_write_unlock(env, mdd_obj);
1833
1834         mdd_trans_stop(env, mdd, rc, handle);
1835 out_pending:
1836 #ifdef HAVE_QUOTA_SUPPORT
1837         if (quota_opc) {
1838                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1839                                       inode_pending, 0);
1840                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1841                                       block_pending, 1);
1842                 /* Trigger dqacq on the owner of child. If failed,
1843                  * the next call for lquota_chkquota will process it. */
1844                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1845                               quota_opc);
1846         }
1847 #endif
1848         return rc;
1849 }
1850
1851 /* partial link */
1852 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1853                        const struct md_attr *ma)
1854 {
1855         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1856         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1857         struct mdd_device *mdd = mdo2mdd(obj);
1858         struct thandle *handle;
1859         int rc;
1860         ENTRY;
1861
1862         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1863         handle = mdd_trans_start(env, mdd);
1864         if (IS_ERR(handle))
1865                 RETURN(-ENOMEM);
1866
1867         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1868         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1869         if (rc == 0)
1870                 __mdd_ref_add(env, mdd_obj, handle);
1871         mdd_write_unlock(env, mdd_obj);
1872         if (rc == 0) {
1873                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1874                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1875
1876                 la_copy->la_valid = LA_CTIME;
1877                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1878                                                         handle, 0);
1879         }
1880         mdd_trans_stop(env, mdd, 0, handle);
1881
1882         RETURN(rc);
1883 }
1884
1885 /*
1886  * do NOT or the MAY_*'s, you'll get the weakest
1887  */
1888 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1889 {
1890         int res = 0;
1891
1892         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1893          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1894          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1895          * owner can write to a file even if it is marked readonly to hide
1896          * its brokenness. (bug 5781) */
1897         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1898                 struct md_ucred *uc = md_ucred(env);
1899
1900                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1901                     (la->la_uid == uc->mu_fsuid))
1902                         return 0;
1903         }
1904
1905         if (flags & FMODE_READ)
1906                 res |= MAY_READ;
1907         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1908                 res |= MAY_WRITE;
1909         if (flags & MDS_FMODE_EXEC)
1910                 res |= MAY_EXEC;
1911         return res;
1912 }
1913
1914 static int mdd_open_sanity_check(const struct lu_env *env,
1915                                  struct mdd_object *obj, int flag)
1916 {
1917         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1918         int mode, rc;
1919         ENTRY;
1920
1921         /* EEXIST check */
1922         if (mdd_is_dead_obj(obj))
1923                 RETURN(-ENOENT);
1924
1925         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1926         if (rc)
1927                RETURN(rc);
1928
1929         if (S_ISLNK(tmp_la->la_mode))
1930                 RETURN(-ELOOP);
1931
1932         mode = accmode(env, tmp_la, flag);
1933
1934         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1935                 RETURN(-EISDIR);
1936
1937         if (!(flag & MDS_OPEN_CREATED)) {
1938                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1939                 if (rc)
1940                         RETURN(rc);
1941         }
1942
1943         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1944             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1945                 flag &= ~MDS_OPEN_TRUNC;
1946
1947         /* For writing append-only file must open it with append mode. */
1948         if (mdd_is_append(obj)) {
1949                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1950                         RETURN(-EPERM);
1951                 if (flag & MDS_OPEN_TRUNC)
1952                         RETURN(-EPERM);
1953         }
1954
1955 #if 0
1956         /*
1957          * Now, flag -- O_NOATIME does not be packed by client.
1958          */
1959         if (flag & O_NOATIME) {
1960                 struct md_ucred *uc = md_ucred(env);
1961
1962                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1963                     (uc->mu_valid == UCRED_NEW)) &&
1964                     (uc->mu_fsuid != tmp_la->la_uid) &&
1965                     !mdd_capable(uc, CFS_CAP_FOWNER))
1966                         RETURN(-EPERM);
1967         }
1968 #endif
1969
1970         RETURN(0);
1971 }
1972
1973 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1974                     int flags)
1975 {
1976         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1977         int rc = 0;
1978
1979         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1980
1981         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1982         if (rc == 0)
1983                 mdd_obj->mod_count++;
1984
1985         mdd_write_unlock(env, mdd_obj);
1986         return rc;
1987 }
1988
1989 /* return md_attr back,
1990  * if it is last unlink then return lov ea + llog cookie*/
1991 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1992                     struct md_attr *ma)
1993 {
1994         int rc = 0;
1995         ENTRY;
1996
1997         if (S_ISREG(mdd_object_type(obj))) {
1998                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1999                  * Caller must be ready for that. */
2000
2001                 rc = __mdd_lmm_get(env, obj, ma);
2002                 if ((ma->ma_valid & MA_LOV))
2003                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2004                                             obj, ma);
2005         }
2006         RETURN(rc);
2007 }
2008
2009 /*
2010  * No permission check is needed.
2011  */
2012 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2013                      struct md_attr *ma)
2014 {
2015         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2016         struct mdd_device *mdd = mdo2mdd(obj);
2017         struct thandle    *handle;
2018         int rc;
2019         int reset = 1;
2020
2021 #ifdef HAVE_QUOTA_SUPPORT
2022         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2023         struct mds_obd *mds = &obd->u.mds;
2024         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2025         int quota_opc = 0;
2026 #endif
2027         ENTRY;
2028
2029         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2030         if (rc)
2031                 RETURN(rc);
2032         handle = mdd_trans_start(env, mdo2mdd(obj));
2033         if (IS_ERR(handle))
2034                 RETURN(PTR_ERR(handle));
2035
2036         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2037         /* release open count */
2038         mdd_obj->mod_count --;
2039
2040         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2041                 /* remove link to object from orphan index */
2042                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2043                 if (rc == 0) {
2044                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2045                                "list, OSS objects to be destroyed.\n",
2046                                PFID(mdd_object_fid(mdd_obj)));
2047                 } else {
2048                         CERROR("Object "DFID" can not be deleted from orphan "
2049                                 "list, maybe cause OST objects can not be "
2050                                 "destroyed (err: %d).\n",
2051                                 PFID(mdd_object_fid(mdd_obj)), rc);
2052                         /* If object was not deleted from orphan list, do not
2053                          * destroy OSS objects, which will be done when next
2054                          * recovery. */
2055                         GOTO(out, rc);
2056                 }
2057         }
2058
2059         rc = mdd_iattr_get(env, mdd_obj, ma);
2060         /* Object maybe not in orphan list originally, it is rare case for
2061          * mdd_finish_unlink() failure. */
2062         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2063 #ifdef HAVE_QUOTA_SUPPORT
2064                 if (mds->mds_quota) {
2065                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2066                         mdd_quota_wrapper(&ma->ma_attr, qids);
2067                 }
2068 #endif
2069                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2070                 if (ma->ma_valid & MA_FLAGS &&
2071                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2072                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2073                 } else {
2074                         rc = mdd_object_kill(env, mdd_obj, ma);
2075                                 if (rc == 0)
2076                                         reset = 0;
2077                 }
2078
2079                 if (rc != 0)
2080                         CERROR("Error when prepare to delete Object "DFID" , "
2081                                "which will cause OST objects can not be "
2082                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2083         }
2084         EXIT;
2085
2086 out:
2087         if (reset)
2088                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2089
2090         mdd_write_unlock(env, mdd_obj);
2091         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2092 #ifdef HAVE_QUOTA_SUPPORT
2093         if (quota_opc)
2094                 /* Trigger dqrel on the owner of child. If failed,
2095                  * the next call for lquota_chkquota will process it */
2096                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2097                               quota_opc);
2098 #endif
2099         return rc;
2100 }
2101
2102 /*
2103  * Permission check is done when open,
2104  * no need check again.
2105  */
2106 static int mdd_readpage_sanity_check(const struct lu_env *env,
2107                                      struct mdd_object *obj)
2108 {
2109         struct dt_object *next = mdd_object_child(obj);
2110         int rc;
2111         ENTRY;
2112
2113         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2114                 rc = 0;
2115         else
2116                 rc = -ENOTDIR;
2117
2118         RETURN(rc);
2119 }
2120
2121 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2122                               int first, void *area, int nob,
2123                               const struct dt_it_ops *iops, struct dt_it *it,
2124                               __u64 *start, __u64 *end,
2125                               struct lu_dirent **last, __u32 attr)
2126 {
2127         int                     result;
2128         __u64                   hash = 0;
2129         struct lu_dirent       *ent;
2130
2131         if (first) {
2132                 memset(area, 0, sizeof (struct lu_dirpage));
2133                 area += sizeof (struct lu_dirpage);
2134                 nob  -= sizeof (struct lu_dirpage);
2135         }
2136
2137         ent  = area;
2138         do {
2139                 int    len;
2140                 int    recsize;
2141
2142                 len  = iops->key_size(env, it);
2143
2144                 /* IAM iterator can return record with zero len. */
2145                 if (len == 0)
2146                         goto next;
2147
2148                 hash = iops->store(env, it);
2149                 if (unlikely(first)) {
2150                         first = 0;
2151                         *start = hash;
2152                 }
2153
2154                 /* calculate max space required for lu_dirent */
2155                 recsize = lu_dirent_calc_size(len, attr);
2156
2157                 if (nob >= recsize) {
2158                         result = iops->rec(env, it, ent, attr);
2159                         if (result == -ESTALE)
2160                                 goto next;
2161                         if (result != 0)
2162                                 goto out;
2163
2164                         /* osd might not able to pack all attributes,
2165                          * so recheck rec length */
2166                         recsize = le16_to_cpu(ent->lde_reclen);
2167                 } else {
2168                         /*
2169                          * record doesn't fit into page, enlarge previous one.
2170                          */
2171                         if (*last) {
2172                                 (*last)->lde_reclen =
2173                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2174                                                         nob);
2175                                 result = 0;
2176                         } else
2177                                 result = -EINVAL;
2178
2179                         goto out;
2180                 }
2181                 *last = ent;
2182                 ent = (void *)ent + recsize;
2183                 nob -= recsize;
2184
2185 next:
2186                 result = iops->next(env, it);
2187                 if (result == -ESTALE)
2188                         goto next;
2189         } while (result == 0);
2190
2191 out:
2192         *end = hash;
2193         return result;
2194 }
2195
2196 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2197                           const struct lu_rdpg *rdpg)
2198 {
2199         struct dt_it      *it;
2200         struct dt_object  *next = mdd_object_child(obj);
2201         const struct dt_it_ops  *iops;
2202         struct page       *pg;
2203         struct lu_dirent  *last = NULL;
2204         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2205         int i;
2206         int rc;
2207         int nob;
2208         __u64 hash_start;
2209         __u64 hash_end = 0;
2210
2211         LASSERT(rdpg->rp_pages != NULL);
2212         LASSERT(next->do_index_ops != NULL);
2213
2214         if (rdpg->rp_count <= 0)
2215                 return -EFAULT;
2216
2217         /*
2218          * iterate through directory and fill pages from @rdpg
2219          */
2220         iops = &next->do_index_ops->dio_it;
2221         it = iops->init(env, next, mdd_object_capa(env, obj));
2222         if (IS_ERR(it))
2223                 return PTR_ERR(it);
2224
2225         rc = iops->load(env, it, rdpg->rp_hash);
2226
2227         if (rc == 0){
2228                 /*
2229                  * Iterator didn't find record with exactly the key requested.
2230                  *
2231                  * It is currently either
2232                  *
2233                  *     - positioned above record with key less than
2234                  *     requested---skip it.
2235                  *
2236                  *     - or not positioned at all (is in IAM_IT_SKEWED
2237                  *     state)---position it on the next item.
2238                  */
2239                 rc = iops->next(env, it);
2240         } else if (rc > 0)
2241                 rc = 0;
2242
2243         /*
2244          * At this point and across for-loop:
2245          *
2246          *  rc == 0 -> ok, proceed.
2247          *  rc >  0 -> end of directory.
2248          *  rc <  0 -> error.
2249          */
2250         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2251              i++, nob -= CFS_PAGE_SIZE) {
2252                 LASSERT(i < rdpg->rp_npages);
2253                 pg = rdpg->rp_pages[i];
2254                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2255                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2256                                         it, &hash_start, &hash_end, &last,
2257                                         rdpg->rp_attrs);
2258                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2259                         if (last)
2260                                 last->lde_reclen = 0;
2261                 }
2262                 cfs_kunmap(pg);
2263         }
2264         if (rc > 0) {
2265                 /*
2266                  * end of directory.
2267                  */
2268                 hash_end = DIR_END_OFF;
2269                 rc = 0;
2270         }
2271         if (rc == 0) {
2272                 struct lu_dirpage *dp;
2273
2274                 dp = cfs_kmap(rdpg->rp_pages[0]);
2275                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2276                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2277                 if (i == 0)
2278                         /*
2279                          * No pages were processed, mark this.
2280                          */
2281                         dp->ldp_flags |= LDF_EMPTY;
2282
2283                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2284                 cfs_kunmap(rdpg->rp_pages[0]);
2285         }
2286         iops->put(env, it);
2287         iops->fini(env, it);
2288
2289         return rc;
2290 }
2291
2292 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2293                  const struct lu_rdpg *rdpg)
2294 {
2295         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2296         int rc;
2297         ENTRY;
2298
2299         LASSERT(mdd_object_exists(mdd_obj));
2300
2301         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2302         rc = mdd_readpage_sanity_check(env, mdd_obj);
2303         if (rc)
2304                 GOTO(out_unlock, rc);
2305
2306         if (mdd_is_dead_obj(mdd_obj)) {
2307                 struct page *pg;
2308                 struct lu_dirpage *dp;
2309
2310                 /*
2311                  * According to POSIX, please do not return any entry to client:
2312                  * even dot and dotdot should not be returned.
2313                  */
2314                 CWARN("readdir from dead object: "DFID"\n",
2315                         PFID(mdd_object_fid(mdd_obj)));
2316
2317                 if (rdpg->rp_count <= 0)
2318                         GOTO(out_unlock, rc = -EFAULT);
2319                 LASSERT(rdpg->rp_pages != NULL);
2320
2321                 pg = rdpg->rp_pages[0];
2322                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2323                 memset(dp, 0 , sizeof(struct lu_dirpage));
2324                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2325                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2326                 dp->ldp_flags |= LDF_EMPTY;
2327                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2328                 cfs_kunmap(pg);
2329                 GOTO(out_unlock, rc = 0);
2330         }
2331
2332         rc = __mdd_readpage(env, mdd_obj, rdpg);
2333
2334         EXIT;
2335 out_unlock:
2336         mdd_read_unlock(env, mdd_obj);
2337         return rc;
2338 }
2339
2340 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2341 {
2342         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2343         struct dt_object *next;
2344
2345         LASSERT(mdd_object_exists(mdd_obj));
2346         next = mdd_object_child(mdd_obj);
2347         return next->do_ops->do_object_sync(env, next);
2348 }
2349
2350 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2351                                         struct md_object *obj)
2352 {
2353         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2354
2355         LASSERT(mdd_object_exists(mdd_obj));
2356         return do_version_get(env, mdd_object_child(mdd_obj));
2357 }
2358
2359 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2360                             dt_obj_version_t version)
2361 {
2362         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2363
2364         LASSERT(mdd_object_exists(mdd_obj));
2365         do_version_set(env, mdd_object_child(mdd_obj), version);
2366 }
2367
2368 const struct md_object_operations mdd_obj_ops = {
2369         .moo_permission    = mdd_permission,
2370         .moo_attr_get      = mdd_attr_get,
2371         .moo_attr_set      = mdd_attr_set,
2372         .moo_xattr_get     = mdd_xattr_get,
2373         .moo_xattr_set     = mdd_xattr_set,
2374         .moo_xattr_list    = mdd_xattr_list,
2375         .moo_xattr_del     = mdd_xattr_del,
2376         .moo_object_create = mdd_object_create,
2377         .moo_ref_add       = mdd_ref_add,
2378         .moo_ref_del       = mdd_ref_del,
2379         .moo_open          = mdd_open,
2380         .moo_close         = mdd_close,
2381         .moo_readpage      = mdd_readpage,
2382         .moo_readlink      = mdd_readlink,
2383         .moo_capa_get      = mdd_capa_get,
2384         .moo_object_sync   = mdd_object_sync,
2385         .moo_version_get   = mdd_version_get,
2386         .moo_version_set   = mdd_version_set,
2387         .moo_path          = mdd_path,
2388 };