Whamcloud - gitweb
68504b0b86bb0b2578b813d1e0ae9b68410e049e
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
149 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 {
151         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152
153         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
154                 if (buf->lb_vmalloc)
155                         OBD_VFREE(buf->lb_buf, buf->lb_len);
156                 else
157                         OBD_FREE(buf->lb_buf, buf->lb_len);
158                 buf->lb_buf = NULL;
159         }
160         if (buf->lb_buf == NULL) {
161                 buf->lb_len = len;
162                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
163                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
164                         buf->lb_vmalloc = 0;
165                 }
166                 if (buf->lb_buf == NULL) {
167                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
168                         buf->lb_vmalloc = 1;
169                 }
170                 if (buf->lb_buf == NULL)
171                         buf->lb_len = 0;
172         }
173         return buf;
174 }
175
176 /** Increase the size of the \a mti_big_buf.
177  * preserves old data in buffer
178  * old buffer remains unchanged on error
179  * \retval 0 or -ENOMEM
180  */
181 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
182 {
183         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
184         struct lu_buf buf;
185
186         LASSERT(len >= oldbuf->lb_len);
187         if (len > BUF_VMALLOC_SIZE) {
188                 OBD_VMALLOC(buf.lb_buf, len);
189                 buf.lb_vmalloc = 1;
190         } else {
191                 OBD_ALLOC(buf.lb_buf, len);
192                 buf.lb_vmalloc = 0;
193         }
194         if (buf.lb_buf == NULL)
195                 return -ENOMEM;
196
197         buf.lb_len = len;
198         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
199
200         if (oldbuf->lb_vmalloc)
201                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
202         else
203                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
204
205         memcpy(oldbuf, &buf, sizeof(buf));
206
207         return 0;
208 }
209
210 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
211                                        struct mdd_device *mdd)
212 {
213         struct mdd_thread_info *mti = mdd_env_info(env);
214         int                     max_cookie_size;
215
216         max_cookie_size = mdd_lov_cookiesize(env, mdd);
217         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
218                 if (mti->mti_max_cookie)
219                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
220                 mti->mti_max_cookie = NULL;
221                 mti->mti_max_cookie_size = 0;
222         }
223         if (unlikely(mti->mti_max_cookie == NULL)) {
224                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
225                 if (likely(mti->mti_max_cookie != NULL))
226                         mti->mti_max_cookie_size = max_cookie_size;
227         }
228         if (likely(mti->mti_max_cookie != NULL))
229                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
230         return mti->mti_max_cookie;
231 }
232
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234                                    struct mdd_device *mdd)
235 {
236         struct mdd_thread_info *mti = mdd_env_info(env);
237         int                     max_lmm_size;
238
239         max_lmm_size = mdd_lov_mdsize(env, mdd);
240         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
241                 if (mti->mti_max_lmm)
242                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
243                 mti->mti_max_lmm = NULL;
244                 mti->mti_max_lmm_size = 0;
245         }
246         if (unlikely(mti->mti_max_lmm == NULL)) {
247                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
248                 if (unlikely(mti->mti_max_lmm != NULL))
249                         mti->mti_max_lmm_size = max_lmm_size;
250         }
251         return mti->mti_max_lmm;
252 }
253
254 struct lu_object *mdd_object_alloc(const struct lu_env *env,
255                                    const struct lu_object_header *hdr,
256                                    struct lu_device *d)
257 {
258         struct mdd_object *mdd_obj;
259
260         OBD_ALLOC_PTR(mdd_obj);
261         if (mdd_obj != NULL) {
262                 struct lu_object *o;
263
264                 o = mdd2lu_obj(mdd_obj);
265                 lu_object_init(o, NULL, d);
266                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
267                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
268                 mdd_obj->mod_count = 0;
269                 o->lo_ops = &mdd_lu_obj_ops;
270                 return o;
271         } else {
272                 return NULL;
273         }
274 }
275
276 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
277                            const struct lu_object_conf *unused)
278 {
279         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
280         struct mdd_object *mdd_obj = lu2mdd_obj(o);
281         struct lu_object  *below;
282         struct lu_device  *under;
283         ENTRY;
284
285         mdd_obj->mod_cltime = 0;
286         under = &d->mdd_child->dd_lu_dev;
287         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
288         mdd_pdlock_init(mdd_obj);
289         if (below == NULL)
290                 RETURN(-ENOMEM);
291
292         lu_object_add(o, below);
293
294         RETURN(0);
295 }
296
297 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
298 {
299         if (lu_object_exists(o))
300                 return mdd_get_flags(env, lu2mdd_obj(o));
301         else
302                 return 0;
303 }
304
305 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
306 {
307         struct mdd_object *mdd = lu2mdd_obj(o);
308
309         lu_object_fini(o);
310         OBD_FREE_PTR(mdd);
311 }
312
313 static int mdd_object_print(const struct lu_env *env, void *cookie,
314                             lu_printer_t p, const struct lu_object *o)
315 {
316         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
317         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
318                     "valid=%x, cltime="LPU64", flags=%lx)",
319                     mdd, mdd->mod_count, mdd->mod_valid,
320                     mdd->mod_cltime, mdd->mod_flags);
321 }
322
323 static const struct lu_object_operations mdd_lu_obj_ops = {
324         .loo_object_init    = mdd_object_init,
325         .loo_object_start   = mdd_object_start,
326         .loo_object_free    = mdd_object_free,
327         .loo_object_print   = mdd_object_print,
328 };
329
330 struct mdd_object *mdd_object_find(const struct lu_env *env,
331                                    struct mdd_device *d,
332                                    const struct lu_fid *f)
333 {
334         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
335 }
336
337 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
338                         const char *path, struct lu_fid *fid)
339 {
340         struct lu_buf *buf;
341         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
342         struct mdd_object *obj;
343         struct lu_name *lname = &mdd_env_info(env)->mti_name;
344         char *name;
345         int rc = 0;
346         ENTRY;
347
348         /* temp buffer for path element */
349         buf = mdd_buf_alloc(env, PATH_MAX);
350         if (buf->lb_buf == NULL)
351                 RETURN(-ENOMEM);
352
353         lname->ln_name = name = buf->lb_buf;
354         lname->ln_namelen = 0;
355         *f = mdd->mdd_root_fid;
356
357         while(1) {
358                 while (*path == '/')
359                         path++;
360                 if (*path == '\0')
361                         break;
362                 while (*path != '/' && *path != '\0') {
363                         *name = *path;
364                         path++;
365                         name++;
366                         lname->ln_namelen++;
367                 }
368
369                 *name = '\0';
370                 /* find obj corresponding to fid */
371                 obj = mdd_object_find(env, mdd, f);
372                 if (obj == NULL)
373                         GOTO(out, rc = -EREMOTE);
374                 if (IS_ERR(obj))
375                         GOTO(out, rc = -PTR_ERR(obj));
376                 /* get child fid from parent and name */
377                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
378                 mdd_object_put(env, obj);
379                 if (rc)
380                         break;
381
382                 name = buf->lb_buf;
383                 lname->ln_namelen = 0;
384         }
385
386         if (!rc)
387                 *fid = *f;
388 out:
389         RETURN(rc);
390 }
391
392 /** The maximum depth that fid2path() will search.
393  * This is limited only because we want to store the fids for
394  * historical path lookup purposes.
395  */
396 #define MAX_PATH_DEPTH 100
397
398 /** mdd_path() lookup structure. */
399 struct path_lookup_info {
400         __u64                pli_recno;        /**< history point */
401         __u64                pli_currec;       /**< current record */
402         struct lu_fid        pli_fid;
403         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
404         struct mdd_object   *pli_mdd_obj;
405         char                *pli_path;         /**< full path */
406         int                  pli_pathlen;
407         int                  pli_linkno;       /**< which hardlink to follow */
408         int                  pli_fidcount;     /**< number of \a pli_fids */
409 };
410
411 static int mdd_path_current(const struct lu_env *env,
412                             struct path_lookup_info *pli)
413 {
414         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
415         struct mdd_object *mdd_obj;
416         struct lu_buf     *buf = NULL;
417         struct link_ea_header *leh;
418         struct link_ea_entry  *lee;
419         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
420         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
421         char *ptr;
422         int reclen;
423         int rc;
424         ENTRY;
425
426         ptr = pli->pli_path + pli->pli_pathlen - 1;
427         *ptr = 0;
428         --ptr;
429         pli->pli_fidcount = 0;
430         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
431
432         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
433                 mdd_obj = mdd_object_find(env, mdd,
434                                           &pli->pli_fids[pli->pli_fidcount]);
435                 if (mdd_obj == NULL)
436                         GOTO(out, rc = -EREMOTE);
437                 if (IS_ERR(mdd_obj))
438                         GOTO(out, rc = -PTR_ERR(mdd_obj));
439                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440                 if (rc <= 0) {
441                         mdd_object_put(env, mdd_obj);
442                         if (rc == -1)
443                                 rc = -EREMOTE;
444                         else if (rc == 0)
445                                 /* Do I need to error out here? */
446                                 rc = -ENOENT;
447                         GOTO(out, rc);
448                 }
449
450                 /* Get parent fid and object name */
451                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
452                 buf = mdd_links_get(env, mdd_obj);
453                 mdd_read_unlock(env, mdd_obj);
454                 mdd_object_put(env, mdd_obj);
455                 if (IS_ERR(buf))
456                         GOTO(out, rc = PTR_ERR(buf));
457
458                 leh = buf->lb_buf;
459                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
460                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461
462                 /* If set, use link #linkno for path lookup, otherwise use
463                    link #0.  Only do this for the final path element. */
464                 if ((pli->pli_fidcount == 0) &&
465                     (pli->pli_linkno < leh->leh_reccount)) {
466                         int count;
467                         for (count = 0; count < pli->pli_linkno; count++) {
468                                 lee = (struct link_ea_entry *)
469                                      ((char *)lee + reclen);
470                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471                         }
472                         if (pli->pli_linkno < leh->leh_reccount - 1)
473                                 /* indicate to user there are more links */
474                                 pli->pli_linkno++;
475                 }
476
477                 /* Pack the name in the end of the buffer */
478                 ptr -= tmpname->ln_namelen;
479                 if (ptr - 1 <= pli->pli_path)
480                         GOTO(out, rc = -EOVERFLOW);
481                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
482                 *(--ptr) = '/';
483
484                 /* Store the parent fid for historic lookup */
485                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
486                         GOTO(out, rc = -EOVERFLOW);
487                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
488         }
489
490         /* Verify that our path hasn't changed since we started the lookup.
491            Record the current index, and verify the path resolves to the
492            same fid. If it does, then the path is correct as of this index. */
493         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
494         pli->pli_currec = mdd->mdd_cl.mc_index;
495         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
496         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497         if (rc) {
498                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
499                 GOTO (out, rc = -EAGAIN);
500         }
501         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
502                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
503                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
504                        PFID(&pli->pli_fid));
505                 GOTO(out, rc = -EAGAIN);
506         }
507
508         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
509
510         EXIT;
511 out:
512         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
513                 /* if we vmalloced a large buffer drop it */
514                 mdd_buf_put(buf);
515
516         return rc;
517 }
518
519 static int mdd_path_historic(const struct lu_env *env,
520                              struct path_lookup_info *pli)
521 {
522         return 0;
523 }
524
525 /* Returns the full path to this fid, as of changelog record recno. */
526 static int mdd_path(const struct lu_env *env, struct md_object *obj,
527                     char *path, int pathlen, __u64 *recno, int *linkno)
528 {
529         struct path_lookup_info *pli;
530         int tries = 3;
531         int rc = -EAGAIN;
532         ENTRY;
533
534         if (pathlen < 3)
535                 RETURN(-EOVERFLOW);
536
537         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
538                 path[0] = '/';
539                 path[1] = '\0';
540                 RETURN(0);
541         }
542
543         OBD_ALLOC_PTR(pli);
544         if (pli == NULL)
545                 RETURN(-ENOMEM);
546
547         pli->pli_mdd_obj = md2mdd_obj(obj);
548         pli->pli_recno = *recno;
549         pli->pli_path = path;
550         pli->pli_pathlen = pathlen;
551         pli->pli_linkno = *linkno;
552
553         /* Retry multiple times in case file is being moved */
554         while (tries-- && rc == -EAGAIN)
555                 rc = mdd_path_current(env, pli);
556
557         /* For historical path lookup, the current links may not have existed
558          * at "recno" time.  We must switch over to earlier links/parents
559          * by using the changelog records.  If the earlier parent doesn't
560          * exist, we must search back through the changelog to reconstruct
561          * its parents, then check if it exists, etc.
562          * We may ignore this problem for the initial implementation and
563          * state that an "original" hardlink must still exist for us to find
564          * historic path name. */
565         if (pli->pli_recno != -1) {
566                 rc = mdd_path_historic(env, pli);
567         } else {
568                 *recno = pli->pli_currec;
569                 /* Return next link index to caller */
570                 *linkno = pli->pli_linkno;
571         }
572
573         OBD_FREE_PTR(pli);
574
575         RETURN (rc);
576 }
577
578 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
579 {
580         struct lu_attr *la = &mdd_env_info(env)->mti_la;
581         int rc;
582
583         ENTRY;
584         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
585         if (rc == 0) {
586                 mdd_flags_xlate(obj, la->la_flags);
587                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
588                         obj->mod_flags |= MNLINK_OBJ;
589         }
590         RETURN(rc);
591 }
592
593 /* get only inode attributes */
594 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
595                   struct md_attr *ma)
596 {
597         int rc = 0;
598         ENTRY;
599
600         if (ma->ma_valid & MA_INODE)
601                 RETURN(0);
602
603         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
604                           mdd_object_capa(env, mdd_obj));
605         if (rc == 0)
606                 ma->ma_valid |= MA_INODE;
607         RETURN(rc);
608 }
609
610 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
611                        int *size)
612 {
613         struct lov_desc *ldesc;
614         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
615         ENTRY;
616
617         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
618         LASSERT(ldesc != NULL);
619         LASSERT(size != NULL);
620
621         if (!lmm) {
622                 *size = 0;
623                 RETURN(0);
624         }
625
626         lmm->lmm_magic = LOV_MAGIC_V1;
627         lmm->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
628         lmm->lmm_pattern = ldesc->ld_pattern;
629         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
630         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
631         *size = sizeof(struct lov_mds_md);
632
633         RETURN(sizeof(struct lov_mds_md));
634 }
635
636 /* get lov EA only */
637 static int __mdd_lmm_get(const struct lu_env *env,
638                          struct mdd_object *mdd_obj, struct md_attr *ma)
639 {
640         int rc;
641         ENTRY;
642
643         if (ma->ma_valid & MA_LOV)
644                 RETURN(0);
645
646         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
647                         XATTR_NAME_LOV);
648         if (rc == 0 && ma->ma_need & MA_LOV_DEF)
649                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
650                                         &ma->ma_lmm_size);
651         if (rc > 0) {
652                 ma->ma_valid |= MA_LOV;
653                 rc = 0;
654         }
655         RETURN(rc);
656 }
657
658 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
659                        struct md_attr *ma)
660 {
661         int rc;
662         ENTRY;
663
664         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
665         rc = __mdd_lmm_get(env, mdd_obj, ma);
666         mdd_read_unlock(env, mdd_obj);
667         RETURN(rc);
668 }
669
670 /* get lmv EA only*/
671 static int __mdd_lmv_get(const struct lu_env *env,
672                          struct mdd_object *mdd_obj, struct md_attr *ma)
673 {
674         int rc;
675         ENTRY;
676
677         if (ma->ma_valid & MA_LMV)
678                 RETURN(0);
679
680         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
681                         XATTR_NAME_LMV);
682         if (rc > 0) {
683                 ma->ma_valid |= MA_LMV;
684                 rc = 0;
685         }
686         RETURN(rc);
687 }
688
689 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
690                          struct md_attr *ma)
691 {
692         struct mdd_thread_info *info = mdd_env_info(env);
693         struct lustre_mdt_attrs *lma =
694                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
695         int lma_size;
696         int rc;
697         ENTRY;
698
699         /* If all needed data are already valid, nothing to do */
700         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
701             (ma->ma_need & (MA_HSM | MA_SOM)))
702                 RETURN(0);
703
704         /* Read LMA from disk EA */
705         lma_size = sizeof(info->mti_xattr_buf);
706         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
707         if (rc <= 0)
708                 RETURN(rc);
709
710         /* Useless to check LMA incompatibility because this is already done in
711          * osd_ea_fid_get(), and this will fail long before this code is
712          * called.
713          * So, if we are here, LMA is compatible.
714          */
715
716         lustre_lma_swab(lma);
717
718         /* Swab and copy LMA */
719         if (ma->ma_need & MA_HSM) {
720                 if (lma->lma_compat & LMAC_HSM)
721                         ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
722                 else
723                         ma->ma_hsm_flags = 0;
724                 ma->ma_valid |= MA_HSM;
725         }
726
727         /* Copy SOM */
728         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
729                 LASSERT(ma->ma_som != NULL);
730                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
731                 ma->ma_som->msd_size    = lma->lma_som_size;
732                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
733                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
734                 ma->ma_valid |= MA_SOM;
735         }
736
737         RETURN(0);
738 }
739
740 static int mdd_attr_get_internal(const struct lu_env *env,
741                                  struct mdd_object *mdd_obj,
742                                  struct md_attr *ma)
743 {
744         int rc = 0;
745         ENTRY;
746
747         if (ma->ma_need & MA_INODE)
748                 rc = mdd_iattr_get(env, mdd_obj, ma);
749
750         if (rc == 0 && ma->ma_need & MA_LOV) {
751                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
752                     S_ISDIR(mdd_object_type(mdd_obj)))
753                         rc = __mdd_lmm_get(env, mdd_obj, ma);
754         }
755         if (rc == 0 && ma->ma_need & MA_LMV) {
756                 if (S_ISDIR(mdd_object_type(mdd_obj)))
757                         rc = __mdd_lmv_get(env, mdd_obj, ma);
758         }
759         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
760                 if (S_ISREG(mdd_object_type(mdd_obj)))
761                         rc = __mdd_lma_get(env, mdd_obj, ma);
762         }
763 #ifdef CONFIG_FS_POSIX_ACL
764         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
765                 if (S_ISDIR(mdd_object_type(mdd_obj)))
766                         rc = mdd_def_acl_get(env, mdd_obj, ma);
767         }
768 #endif
769         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
770                rc, ma->ma_valid);
771         RETURN(rc);
772 }
773
774 int mdd_attr_get_internal_locked(const struct lu_env *env,
775                                  struct mdd_object *mdd_obj, struct md_attr *ma)
776 {
777         int rc;
778         int needlock = ma->ma_need &
779                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
780
781         if (needlock)
782                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
783         rc = mdd_attr_get_internal(env, mdd_obj, ma);
784         if (needlock)
785                 mdd_read_unlock(env, mdd_obj);
786         return rc;
787 }
788
789 /*
790  * No permission check is needed.
791  */
792 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
793                         struct md_attr *ma)
794 {
795         struct mdd_object *mdd_obj = md2mdd_obj(obj);
796         int                rc;
797
798         ENTRY;
799         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
800         RETURN(rc);
801 }
802
803 /*
804  * No permission check is needed.
805  */
806 static int mdd_xattr_get(const struct lu_env *env,
807                          struct md_object *obj, struct lu_buf *buf,
808                          const char *name)
809 {
810         struct mdd_object *mdd_obj = md2mdd_obj(obj);
811         int rc;
812
813         ENTRY;
814
815         LASSERT(mdd_object_exists(mdd_obj));
816
817         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
818         rc = mdo_xattr_get(env, mdd_obj, buf, name,
819                            mdd_object_capa(env, mdd_obj));
820         mdd_read_unlock(env, mdd_obj);
821
822         RETURN(rc);
823 }
824
825 /*
826  * Permission check is done when open,
827  * no need check again.
828  */
829 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
830                         struct lu_buf *buf)
831 {
832         struct mdd_object *mdd_obj = md2mdd_obj(obj);
833         struct dt_object  *next;
834         loff_t             pos = 0;
835         int                rc;
836         ENTRY;
837
838         LASSERT(mdd_object_exists(mdd_obj));
839
840         next = mdd_object_child(mdd_obj);
841         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
842         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
843                                          mdd_object_capa(env, mdd_obj));
844         mdd_read_unlock(env, mdd_obj);
845         RETURN(rc);
846 }
847
848 /*
849  * No permission check is needed.
850  */
851 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
852                           struct lu_buf *buf)
853 {
854         struct mdd_object *mdd_obj = md2mdd_obj(obj);
855         int rc;
856
857         ENTRY;
858
859         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
860         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
861         mdd_read_unlock(env, mdd_obj);
862
863         RETURN(rc);
864 }
865
866 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
867                                struct mdd_object *c, struct md_attr *ma,
868                                struct thandle *handle,
869                                const struct md_op_spec *spec)
870 {
871         struct lu_attr *attr = &ma->ma_attr;
872         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
873         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
874         const struct dt_index_features *feat = spec->sp_feat;
875         int rc;
876         ENTRY;
877
878         if (!mdd_object_exists(c)) {
879                 struct dt_object *next = mdd_object_child(c);
880                 LASSERT(next);
881
882                 if (feat != &dt_directory_features && feat != NULL)
883                         dof->dof_type = DFT_INDEX;
884                 else
885                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
886
887                 dof->u.dof_idx.di_feat = feat;
888
889                 /* @hint will be initialized by underlying device. */
890                 next->do_ops->do_ah_init(env, hint,
891                                          p ? mdd_object_child(p) : NULL,
892                                          attr->la_mode & S_IFMT);
893
894                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
895                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
896         } else
897                 rc = -EEXIST;
898
899         RETURN(rc);
900 }
901
902 /**
903  * Make sure the ctime is increased only.
904  */
905 static inline int mdd_attr_check(const struct lu_env *env,
906                                  struct mdd_object *obj,
907                                  struct lu_attr *attr)
908 {
909         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
910         int rc;
911         ENTRY;
912
913         if (attr->la_valid & LA_CTIME) {
914                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
915                 if (rc)
916                         RETURN(rc);
917
918                 if (attr->la_ctime < tmp_la->la_ctime)
919                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
920                 else if (attr->la_valid == LA_CTIME &&
921                          attr->la_ctime == tmp_la->la_ctime)
922                         attr->la_valid &= ~LA_CTIME;
923         }
924         RETURN(0);
925 }
926
927 int mdd_attr_set_internal(const struct lu_env *env,
928                           struct mdd_object *obj,
929                           struct lu_attr *attr,
930                           struct thandle *handle,
931                           int needacl)
932 {
933         int rc;
934         ENTRY;
935
936         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
937 #ifdef CONFIG_FS_POSIX_ACL
938         if (!rc && (attr->la_valid & LA_MODE) && needacl)
939                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
940 #endif
941         RETURN(rc);
942 }
943
944 int mdd_attr_check_set_internal(const struct lu_env *env,
945                                 struct mdd_object *obj,
946                                 struct lu_attr *attr,
947                                 struct thandle *handle,
948                                 int needacl)
949 {
950         int rc;
951         ENTRY;
952
953         rc = mdd_attr_check(env, obj, attr);
954         if (rc)
955                 RETURN(rc);
956
957         if (attr->la_valid)
958                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
959         RETURN(rc);
960 }
961
962 static int mdd_attr_set_internal_locked(const struct lu_env *env,
963                                         struct mdd_object *obj,
964                                         struct lu_attr *attr,
965                                         struct thandle *handle,
966                                         int needacl)
967 {
968         int rc;
969         ENTRY;
970
971         needacl = needacl && (attr->la_valid & LA_MODE);
972         if (needacl)
973                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
974         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
975         if (needacl)
976                 mdd_write_unlock(env, obj);
977         RETURN(rc);
978 }
979
980 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
981                                        struct mdd_object *obj,
982                                        struct lu_attr *attr,
983                                        struct thandle *handle,
984                                        int needacl)
985 {
986         int rc;
987         ENTRY;
988
989         needacl = needacl && (attr->la_valid & LA_MODE);
990         if (needacl)
991                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
992         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
993         if (needacl)
994                 mdd_write_unlock(env, obj);
995         RETURN(rc);
996 }
997
998 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
999                     const struct lu_buf *buf, const char *name,
1000                     int fl, struct thandle *handle)
1001 {
1002         struct lustre_capa *capa = mdd_object_capa(env, obj);
1003         int rc = -EINVAL;
1004         ENTRY;
1005
1006         if (buf->lb_buf && buf->lb_len > 0)
1007                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1008         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1009                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1010
1011         RETURN(rc);
1012 }
1013
1014 /*
1015  * This gives the same functionality as the code between
1016  * sys_chmod and inode_setattr
1017  * chown_common and inode_setattr
1018  * utimes and inode_setattr
1019  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1020  */
1021 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1022                         struct lu_attr *la, const struct md_attr *ma)
1023 {
1024         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1025         struct md_ucred  *uc;
1026         int               rc;
1027         ENTRY;
1028
1029         if (!la->la_valid)
1030                 RETURN(0);
1031
1032         /* Do not permit change file type */
1033         if (la->la_valid & LA_TYPE)
1034                 RETURN(-EPERM);
1035
1036         /* They should not be processed by setattr */
1037         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1038                 RETURN(-EPERM);
1039
1040         /* export destroy does not have ->le_ses, but we may want
1041          * to drop LUSTRE_SOM_FL. */
1042         if (!env->le_ses)
1043                 RETURN(0);
1044
1045         uc = md_ucred(env);
1046
1047         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1048         if (rc)
1049                 RETURN(rc);
1050
1051         if (la->la_valid == LA_CTIME) {
1052                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1053                         /* This is only for set ctime when rename's source is
1054                          * on remote MDS. */
1055                         rc = mdd_may_delete(env, NULL, obj,
1056                                             (struct md_attr *)ma, 1, 0);
1057                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1058                         la->la_valid &= ~LA_CTIME;
1059                 RETURN(rc);
1060         }
1061
1062         if (la->la_valid == LA_ATIME) {
1063                 /* This is atime only set for read atime update on close. */
1064                 if (la->la_atime <= tmp_la->la_atime +
1065                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1066                         la->la_valid &= ~LA_ATIME;
1067                 RETURN(0);
1068         }
1069
1070         /* Check if flags change. */
1071         if (la->la_valid & LA_FLAGS) {
1072                 unsigned int oldflags = 0;
1073                 unsigned int newflags = la->la_flags &
1074                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1075
1076                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1077                     !mdd_capable(uc, CFS_CAP_FOWNER))
1078                         RETURN(-EPERM);
1079
1080                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1081                  * only be changed by the relevant capability. */
1082                 if (mdd_is_immutable(obj))
1083                         oldflags |= LUSTRE_IMMUTABLE_FL;
1084                 if (mdd_is_append(obj))
1085                         oldflags |= LUSTRE_APPEND_FL;
1086                 if ((oldflags ^ newflags) &&
1087                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1088                         RETURN(-EPERM);
1089
1090                 if (!S_ISDIR(tmp_la->la_mode))
1091                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1092         }
1093
1094         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1095             (la->la_valid & ~LA_FLAGS) &&
1096             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1097                 RETURN(-EPERM);
1098
1099         /* Check for setting the obj time. */
1100         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1101             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1102                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1103                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1104                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1105                                                             MAY_WRITE,
1106                                                             MOR_TGT_CHILD);
1107                         if (rc)
1108                                 RETURN(rc);
1109                 }
1110         }
1111
1112         /* Make sure a caller can chmod. */
1113         if (la->la_valid & LA_MODE) {
1114                 /* Bypass la_vaild == LA_MODE,
1115                  * this is for changing file with SUID or SGID. */
1116                 if ((la->la_valid & ~LA_MODE) &&
1117                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1118                     (uc->mu_fsuid != tmp_la->la_uid) &&
1119                     !mdd_capable(uc, CFS_CAP_FOWNER))
1120                         RETURN(-EPERM);
1121
1122                 if (la->la_mode == (cfs_umode_t) -1)
1123                         la->la_mode = tmp_la->la_mode;
1124                 else
1125                         la->la_mode = (la->la_mode & S_IALLUGO) |
1126                                       (tmp_la->la_mode & ~S_IALLUGO);
1127
1128                 /* Also check the setgid bit! */
1129                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1130                                        la->la_gid : tmp_la->la_gid) &&
1131                     !mdd_capable(uc, CFS_CAP_FSETID))
1132                         la->la_mode &= ~S_ISGID;
1133         } else {
1134                la->la_mode = tmp_la->la_mode;
1135         }
1136
1137         /* Make sure a caller can chown. */
1138         if (la->la_valid & LA_UID) {
1139                 if (la->la_uid == (uid_t) -1)
1140                         la->la_uid = tmp_la->la_uid;
1141                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1142                     (la->la_uid != tmp_la->la_uid)) &&
1143                     !mdd_capable(uc, CFS_CAP_CHOWN))
1144                         RETURN(-EPERM);
1145
1146                 /* If the user or group of a non-directory has been
1147                  * changed by a non-root user, remove the setuid bit.
1148                  * 19981026 David C Niemi <niemi@tux.org>
1149                  *
1150                  * Changed this to apply to all users, including root,
1151                  * to avoid some races. This is the behavior we had in
1152                  * 2.0. The check for non-root was definitely wrong
1153                  * for 2.2 anyway, as it should have been using
1154                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1155                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1156                     !S_ISDIR(tmp_la->la_mode)) {
1157                         la->la_mode &= ~S_ISUID;
1158                         la->la_valid |= LA_MODE;
1159                 }
1160         }
1161
1162         /* Make sure caller can chgrp. */
1163         if (la->la_valid & LA_GID) {
1164                 if (la->la_gid == (gid_t) -1)
1165                         la->la_gid = tmp_la->la_gid;
1166                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1167                     ((la->la_gid != tmp_la->la_gid) &&
1168                     !lustre_in_group_p(uc, la->la_gid))) &&
1169                     !mdd_capable(uc, CFS_CAP_CHOWN))
1170                         RETURN(-EPERM);
1171
1172                 /* Likewise, if the user or group of a non-directory
1173                  * has been changed by a non-root user, remove the
1174                  * setgid bit UNLESS there is no group execute bit
1175                  * (this would be a file marked for mandatory
1176                  * locking).  19981026 David C Niemi <niemi@tux.org>
1177                  *
1178                  * Removed the fsuid check (see the comment above) --
1179                  * 19990830 SD. */
1180                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1181                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1182                         la->la_mode &= ~S_ISGID;
1183                         la->la_valid |= LA_MODE;
1184                 }
1185         }
1186
1187         /* For both Size-on-MDS case and truncate case,
1188          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1189          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1190          * For SOM case, it is true, the MAY_WRITE perm has been checked
1191          * when open, no need check again. For truncate case, it is false,
1192          * the MAY_WRITE perm should be checked here. */
1193         if (ma->ma_attr_flags & MDS_SOM) {
1194                 /* For the "Size-on-MDS" setattr update, merge coming
1195                  * attributes with the set in the inode. BUG 10641 */
1196                 if ((la->la_valid & LA_ATIME) &&
1197                     (la->la_atime <= tmp_la->la_atime))
1198                         la->la_valid &= ~LA_ATIME;
1199
1200                 /* OST attributes do not have a priority over MDS attributes,
1201                  * so drop times if ctime is equal. */
1202                 if ((la->la_valid & LA_CTIME) &&
1203                     (la->la_ctime <= tmp_la->la_ctime))
1204                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1205         } else {
1206                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1207                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1208                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1209                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1210                                 rc = mdd_permission_internal_locked(env, obj,
1211                                                             tmp_la, MAY_WRITE,
1212                                                             MOR_TGT_CHILD);
1213                                 if (rc)
1214                                         RETURN(rc);
1215                         }
1216                 }
1217                 if (la->la_valid & LA_CTIME) {
1218                         /* The pure setattr, it has the priority over what is
1219                          * already set, do not drop it if ctime is equal. */
1220                         if (la->la_ctime < tmp_la->la_ctime)
1221                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1222                                                   LA_CTIME);
1223                 }
1224         }
1225
1226         RETURN(0);
1227 }
1228
1229 /** Store a data change changelog record
1230  * If this fails, we must fail the whole transaction; we don't
1231  * want the change to commit without the log entry.
1232  * \param mdd_obj - mdd_object of change
1233  * \param handle - transacion handle
1234  */
1235 static int mdd_changelog_data_store(const struct lu_env     *env,
1236                                     struct mdd_device       *mdd,
1237                                     enum changelog_rec_type type,
1238                                     struct mdd_object       *mdd_obj,
1239                                     struct thandle          *handle)
1240 {
1241         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1242         struct llog_changelog_rec *rec;
1243         struct lu_buf *buf;
1244         int reclen;
1245         int rc;
1246
1247         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1248                 RETURN(0);
1249
1250         LASSERT(handle != NULL);
1251         LASSERT(mdd_obj != NULL);
1252
1253         if ((type == CL_TIME) &&
1254             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1255                 /* Don't need multiple updates in this log */
1256                 /* Don't check under lock - no big deal if we get an extra
1257                    entry */
1258                 RETURN(0);
1259         }
1260
1261         reclen = llog_data_len(sizeof(*rec));
1262         buf = mdd_buf_alloc(env, reclen);
1263         if (buf->lb_buf == NULL)
1264                 RETURN(-ENOMEM);
1265         rec = (struct llog_changelog_rec *)buf->lb_buf;
1266
1267         rec->cr.cr_flags = CLF_VERSION;
1268         rec->cr.cr_type = (__u32)type;
1269         rec->cr.cr_tfid = *tfid;
1270         rec->cr.cr_namelen = 0;
1271         mdd_obj->mod_cltime = cfs_time_current_64();
1272
1273         rc = mdd_changelog_llog_write(mdd, rec, handle);
1274         if (rc < 0) {
1275                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1276                        rc, type, PFID(tfid));
1277                 return -EFAULT;
1278         }
1279
1280         return 0;
1281 }
1282
1283 /**
1284  * Should be called with write lock held.
1285  *
1286  * \see mdd_lma_set_locked().
1287  */
1288 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1289                        const struct md_attr *ma, struct thandle *handle)
1290 {
1291         struct mdd_thread_info *info = mdd_env_info(env);
1292         struct lu_buf *buf;
1293         struct lustre_mdt_attrs *lma =
1294                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1295         int lmasize = sizeof(struct lustre_mdt_attrs);
1296         int rc = 0;
1297
1298         ENTRY;
1299
1300         /* Either HSM or SOM part is not valid, we need to read it before */
1301         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1302                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1303                 if (rc <= 0)
1304                         RETURN(rc);
1305
1306                 lustre_lma_swab(lma);
1307         } else {
1308                 memset(lma, 0, lmasize);
1309         }
1310
1311         /* Copy HSM data */
1312         if (ma->ma_valid & MA_HSM) {
1313                 lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1314                 lma->lma_compat |= LMAC_HSM;
1315         }
1316
1317         /* Copy SOM data */
1318         if (ma->ma_valid & MA_SOM) {
1319                 LASSERT(ma->ma_som != NULL);
1320                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1321                         lma->lma_compat     &= ~LMAC_SOM;
1322                 } else {
1323                         lma->lma_compat     |= LMAC_SOM;
1324                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1325                         lma->lma_som_size    = ma->ma_som->msd_size;
1326                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1327                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1328                 }
1329         }
1330
1331         /* Copy FID */
1332         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1333
1334         lustre_lma_swab(lma);
1335         buf = mdd_buf_get(env, lma, lmasize);
1336         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1337
1338         RETURN(rc);
1339 }
1340
1341 /**
1342  * Save LMA extended attributes with data from \a ma.
1343  *
1344  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1345  * not, LMA EA will be first read from disk, modified and write back.
1346  *
1347  */
1348 static int mdd_lma_set_locked(const struct lu_env *env,
1349                               struct mdd_object *mdd_obj,
1350                               const struct md_attr *ma, struct thandle *handle)
1351 {
1352         int rc;
1353
1354         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1355         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1356         mdd_write_unlock(env, mdd_obj);
1357         return rc;
1358 }
1359
1360 /* set attr and LOV EA at once, return updated attr */
1361 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1362                         const struct md_attr *ma)
1363 {
1364         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1365         struct mdd_device *mdd = mdo2mdd(obj);
1366         struct thandle *handle;
1367         struct lov_mds_md *lmm = NULL;
1368         struct llog_cookie *logcookies = NULL;
1369         int  rc, lmm_size = 0, cookie_size = 0;
1370         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1371 #ifdef HAVE_QUOTA_SUPPORT
1372         struct obd_device *obd = mdd->mdd_obd_dev;
1373         struct mds_obd *mds = &obd->u.mds;
1374         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1375         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1376         int quota_opc = 0, block_count = 0;
1377         int inode_pending[MAXQUOTAS] = { 0, 0 };
1378         int block_pending[MAXQUOTAS] = { 0, 0 };
1379 #endif
1380         ENTRY;
1381
1382         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1383                                     MDD_TXN_ATTR_SET_OP);
1384         handle = mdd_trans_start(env, mdd);
1385         if (IS_ERR(handle))
1386                 RETURN(PTR_ERR(handle));
1387         /*TODO: add lock here*/
1388         /* start a log jounal handle if needed */
1389         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1390             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1391                 lmm_size = mdd_lov_mdsize(env, mdd);
1392                 lmm = mdd_max_lmm_get(env, mdd);
1393                 if (lmm == NULL)
1394                         GOTO(cleanup, rc = -ENOMEM);
1395
1396                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1397                                 XATTR_NAME_LOV);
1398
1399                 if (rc < 0)
1400                         GOTO(cleanup, rc);
1401         }
1402
1403         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1404                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1405                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1406
1407         *la_copy = ma->ma_attr;
1408         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1409         if (rc)
1410                 GOTO(cleanup, rc);
1411
1412 #ifdef HAVE_QUOTA_SUPPORT
1413         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1414                 struct obd_export *exp = md_quota(env)->mq_exp;
1415                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1416
1417                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1418                 if (!rc) {
1419                         quota_opc = FSFILT_OP_SETATTR;
1420                         mdd_quota_wrapper(la_copy, qnids);
1421                         mdd_quota_wrapper(la_tmp, qoids);
1422                         /* get file quota for new owner */
1423                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1424                                         qnids, inode_pending, 1, NULL, 0,
1425                                         NULL, 0);
1426                         block_count = (la_tmp->la_blocks + 7) >> 3;
1427                         if (block_count) {
1428                                 void *data = NULL;
1429                                 mdd_data_get(env, mdd_obj, &data);
1430                                 /* get block quota for new owner */
1431                                 lquota_chkquota(mds_quota_interface_ref, obd,
1432                                                 exp, qnids, block_pending,
1433                                                 block_count, NULL,
1434                                                 LQUOTA_FLAGS_BLK, data, 1);
1435                         }
1436                 }
1437         }
1438 #endif
1439
1440         if (la_copy->la_valid & LA_FLAGS) {
1441                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1442                                                   handle, 1);
1443                 if (rc == 0)
1444                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1445         } else if (la_copy->la_valid) {            /* setattr */
1446                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1447                                                   handle, 1);
1448                 /* journal chown/chgrp in llog, just like unlink */
1449                 if (rc == 0 && lmm_size){
1450                         cookie_size = mdd_lov_cookiesize(env, mdd);
1451                         logcookies = mdd_max_cookie_get(env, mdd);
1452                         if (logcookies == NULL)
1453                                 GOTO(cleanup, rc = -ENOMEM);
1454
1455                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1456                                             logcookies, cookie_size) <= 0)
1457                                 logcookies = NULL;
1458                 }
1459         }
1460
1461         if (rc == 0 && ma->ma_valid & MA_LOV) {
1462                 cfs_umode_t mode;
1463
1464                 mode = mdd_object_type(mdd_obj);
1465                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1466                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1467                         if (rc)
1468                                 GOTO(cleanup, rc);
1469
1470                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1471                                             ma->ma_lmm_size, handle, 1);
1472                 }
1473
1474         }
1475         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1476                 cfs_umode_t mode;
1477
1478                 mode = mdd_object_type(mdd_obj);
1479                 if (S_ISREG(mode))
1480                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1481
1482         }
1483 cleanup:
1484         if (rc == 0)
1485                 rc = mdd_changelog_data_store(env, mdd,
1486                                               (ma->ma_attr.la_valid &
1487                                                ~(LA_MTIME|LA_CTIME|LA_ATIME)) ?
1488                                               CL_SETATTR : CL_TIME,
1489                                               mdd_obj, handle);
1490         mdd_trans_stop(env, mdd, rc, handle);
1491         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1492                 /*set obd attr, if needed*/
1493                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1494                                            logcookies);
1495         }
1496 #ifdef HAVE_QUOTA_SUPPORT
1497         if (quota_opc) {
1498                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1499                                       inode_pending, 0);
1500                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1501                                       block_pending, 1);
1502                 /* Trigger dqrel/dqacq for original owner and new owner.
1503                  * If failed, the next call for lquota_chkquota will
1504                  * process it. */
1505                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1506                               quota_opc);
1507         }
1508 #endif
1509         RETURN(rc);
1510 }
1511
1512 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1513                       const struct lu_buf *buf, const char *name, int fl,
1514                       struct thandle *handle)
1515 {
1516         int  rc;
1517         ENTRY;
1518
1519         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1520         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1521         mdd_write_unlock(env, obj);
1522
1523         RETURN(rc);
1524 }
1525
1526 static int mdd_xattr_sanity_check(const struct lu_env *env,
1527                                   struct mdd_object *obj)
1528 {
1529         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1530         struct md_ucred *uc     = md_ucred(env);
1531         int rc;
1532         ENTRY;
1533
1534         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1535                 RETURN(-EPERM);
1536
1537         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1538         if (rc)
1539                 RETURN(rc);
1540
1541         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1542             !mdd_capable(uc, CFS_CAP_FOWNER))
1543                 RETURN(-EPERM);
1544
1545         RETURN(rc);
1546 }
1547
1548 /**
1549  * The caller should guarantee to update the object ctime
1550  * after xattr_set if needed.
1551  */
1552 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1553                          const struct lu_buf *buf, const char *name,
1554                          int fl)
1555 {
1556         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1557         struct mdd_device *mdd = mdo2mdd(obj);
1558         struct thandle *handle;
1559         int  rc;
1560         ENTRY;
1561
1562         rc = mdd_xattr_sanity_check(env, mdd_obj);
1563         if (rc)
1564                 RETURN(rc);
1565
1566         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1567         /* security-replated changes may require sync */
1568         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1569             mdd->mdd_sync_permission == 1)
1570                 txn_param_sync(&mdd_env_info(env)->mti_param);
1571
1572         handle = mdd_trans_start(env, mdd);
1573         if (IS_ERR(handle))
1574                 RETURN(PTR_ERR(handle));
1575
1576         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1577
1578         /* Only record user xattr changes */
1579         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1580             (strncmp("user.", name, 5) == 0))
1581                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1582                                               handle);
1583         mdd_trans_stop(env, mdd, rc, handle);
1584
1585         RETURN(rc);
1586 }
1587
1588 /**
1589  * The caller should guarantee to update the object ctime
1590  * after xattr_set if needed.
1591  */
1592 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1593                   const char *name)
1594 {
1595         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1596         struct mdd_device *mdd = mdo2mdd(obj);
1597         struct thandle *handle;
1598         int  rc;
1599         ENTRY;
1600
1601         rc = mdd_xattr_sanity_check(env, mdd_obj);
1602         if (rc)
1603                 RETURN(rc);
1604
1605         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1606         handle = mdd_trans_start(env, mdd);
1607         if (IS_ERR(handle))
1608                 RETURN(PTR_ERR(handle));
1609
1610         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1611         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1612                            mdd_object_capa(env, mdd_obj));
1613         mdd_write_unlock(env, mdd_obj);
1614
1615         /* Only record user xattr changes */
1616         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1617             (strncmp("user.", name, 5) != 0))
1618                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1619                                               handle);
1620
1621         mdd_trans_stop(env, mdd, rc, handle);
1622
1623         RETURN(rc);
1624 }
1625
1626 /* partial unlink */
1627 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1628                        struct md_attr *ma)
1629 {
1630         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1631         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1632         struct mdd_device *mdd = mdo2mdd(obj);
1633         struct thandle *handle;
1634 #ifdef HAVE_QUOTA_SUPPORT
1635         struct obd_device *obd = mdd->mdd_obd_dev;
1636         struct mds_obd *mds = &obd->u.mds;
1637         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1638         int quota_opc = 0;
1639 #endif
1640         int rc;
1641         ENTRY;
1642
1643         /*
1644          * Check -ENOENT early here because we need to get object type
1645          * to calculate credits before transaction start
1646          */
1647         if (!mdd_object_exists(mdd_obj))
1648                 RETURN(-ENOENT);
1649
1650         LASSERT(mdd_object_exists(mdd_obj) > 0);
1651
1652         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1653         if (rc)
1654                 RETURN(rc);
1655
1656         handle = mdd_trans_start(env, mdd);
1657         if (IS_ERR(handle))
1658                 RETURN(-ENOMEM);
1659
1660         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1661
1662         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1663         if (rc)
1664                 GOTO(cleanup, rc);
1665
1666         __mdd_ref_del(env, mdd_obj, handle, 0);
1667
1668         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1669                 /* unlink dot */
1670                 __mdd_ref_del(env, mdd_obj, handle, 1);
1671         }
1672
1673         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1674         la_copy->la_ctime = ma->ma_attr.la_ctime;
1675
1676         la_copy->la_valid = LA_CTIME;
1677         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1678         if (rc)
1679                 GOTO(cleanup, rc);
1680
1681         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1682 #ifdef HAVE_QUOTA_SUPPORT
1683         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1684             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1685                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1686                 mdd_quota_wrapper(&ma->ma_attr, qids);
1687         }
1688 #endif
1689
1690
1691         EXIT;
1692 cleanup:
1693         mdd_write_unlock(env, mdd_obj);
1694         mdd_trans_stop(env, mdd, rc, handle);
1695 #ifdef HAVE_QUOTA_SUPPORT
1696         if (quota_opc)
1697                 /* Trigger dqrel on the owner of child. If failed,
1698                  * the next call for lquota_chkquota will process it */
1699                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1700                               quota_opc);
1701 #endif
1702         return rc;
1703 }
1704
1705 /* partial operation */
1706 static int mdd_oc_sanity_check(const struct lu_env *env,
1707                                struct mdd_object *obj,
1708                                struct md_attr *ma)
1709 {
1710         int rc;
1711         ENTRY;
1712
1713         switch (ma->ma_attr.la_mode & S_IFMT) {
1714         case S_IFREG:
1715         case S_IFDIR:
1716         case S_IFLNK:
1717         case S_IFCHR:
1718         case S_IFBLK:
1719         case S_IFIFO:
1720         case S_IFSOCK:
1721                 rc = 0;
1722                 break;
1723         default:
1724                 rc = -EINVAL;
1725                 break;
1726         }
1727         RETURN(rc);
1728 }
1729
1730 static int mdd_object_create(const struct lu_env *env,
1731                              struct md_object *obj,
1732                              const struct md_op_spec *spec,
1733                              struct md_attr *ma)
1734 {
1735
1736         struct mdd_device *mdd = mdo2mdd(obj);
1737         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1738         const struct lu_fid *pfid = spec->u.sp_pfid;
1739         struct thandle *handle;
1740 #ifdef HAVE_QUOTA_SUPPORT
1741         struct obd_device *obd = mdd->mdd_obd_dev;
1742         struct obd_export *exp = md_quota(env)->mq_exp;
1743         struct mds_obd *mds = &obd->u.mds;
1744         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1745         int quota_opc = 0, block_count = 0;
1746         int inode_pending[MAXQUOTAS] = { 0, 0 };
1747         int block_pending[MAXQUOTAS] = { 0, 0 };
1748 #endif
1749         int rc = 0;
1750         ENTRY;
1751
1752 #ifdef HAVE_QUOTA_SUPPORT
1753         if (mds->mds_quota) {
1754                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1755                 mdd_quota_wrapper(&ma->ma_attr, qids);
1756                 /* get file quota for child */
1757                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1758                                 qids, inode_pending, 1, NULL, 0,
1759                                 NULL, 0);
1760                 switch (ma->ma_attr.la_mode & S_IFMT) {
1761                 case S_IFLNK:
1762                 case S_IFDIR:
1763                         block_count = 2;
1764                         break;
1765                 case S_IFREG:
1766                         block_count = 1;
1767                         break;
1768                 }
1769                 /* get block quota for child */
1770                 if (block_count)
1771                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1772                                         qids, block_pending, block_count,
1773                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1774         }
1775 #endif
1776
1777         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1778         handle = mdd_trans_start(env, mdd);
1779         if (IS_ERR(handle))
1780                 GOTO(out_pending, rc = PTR_ERR(handle));
1781
1782         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1783         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1784         if (rc)
1785                 GOTO(unlock, rc);
1786
1787         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1788         if (rc)
1789                 GOTO(unlock, rc);
1790
1791         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1792                 /* If creating the slave object, set slave EA here. */
1793                 int lmv_size = spec->u.sp_ea.eadatalen;
1794                 struct lmv_stripe_md *lmv;
1795
1796                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1797                 LASSERT(lmv != NULL && lmv_size > 0);
1798
1799                 rc = __mdd_xattr_set(env, mdd_obj,
1800                                      mdd_buf_get_const(env, lmv, lmv_size),
1801                                      XATTR_NAME_LMV, 0, handle);
1802                 if (rc)
1803                         GOTO(unlock, rc);
1804
1805                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1806                                            handle, 0);
1807         } else {
1808 #ifdef CONFIG_FS_POSIX_ACL
1809                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1810                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1811
1812                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1813                         buf->lb_len = spec->u.sp_ea.eadatalen;
1814                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1815                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1816                                                     &ma->ma_attr.la_mode,
1817                                                     handle);
1818                                 if (rc)
1819                                         GOTO(unlock, rc);
1820                                 else
1821                                         ma->ma_attr.la_valid |= LA_MODE;
1822                         }
1823
1824                         pfid = spec->u.sp_ea.fid;
1825                 }
1826 #endif
1827                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1828                                            spec);
1829         }
1830         EXIT;
1831 unlock:
1832         if (rc == 0)
1833                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1834         mdd_write_unlock(env, mdd_obj);
1835
1836         mdd_trans_stop(env, mdd, rc, handle);
1837 out_pending:
1838 #ifdef HAVE_QUOTA_SUPPORT
1839         if (quota_opc) {
1840                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1841                                       inode_pending, 0);
1842                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1843                                       block_pending, 1);
1844                 /* Trigger dqacq on the owner of child. If failed,
1845                  * the next call for lquota_chkquota will process it. */
1846                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1847                               quota_opc);
1848         }
1849 #endif
1850         return rc;
1851 }
1852
1853 /* partial link */
1854 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1855                        const struct md_attr *ma)
1856 {
1857         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1858         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1859         struct mdd_device *mdd = mdo2mdd(obj);
1860         struct thandle *handle;
1861         int rc;
1862         ENTRY;
1863
1864         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1865         handle = mdd_trans_start(env, mdd);
1866         if (IS_ERR(handle))
1867                 RETURN(-ENOMEM);
1868
1869         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1870         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1871         if (rc == 0)
1872                 __mdd_ref_add(env, mdd_obj, handle);
1873         mdd_write_unlock(env, mdd_obj);
1874         if (rc == 0) {
1875                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1876                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1877
1878                 la_copy->la_valid = LA_CTIME;
1879                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1880                                                         handle, 0);
1881         }
1882         mdd_trans_stop(env, mdd, 0, handle);
1883
1884         RETURN(rc);
1885 }
1886
1887 /*
1888  * do NOT or the MAY_*'s, you'll get the weakest
1889  */
1890 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1891 {
1892         int res = 0;
1893
1894         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1895          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1896          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1897          * owner can write to a file even if it is marked readonly to hide
1898          * its brokenness. (bug 5781) */
1899         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1900                 struct md_ucred *uc = md_ucred(env);
1901
1902                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1903                     (la->la_uid == uc->mu_fsuid))
1904                         return 0;
1905         }
1906
1907         if (flags & FMODE_READ)
1908                 res |= MAY_READ;
1909         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1910                 res |= MAY_WRITE;
1911         if (flags & MDS_FMODE_EXEC)
1912                 res |= MAY_EXEC;
1913         return res;
1914 }
1915
1916 static int mdd_open_sanity_check(const struct lu_env *env,
1917                                  struct mdd_object *obj, int flag)
1918 {
1919         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1920         int mode, rc;
1921         ENTRY;
1922
1923         /* EEXIST check */
1924         if (mdd_is_dead_obj(obj))
1925                 RETURN(-ENOENT);
1926
1927         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1928         if (rc)
1929                RETURN(rc);
1930
1931         if (S_ISLNK(tmp_la->la_mode))
1932                 RETURN(-ELOOP);
1933
1934         mode = accmode(env, tmp_la, flag);
1935
1936         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1937                 RETURN(-EISDIR);
1938
1939         if (!(flag & MDS_OPEN_CREATED)) {
1940                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1941                 if (rc)
1942                         RETURN(rc);
1943         }
1944
1945         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1946             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1947                 flag &= ~MDS_OPEN_TRUNC;
1948
1949         /* For writing append-only file must open it with append mode. */
1950         if (mdd_is_append(obj)) {
1951                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1952                         RETURN(-EPERM);
1953                 if (flag & MDS_OPEN_TRUNC)
1954                         RETURN(-EPERM);
1955         }
1956
1957 #if 0
1958         /*
1959          * Now, flag -- O_NOATIME does not be packed by client.
1960          */
1961         if (flag & O_NOATIME) {
1962                 struct md_ucred *uc = md_ucred(env);
1963
1964                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1965                     (uc->mu_valid == UCRED_NEW)) &&
1966                     (uc->mu_fsuid != tmp_la->la_uid) &&
1967                     !mdd_capable(uc, CFS_CAP_FOWNER))
1968                         RETURN(-EPERM);
1969         }
1970 #endif
1971
1972         RETURN(0);
1973 }
1974
1975 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1976                     int flags)
1977 {
1978         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1979         int rc = 0;
1980
1981         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1982
1983         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1984         if (rc == 0)
1985                 mdd_obj->mod_count++;
1986
1987         mdd_write_unlock(env, mdd_obj);
1988         return rc;
1989 }
1990
1991 /* return md_attr back,
1992  * if it is last unlink then return lov ea + llog cookie*/
1993 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1994                     struct md_attr *ma)
1995 {
1996         int rc = 0;
1997         ENTRY;
1998
1999         if (S_ISREG(mdd_object_type(obj))) {
2000                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2001                  * Caller must be ready for that. */
2002
2003                 rc = __mdd_lmm_get(env, obj, ma);
2004                 if ((ma->ma_valid & MA_LOV))
2005                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2006                                             obj, ma);
2007         }
2008         RETURN(rc);
2009 }
2010
2011 /*
2012  * No permission check is needed.
2013  */
2014 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2015                      struct md_attr *ma)
2016 {
2017         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2018         struct mdd_device *mdd = mdo2mdd(obj);
2019         struct thandle    *handle;
2020         int rc;
2021         int reset = 1;
2022
2023 #ifdef HAVE_QUOTA_SUPPORT
2024         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2025         struct mds_obd *mds = &obd->u.mds;
2026         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2027         int quota_opc = 0;
2028 #endif
2029         ENTRY;
2030
2031         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2032         if (rc)
2033                 RETURN(rc);
2034         handle = mdd_trans_start(env, mdo2mdd(obj));
2035         if (IS_ERR(handle))
2036                 RETURN(PTR_ERR(handle));
2037
2038         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2039         /* release open count */
2040         mdd_obj->mod_count --;
2041
2042         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2043                 /* remove link to object from orphan index */
2044                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2045                 if (rc == 0) {
2046                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2047                                "list, OSS objects to be destroyed.\n",
2048                                PFID(mdd_object_fid(mdd_obj)));
2049                 } else {
2050                         CERROR("Object "DFID" can not be deleted from orphan "
2051                                 "list, maybe cause OST objects can not be "
2052                                 "destroyed (err: %d).\n",
2053                                 PFID(mdd_object_fid(mdd_obj)), rc);
2054                         /* If object was not deleted from orphan list, do not
2055                          * destroy OSS objects, which will be done when next
2056                          * recovery. */
2057                         GOTO(out, rc);
2058                 }
2059         }
2060
2061         rc = mdd_iattr_get(env, mdd_obj, ma);
2062         /* Object maybe not in orphan list originally, it is rare case for
2063          * mdd_finish_unlink() failure. */
2064         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2065 #ifdef HAVE_QUOTA_SUPPORT
2066                 if (mds->mds_quota) {
2067                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2068                         mdd_quota_wrapper(&ma->ma_attr, qids);
2069                 }
2070 #endif
2071                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2072                 if (ma->ma_valid & MA_FLAGS &&
2073                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2074                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2075                 } else {
2076                         rc = mdd_object_kill(env, mdd_obj, ma);
2077                                 if (rc == 0)
2078                                         reset = 0;
2079                 }
2080
2081                 if (rc != 0)
2082                         CERROR("Error when prepare to delete Object "DFID" , "
2083                                "which will cause OST objects can not be "
2084                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2085         }
2086         EXIT;
2087
2088 out:
2089         if (reset)
2090                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2091
2092         mdd_write_unlock(env, mdd_obj);
2093         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2094 #ifdef HAVE_QUOTA_SUPPORT
2095         if (quota_opc)
2096                 /* Trigger dqrel on the owner of child. If failed,
2097                  * the next call for lquota_chkquota will process it */
2098                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2099                               quota_opc);
2100 #endif
2101         return rc;
2102 }
2103
2104 /*
2105  * Permission check is done when open,
2106  * no need check again.
2107  */
2108 static int mdd_readpage_sanity_check(const struct lu_env *env,
2109                                      struct mdd_object *obj)
2110 {
2111         struct dt_object *next = mdd_object_child(obj);
2112         int rc;
2113         ENTRY;
2114
2115         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2116                 rc = 0;
2117         else
2118                 rc = -ENOTDIR;
2119
2120         RETURN(rc);
2121 }
2122
2123 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2124                               int first, void *area, int nob,
2125                               const struct dt_it_ops *iops, struct dt_it *it,
2126                               __u64 *start, __u64 *end,
2127                               struct lu_dirent **last, __u32 attr)
2128 {
2129         int                     result;
2130         __u64                   hash = 0;
2131         struct lu_dirent       *ent;
2132
2133         if (first) {
2134                 memset(area, 0, sizeof (struct lu_dirpage));
2135                 area += sizeof (struct lu_dirpage);
2136                 nob  -= sizeof (struct lu_dirpage);
2137         }
2138
2139         ent  = area;
2140         do {
2141                 int    len;
2142                 int    recsize;
2143
2144                 len  = iops->key_size(env, it);
2145
2146                 /* IAM iterator can return record with zero len. */
2147                 if (len == 0)
2148                         goto next;
2149
2150                 hash = iops->store(env, it);
2151                 if (unlikely(first)) {
2152                         first = 0;
2153                         *start = hash;
2154                 }
2155
2156                 /* calculate max space required for lu_dirent */
2157                 recsize = lu_dirent_calc_size(len, attr);
2158
2159                 if (nob >= recsize) {
2160                         result = iops->rec(env, it, ent, attr);
2161                         if (result == -ESTALE)
2162                                 goto next;
2163                         if (result != 0)
2164                                 goto out;
2165
2166                         /* osd might not able to pack all attributes,
2167                          * so recheck rec length */
2168                         recsize = le16_to_cpu(ent->lde_reclen);
2169                 } else {
2170                         /*
2171                          * record doesn't fit into page, enlarge previous one.
2172                          */
2173                         if (*last) {
2174                                 (*last)->lde_reclen =
2175                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2176                                                         nob);
2177                                 result = 0;
2178                         } else
2179                                 result = -EINVAL;
2180
2181                         goto out;
2182                 }
2183                 *last = ent;
2184                 ent = (void *)ent + recsize;
2185                 nob -= recsize;
2186
2187 next:
2188                 result = iops->next(env, it);
2189                 if (result == -ESTALE)
2190                         goto next;
2191         } while (result == 0);
2192
2193 out:
2194         *end = hash;
2195         return result;
2196 }
2197
2198 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2199                           const struct lu_rdpg *rdpg)
2200 {
2201         struct dt_it      *it;
2202         struct dt_object  *next = mdd_object_child(obj);
2203         const struct dt_it_ops  *iops;
2204         struct page       *pg;
2205         struct lu_dirent  *last = NULL;
2206         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2207         int i;
2208         int rc;
2209         int nob;
2210         __u64 hash_start;
2211         __u64 hash_end = 0;
2212
2213         LASSERT(rdpg->rp_pages != NULL);
2214         LASSERT(next->do_index_ops != NULL);
2215
2216         if (rdpg->rp_count <= 0)
2217                 return -EFAULT;
2218
2219         /*
2220          * iterate through directory and fill pages from @rdpg
2221          */
2222         iops = &next->do_index_ops->dio_it;
2223         it = iops->init(env, next, mdd_object_capa(env, obj));
2224         if (IS_ERR(it))
2225                 return PTR_ERR(it);
2226
2227         rc = iops->load(env, it, rdpg->rp_hash);
2228
2229         if (rc == 0){
2230                 /*
2231                  * Iterator didn't find record with exactly the key requested.
2232                  *
2233                  * It is currently either
2234                  *
2235                  *     - positioned above record with key less than
2236                  *     requested---skip it.
2237                  *
2238                  *     - or not positioned at all (is in IAM_IT_SKEWED
2239                  *     state)---position it on the next item.
2240                  */
2241                 rc = iops->next(env, it);
2242         } else if (rc > 0)
2243                 rc = 0;
2244
2245         /*
2246          * At this point and across for-loop:
2247          *
2248          *  rc == 0 -> ok, proceed.
2249          *  rc >  0 -> end of directory.
2250          *  rc <  0 -> error.
2251          */
2252         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2253              i++, nob -= CFS_PAGE_SIZE) {
2254                 LASSERT(i < rdpg->rp_npages);
2255                 pg = rdpg->rp_pages[i];
2256                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2257                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2258                                         it, &hash_start, &hash_end, &last,
2259                                         rdpg->rp_attrs);
2260                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2261                         if (last)
2262                                 last->lde_reclen = 0;
2263                 }
2264                 cfs_kunmap(pg);
2265         }
2266         if (rc > 0) {
2267                 /*
2268                  * end of directory.
2269                  */
2270                 hash_end = DIR_END_OFF;
2271                 rc = 0;
2272         }
2273         if (rc == 0) {
2274                 struct lu_dirpage *dp;
2275
2276                 dp = cfs_kmap(rdpg->rp_pages[0]);
2277                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2278                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2279                 if (i == 0)
2280                         /*
2281                          * No pages were processed, mark this.
2282                          */
2283                         dp->ldp_flags |= LDF_EMPTY;
2284
2285                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2286                 cfs_kunmap(rdpg->rp_pages[0]);
2287         }
2288         iops->put(env, it);
2289         iops->fini(env, it);
2290
2291         return rc;
2292 }
2293
2294 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2295                  const struct lu_rdpg *rdpg)
2296 {
2297         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2298         int rc;
2299         ENTRY;
2300
2301         LASSERT(mdd_object_exists(mdd_obj));
2302
2303         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2304         rc = mdd_readpage_sanity_check(env, mdd_obj);
2305         if (rc)
2306                 GOTO(out_unlock, rc);
2307
2308         if (mdd_is_dead_obj(mdd_obj)) {
2309                 struct page *pg;
2310                 struct lu_dirpage *dp;
2311
2312                 /*
2313                  * According to POSIX, please do not return any entry to client:
2314                  * even dot and dotdot should not be returned.
2315                  */
2316                 CWARN("readdir from dead object: "DFID"\n",
2317                         PFID(mdd_object_fid(mdd_obj)));
2318
2319                 if (rdpg->rp_count <= 0)
2320                         GOTO(out_unlock, rc = -EFAULT);
2321                 LASSERT(rdpg->rp_pages != NULL);
2322
2323                 pg = rdpg->rp_pages[0];
2324                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2325                 memset(dp, 0 , sizeof(struct lu_dirpage));
2326                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2327                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2328                 dp->ldp_flags |= LDF_EMPTY;
2329                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2330                 cfs_kunmap(pg);
2331                 GOTO(out_unlock, rc = 0);
2332         }
2333
2334         rc = __mdd_readpage(env, mdd_obj, rdpg);
2335
2336         EXIT;
2337 out_unlock:
2338         mdd_read_unlock(env, mdd_obj);
2339         return rc;
2340 }
2341
2342 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2343 {
2344         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2345         struct dt_object *next;
2346
2347         LASSERT(mdd_object_exists(mdd_obj));
2348         next = mdd_object_child(mdd_obj);
2349         return next->do_ops->do_object_sync(env, next);
2350 }
2351
2352 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2353                                         struct md_object *obj)
2354 {
2355         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2356
2357         LASSERT(mdd_object_exists(mdd_obj));
2358         return do_version_get(env, mdd_object_child(mdd_obj));
2359 }
2360
2361 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2362                             dt_obj_version_t version)
2363 {
2364         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2365
2366         LASSERT(mdd_object_exists(mdd_obj));
2367         return do_version_set(env, mdd_object_child(mdd_obj), version);
2368 }
2369
2370 const struct md_object_operations mdd_obj_ops = {
2371         .moo_permission    = mdd_permission,
2372         .moo_attr_get      = mdd_attr_get,
2373         .moo_attr_set      = mdd_attr_set,
2374         .moo_xattr_get     = mdd_xattr_get,
2375         .moo_xattr_set     = mdd_xattr_set,
2376         .moo_xattr_list    = mdd_xattr_list,
2377         .moo_xattr_del     = mdd_xattr_del,
2378         .moo_object_create = mdd_object_create,
2379         .moo_ref_add       = mdd_ref_add,
2380         .moo_ref_del       = mdd_ref_del,
2381         .moo_open          = mdd_open,
2382         .moo_close         = mdd_close,
2383         .moo_readpage      = mdd_readpage,
2384         .moo_readlink      = mdd_readlink,
2385         .moo_capa_get      = mdd_capa_get,
2386         .moo_object_sync   = mdd_object_sync,
2387         .moo_version_get   = mdd_version_get,
2388         .moo_version_set   = mdd_version_set,
2389         .moo_path          = mdd_path,
2390 };