Whamcloud - gitweb
Introduce .gitignore files.
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134 }
135
136 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
137                                        const void *area, ssize_t len)
138 {
139         struct lu_buf *buf;
140
141         buf = &mdd_env_info(env)->mti_buf;
142         buf->lb_buf = (void *)area;
143         buf->lb_len = len;
144         return buf;
145 }
146
147 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 if (buf->lb_vmalloc)
154                         OBD_VFREE(buf->lb_buf, buf->lb_len);
155                 else
156                         OBD_FREE(buf->lb_buf, buf->lb_len);
157                 buf->lb_buf = NULL;
158         }
159         if (buf->lb_buf == NULL) {
160                 buf->lb_len = len;
161                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
162                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
163                         buf->lb_vmalloc = 0;
164                 }
165                 if (buf->lb_buf == NULL) {
166                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
167                         buf->lb_vmalloc = 1;
168                 }
169                 if (buf->lb_buf == NULL)
170                         buf->lb_len = 0;
171         }
172         return buf;
173 }
174
175 /** Increase the size of the \a mti_big_buf.
176  * preserves old data in buffer
177  * old buffer remains unchanged on error
178  * \retval 0 or -ENOMEM
179  */
180 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
181 {
182         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
183         struct lu_buf buf;
184
185         LASSERT(len >= oldbuf->lb_len);
186         if (len > BUF_VMALLOC_SIZE) {
187                 OBD_VMALLOC(buf.lb_buf, len);
188                 buf.lb_vmalloc = 1;
189         } else {
190                 OBD_ALLOC(buf.lb_buf, len);
191                 buf.lb_vmalloc = 0;
192         }
193         if (buf.lb_buf == NULL)
194                 return -ENOMEM;
195
196         buf.lb_len = len;
197         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
198
199         if (oldbuf->lb_vmalloc)
200                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
201         else
202                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
203
204         memcpy(oldbuf, &buf, sizeof(buf));
205
206         return 0;
207 }
208
209 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
210                                        struct mdd_device *mdd)
211 {
212         struct mdd_thread_info *mti = mdd_env_info(env);
213         int                     max_cookie_size;
214
215         max_cookie_size = mdd_lov_cookiesize(env, mdd);
216         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
217                 if (mti->mti_max_cookie)
218                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
219                 mti->mti_max_cookie = NULL;
220                 mti->mti_max_cookie_size = 0;
221         }
222         if (unlikely(mti->mti_max_cookie == NULL)) {
223                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
224                 if (likely(mti->mti_max_cookie != NULL))
225                         mti->mti_max_cookie_size = max_cookie_size;
226         }
227         if (likely(mti->mti_max_cookie != NULL))
228                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
229         return mti->mti_max_cookie;
230 }
231
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233                                    struct mdd_device *mdd)
234 {
235         struct mdd_thread_info *mti = mdd_env_info(env);
236         int                     max_lmm_size;
237
238         max_lmm_size = mdd_lov_mdsize(env, mdd);
239         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240                 if (mti->mti_max_lmm)
241                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242                 mti->mti_max_lmm = NULL;
243                 mti->mti_max_lmm_size = 0;
244         }
245         if (unlikely(mti->mti_max_lmm == NULL)) {
246                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
247                 if (unlikely(mti->mti_max_lmm != NULL))
248                         mti->mti_max_lmm_size = max_lmm_size;
249         }
250         return mti->mti_max_lmm;
251 }
252
253 struct lu_object *mdd_object_alloc(const struct lu_env *env,
254                                    const struct lu_object_header *hdr,
255                                    struct lu_device *d)
256 {
257         struct mdd_object *mdd_obj;
258
259         OBD_ALLOC_PTR(mdd_obj);
260         if (mdd_obj != NULL) {
261                 struct lu_object *o;
262
263                 o = mdd2lu_obj(mdd_obj);
264                 lu_object_init(o, NULL, d);
265                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
266                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
267                 mdd_obj->mod_count = 0;
268                 o->lo_ops = &mdd_lu_obj_ops;
269                 return o;
270         } else {
271                 return NULL;
272         }
273 }
274
275 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
276                            const struct lu_object_conf *unused)
277 {
278         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
279         struct mdd_object *mdd_obj = lu2mdd_obj(o);
280         struct lu_object  *below;
281         struct lu_device  *under;
282         ENTRY;
283
284         mdd_obj->mod_cltime = 0;
285         under = &d->mdd_child->dd_lu_dev;
286         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
287         mdd_pdlock_init(mdd_obj);
288         if (below == NULL)
289                 RETURN(-ENOMEM);
290
291         lu_object_add(o, below);
292
293         RETURN(0);
294 }
295
296 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
297 {
298         if (lu_object_exists(o))
299                 return mdd_get_flags(env, lu2mdd_obj(o));
300         else
301                 return 0;
302 }
303
304 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
305 {
306         struct mdd_object *mdd = lu2mdd_obj(o);
307
308         lu_object_fini(o);
309         OBD_FREE_PTR(mdd);
310 }
311
312 static int mdd_object_print(const struct lu_env *env, void *cookie,
313                             lu_printer_t p, const struct lu_object *o)
314 {
315         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
316         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
317                     "valid=%x, cltime=%llu, flags=%lx)",
318                     mdd, mdd->mod_count, mdd->mod_valid,
319                     mdd->mod_cltime, mdd->mod_flags);
320 }
321
322 static const struct lu_object_operations mdd_lu_obj_ops = {
323         .loo_object_init    = mdd_object_init,
324         .loo_object_start   = mdd_object_start,
325         .loo_object_free    = mdd_object_free,
326         .loo_object_print   = mdd_object_print,
327 };
328
329 struct mdd_object *mdd_object_find(const struct lu_env *env,
330                                    struct mdd_device *d,
331                                    const struct lu_fid *f)
332 {
333         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
334 }
335
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337                         const char *path, struct lu_fid *fid)
338 {
339         struct lu_buf *buf;
340         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341         struct mdd_object *obj;
342         struct lu_name *lname = &mdd_env_info(env)->mti_name;
343         char *name;
344         int rc = 0;
345         ENTRY;
346
347         /* temp buffer for path element */
348         buf = mdd_buf_alloc(env, PATH_MAX);
349         if (buf->lb_buf == NULL)
350                 RETURN(-ENOMEM);
351
352         lname->ln_name = name = buf->lb_buf;
353         lname->ln_namelen = 0;
354         *f = mdd->mdd_root_fid;
355
356         while(1) {
357                 while (*path == '/')
358                         path++;
359                 if (*path == '\0')
360                         break;
361                 while (*path != '/' && *path != '\0') {
362                         *name = *path;
363                         path++;
364                         name++;
365                         lname->ln_namelen++;
366                 }
367
368                 *name = '\0';
369                 /* find obj corresponding to fid */
370                 obj = mdd_object_find(env, mdd, f);
371                 if (obj == NULL)
372                         GOTO(out, rc = -EREMOTE);
373                 if (IS_ERR(obj))
374                         GOTO(out, rc = -PTR_ERR(obj));
375                 /* get child fid from parent and name */
376                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377                 mdd_object_put(env, obj);
378                 if (rc)
379                         break;
380
381                 name = buf->lb_buf;
382                 lname->ln_namelen = 0;
383         }
384
385         if (!rc)
386                 *fid = *f;
387 out:
388         RETURN(rc);
389 }
390
391 /** The maximum depth that fid2path() will search.
392  * This is limited only because we want to store the fids for
393  * historical path lookup purposes.
394  */
395 #define MAX_PATH_DEPTH 100
396
397 /** mdd_path() lookup structure. */
398 struct path_lookup_info {
399         __u64                pli_recno;        /**< history point */
400         __u64                pli_currec;       /**< current record */
401         struct lu_fid        pli_fid;
402         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
403         struct mdd_object   *pli_mdd_obj;
404         char                *pli_path;         /**< full path */
405         int                  pli_pathlen;
406         int                  pli_linkno;       /**< which hardlink to follow */
407         int                  pli_fidcount;     /**< number of \a pli_fids */
408 };
409
410 static int mdd_path_current(const struct lu_env *env,
411                             struct path_lookup_info *pli)
412 {
413         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414         struct mdd_object *mdd_obj;
415         struct lu_buf     *buf = NULL;
416         struct link_ea_header *leh;
417         struct link_ea_entry  *lee;
418         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
420         char *ptr;
421         int reclen;
422         int rc;
423         ENTRY;
424
425         ptr = pli->pli_path + pli->pli_pathlen - 1;
426         *ptr = 0;
427         --ptr;
428         pli->pli_fidcount = 0;
429         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
430
431         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432                 mdd_obj = mdd_object_find(env, mdd,
433                                           &pli->pli_fids[pli->pli_fidcount]);
434                 if (mdd_obj == NULL)
435                         GOTO(out, rc = -EREMOTE);
436                 if (IS_ERR(mdd_obj))
437                         GOTO(out, rc = -PTR_ERR(mdd_obj));
438                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
439                 if (rc <= 0) {
440                         mdd_object_put(env, mdd_obj);
441                         if (rc == -1)
442                                 rc = -EREMOTE;
443                         else if (rc == 0)
444                                 /* Do I need to error out here? */
445                                 rc = -ENOENT;
446                         GOTO(out, rc);
447                 }
448
449                 /* Get parent fid and object name */
450                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451                 buf = mdd_links_get(env, mdd_obj);
452                 mdd_read_unlock(env, mdd_obj);
453                 mdd_object_put(env, mdd_obj);
454                 if (IS_ERR(buf))
455                         GOTO(out, rc = PTR_ERR(buf));
456
457                 leh = buf->lb_buf;
458                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
460
461                 /* If set, use link #linkno for path lookup, otherwise use
462                    link #0.  Only do this for the final path element. */
463                 if ((pli->pli_fidcount == 0) &&
464                     (pli->pli_linkno < leh->leh_reccount)) {
465                         int count;
466                         for (count = 0; count < pli->pli_linkno; count++) {
467                                 lee = (struct link_ea_entry *)
468                                      ((char *)lee + reclen);
469                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
470                         }
471                         if (pli->pli_linkno < leh->leh_reccount - 1)
472                                 /* indicate to user there are more links */
473                                 pli->pli_linkno++;
474                 }
475
476                 /* Pack the name in the end of the buffer */
477                 ptr -= tmpname->ln_namelen;
478                 if (ptr - 1 <= pli->pli_path)
479                         GOTO(out, rc = -EOVERFLOW);
480                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
481                 *(--ptr) = '/';
482
483                 /* Store the parent fid for historic lookup */
484                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485                         GOTO(out, rc = -EOVERFLOW);
486                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
487         }
488
489         /* Verify that our path hasn't changed since we started the lookup.
490            Record the current index, and verify the path resolves to the
491            same fid. If it does, then the path is correct as of this index. */
492         spin_lock(&mdd->mdd_cl.mc_lock);
493         pli->pli_currec = mdd->mdd_cl.mc_index;
494         spin_unlock(&mdd->mdd_cl.mc_lock);
495         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
496         if (rc) {
497                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498                 GOTO (out, rc = -EAGAIN);
499         }
500         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503                        PFID(&pli->pli_fid));
504                 GOTO(out, rc = -EAGAIN);
505         }
506
507         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
508
509         EXIT;
510 out:
511         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512                 /* if we vmalloced a large buffer drop it */
513                 mdd_buf_put(buf);
514
515         return rc;
516 }
517
518 static int mdd_path_historic(const struct lu_env *env,
519                              struct path_lookup_info *pli)
520 {
521         return 0;
522 }
523
524 /* Returns the full path to this fid, as of changelog record recno. */
525 static int mdd_path(const struct lu_env *env, struct md_object *obj,
526                     char *path, int pathlen, __u64 *recno, int *linkno)
527 {
528         struct path_lookup_info *pli;
529         int tries = 3;
530         int rc = -EAGAIN;
531         ENTRY;
532
533         if (pathlen < 3)
534                 RETURN(-EOVERFLOW);
535
536         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
537                 path[0] = '/';
538                 path[1] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
610                        int *size)
611 {
612         struct lov_desc *ldesc;
613         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
614         ENTRY;
615
616         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617         LASSERT(ldesc != NULL);
618
619         if (!lmm)
620                 RETURN(0);
621
622         lmm->lmm_magic = LOV_MAGIC_V1;
623         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
624         lmm->lmm_pattern = ldesc->ld_pattern;
625         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
626         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
627         *size = sizeof(struct lov_mds_md);
628
629         RETURN(sizeof(struct lov_mds_md));
630 }
631
632 /* get lov EA only */
633 static int __mdd_lmm_get(const struct lu_env *env,
634                          struct mdd_object *mdd_obj, struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         if (ma->ma_valid & MA_LOV)
640                 RETURN(0);
641
642         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
643                         XATTR_NAME_LOV);
644
645         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
646                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
647                                 &ma->ma_lmm_size);
648         }
649
650         if (rc > 0) {
651                 ma->ma_valid |= MA_LOV;
652                 rc = 0;
653         }
654         RETURN(rc);
655 }
656
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
658                        struct md_attr *ma)
659 {
660         int rc;
661         ENTRY;
662
663         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664         rc = __mdd_lmm_get(env, mdd_obj, ma);
665         mdd_read_unlock(env, mdd_obj);
666         RETURN(rc);
667 }
668
669 /* get lmv EA only*/
670 static int __mdd_lmv_get(const struct lu_env *env,
671                          struct mdd_object *mdd_obj, struct md_attr *ma)
672 {
673         int rc;
674         ENTRY;
675
676         if (ma->ma_valid & MA_LMV)
677                 RETURN(0);
678
679         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
680                         XATTR_NAME_LMV);
681         if (rc > 0) {
682                 ma->ma_valid |= MA_LMV;
683                 rc = 0;
684         }
685         RETURN(rc);
686 }
687
688 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
689                          struct md_attr *ma)
690 {
691         struct mdd_thread_info *info = mdd_env_info(env);
692         struct lustre_mdt_attrs *lma =
693                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
694         int lma_size;
695         int rc;
696         ENTRY;
697
698         /* If all needed data are already valid, nothing to do */
699         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
700             (ma->ma_need & (MA_HSM | MA_SOM)))
701                 RETURN(0);
702
703         /* Read LMA from disk EA */
704         lma_size = sizeof(info->mti_xattr_buf);
705         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
706         if (rc <= 0)
707                 RETURN(rc);
708
709         /* Useless to check LMA incompatibility because this is already done in
710          * osd_ea_fid_get(), and this will fail long before this code is
711          * called.
712          * So, if we are here, LMA is compatible.
713          */
714
715         lustre_lma_swab(lma);
716
717         /* Swab and copy LMA */
718         if (ma->ma_need & MA_HSM) {
719                 if (lma->lma_compat & LMAC_HSM)
720                         ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
721                 else
722                         ma->ma_hsm_flags = 0;
723                 ma->ma_valid |= MA_HSM;
724         }
725         if (ma->ma_need & MA_SOM) {
726
727                 /* XXX: Here, copy and swab SoM data, and then remove this
728                  * assert. */
729                 LASSERT(!(ma->ma_need & MA_SOM));
730
731                 ma->ma_valid |= MA_SOM;
732         }
733
734         RETURN(0);
735 }
736
737 static int mdd_attr_get_internal(const struct lu_env *env,
738                                  struct mdd_object *mdd_obj,
739                                  struct md_attr *ma)
740 {
741         int rc = 0;
742         ENTRY;
743
744         if (ma->ma_need & MA_INODE)
745                 rc = mdd_iattr_get(env, mdd_obj, ma);
746
747         if (rc == 0 && ma->ma_need & MA_LOV) {
748                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
749                     S_ISDIR(mdd_object_type(mdd_obj)))
750                         rc = __mdd_lmm_get(env, mdd_obj, ma);
751         }
752         if (rc == 0 && ma->ma_need & MA_LMV) {
753                 if (S_ISDIR(mdd_object_type(mdd_obj)))
754                         rc = __mdd_lmv_get(env, mdd_obj, ma);
755         }
756         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
757                 if (S_ISREG(mdd_object_type(mdd_obj)))
758                         rc = __mdd_lma_get(env, mdd_obj, ma);
759         }
760 #ifdef CONFIG_FS_POSIX_ACL
761         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
762                 if (S_ISDIR(mdd_object_type(mdd_obj)))
763                         rc = mdd_def_acl_get(env, mdd_obj, ma);
764         }
765 #endif
766         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
767                rc, ma->ma_valid);
768         RETURN(rc);
769 }
770
771 int mdd_attr_get_internal_locked(const struct lu_env *env,
772                                  struct mdd_object *mdd_obj, struct md_attr *ma)
773 {
774         int rc;
775         int needlock = ma->ma_need &
776                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
777
778         if (needlock)
779                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
780         rc = mdd_attr_get_internal(env, mdd_obj, ma);
781         if (needlock)
782                 mdd_read_unlock(env, mdd_obj);
783         return rc;
784 }
785
786 /*
787  * No permission check is needed.
788  */
789 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
790                         struct md_attr *ma)
791 {
792         struct mdd_object *mdd_obj = md2mdd_obj(obj);
793         int                rc;
794
795         ENTRY;
796         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
797         RETURN(rc);
798 }
799
800 /*
801  * No permission check is needed.
802  */
803 static int mdd_xattr_get(const struct lu_env *env,
804                          struct md_object *obj, struct lu_buf *buf,
805                          const char *name)
806 {
807         struct mdd_object *mdd_obj = md2mdd_obj(obj);
808         int rc;
809
810         ENTRY;
811
812         LASSERT(mdd_object_exists(mdd_obj));
813
814         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
815         rc = mdo_xattr_get(env, mdd_obj, buf, name,
816                            mdd_object_capa(env, mdd_obj));
817         mdd_read_unlock(env, mdd_obj);
818
819         RETURN(rc);
820 }
821
822 /*
823  * Permission check is done when open,
824  * no need check again.
825  */
826 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
827                         struct lu_buf *buf)
828 {
829         struct mdd_object *mdd_obj = md2mdd_obj(obj);
830         struct dt_object  *next;
831         loff_t             pos = 0;
832         int                rc;
833         ENTRY;
834
835         LASSERT(mdd_object_exists(mdd_obj));
836
837         next = mdd_object_child(mdd_obj);
838         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
839         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
840                                          mdd_object_capa(env, mdd_obj));
841         mdd_read_unlock(env, mdd_obj);
842         RETURN(rc);
843 }
844
845 /*
846  * No permission check is needed.
847  */
848 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
849                           struct lu_buf *buf)
850 {
851         struct mdd_object *mdd_obj = md2mdd_obj(obj);
852         int rc;
853
854         ENTRY;
855
856         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
857         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
858         mdd_read_unlock(env, mdd_obj);
859
860         RETURN(rc);
861 }
862
863 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
864                                struct mdd_object *c, struct md_attr *ma,
865                                struct thandle *handle,
866                                const struct md_op_spec *spec)
867 {
868         struct lu_attr *attr = &ma->ma_attr;
869         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
870         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
871         const struct dt_index_features *feat = spec->sp_feat;
872         int rc;
873         ENTRY;
874
875         if (!mdd_object_exists(c)) {
876                 struct dt_object *next = mdd_object_child(c);
877                 LASSERT(next);
878
879                 if (feat != &dt_directory_features && feat != NULL)
880                         dof->dof_type = DFT_INDEX;
881                 else
882                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
883
884                 dof->u.dof_idx.di_feat = feat;
885
886                 /* @hint will be initialized by underlying device. */
887                 next->do_ops->do_ah_init(env, hint,
888                                          p ? mdd_object_child(p) : NULL,
889                                          attr->la_mode & S_IFMT);
890
891                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
892                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
893         } else
894                 rc = -EEXIST;
895
896         RETURN(rc);
897 }
898
899 /**
900  * Make sure the ctime is increased only.
901  */
902 static inline int mdd_attr_check(const struct lu_env *env,
903                                  struct mdd_object *obj,
904                                  struct lu_attr *attr)
905 {
906         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
907         int rc;
908         ENTRY;
909
910         if (attr->la_valid & LA_CTIME) {
911                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
912                 if (rc)
913                         RETURN(rc);
914
915                 if (attr->la_ctime < tmp_la->la_ctime)
916                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
917                 else if (attr->la_valid == LA_CTIME &&
918                          attr->la_ctime == tmp_la->la_ctime)
919                         attr->la_valid &= ~LA_CTIME;
920         }
921         RETURN(0);
922 }
923
924 int mdd_attr_set_internal(const struct lu_env *env,
925                           struct mdd_object *obj,
926                           struct lu_attr *attr,
927                           struct thandle *handle,
928                           int needacl)
929 {
930         int rc;
931         ENTRY;
932
933         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
934 #ifdef CONFIG_FS_POSIX_ACL
935         if (!rc && (attr->la_valid & LA_MODE) && needacl)
936                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
937 #endif
938         RETURN(rc);
939 }
940
941 int mdd_attr_check_set_internal(const struct lu_env *env,
942                                 struct mdd_object *obj,
943                                 struct lu_attr *attr,
944                                 struct thandle *handle,
945                                 int needacl)
946 {
947         int rc;
948         ENTRY;
949
950         rc = mdd_attr_check(env, obj, attr);
951         if (rc)
952                 RETURN(rc);
953
954         if (attr->la_valid)
955                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
956         RETURN(rc);
957 }
958
959 static int mdd_attr_set_internal_locked(const struct lu_env *env,
960                                         struct mdd_object *obj,
961                                         struct lu_attr *attr,
962                                         struct thandle *handle,
963                                         int needacl)
964 {
965         int rc;
966         ENTRY;
967
968         needacl = needacl && (attr->la_valid & LA_MODE);
969         if (needacl)
970                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
971         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
972         if (needacl)
973                 mdd_write_unlock(env, obj);
974         RETURN(rc);
975 }
976
977 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
978                                        struct mdd_object *obj,
979                                        struct lu_attr *attr,
980                                        struct thandle *handle,
981                                        int needacl)
982 {
983         int rc;
984         ENTRY;
985
986         needacl = needacl && (attr->la_valid & LA_MODE);
987         if (needacl)
988                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
989         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
990         if (needacl)
991                 mdd_write_unlock(env, obj);
992         RETURN(rc);
993 }
994
995 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
996                     const struct lu_buf *buf, const char *name,
997                     int fl, struct thandle *handle)
998 {
999         struct lustre_capa *capa = mdd_object_capa(env, obj);
1000         int rc = -EINVAL;
1001         ENTRY;
1002
1003         if (buf->lb_buf && buf->lb_len > 0)
1004                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1005         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1006                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1007
1008         RETURN(rc);
1009 }
1010
1011 /*
1012  * This gives the same functionality as the code between
1013  * sys_chmod and inode_setattr
1014  * chown_common and inode_setattr
1015  * utimes and inode_setattr
1016  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1017  */
1018 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1019                         struct lu_attr *la, const struct md_attr *ma)
1020 {
1021         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1022         struct md_ucred  *uc         = md_ucred(env);
1023         int               rc;
1024         ENTRY;
1025
1026         if (!la->la_valid)
1027                 RETURN(0);
1028
1029         /* Do not permit change file type */
1030         if (la->la_valid & LA_TYPE)
1031                 RETURN(-EPERM);
1032
1033         /* They should not be processed by setattr */
1034         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1035                 RETURN(-EPERM);
1036
1037         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1038         if (rc)
1039                 RETURN(rc);
1040
1041         if (la->la_valid == LA_CTIME) {
1042                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1043                         /* This is only for set ctime when rename's source is
1044                          * on remote MDS. */
1045                         rc = mdd_may_delete(env, NULL, obj,
1046                                             (struct md_attr *)ma, 1, 0);
1047                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1048                         la->la_valid &= ~LA_CTIME;
1049                 RETURN(rc);
1050         }
1051
1052         if (la->la_valid == LA_ATIME) {
1053                 /* This is atime only set for read atime update on close. */
1054                 if (la->la_atime <= tmp_la->la_atime +
1055                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1056                         la->la_valid &= ~LA_ATIME;
1057                 RETURN(0);
1058         }
1059
1060         /* Check if flags change. */
1061         if (la->la_valid & LA_FLAGS) {
1062                 unsigned int oldflags = 0;
1063                 unsigned int newflags = la->la_flags &
1064                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1065
1066                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1067                     !mdd_capable(uc, CFS_CAP_FOWNER))
1068                         RETURN(-EPERM);
1069
1070                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1071                  * only be changed by the relevant capability. */
1072                 if (mdd_is_immutable(obj))
1073                         oldflags |= LUSTRE_IMMUTABLE_FL;
1074                 if (mdd_is_append(obj))
1075                         oldflags |= LUSTRE_APPEND_FL;
1076                 if ((oldflags ^ newflags) &&
1077                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1078                         RETURN(-EPERM);
1079
1080                 if (!S_ISDIR(tmp_la->la_mode))
1081                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1082         }
1083
1084         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1085             (la->la_valid & ~LA_FLAGS) &&
1086             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1087                 RETURN(-EPERM);
1088
1089         /* Check for setting the obj time. */
1090         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1091             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1092                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1093                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1094                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1095                                                             MAY_WRITE,
1096                                                             MOR_TGT_CHILD);
1097                         if (rc)
1098                                 RETURN(rc);
1099                 }
1100         }
1101
1102         /* Make sure a caller can chmod. */
1103         if (la->la_valid & LA_MODE) {
1104                 /* Bypass la_vaild == LA_MODE,
1105                  * this is for changing file with SUID or SGID. */
1106                 if ((la->la_valid & ~LA_MODE) &&
1107                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1108                     (uc->mu_fsuid != tmp_la->la_uid) &&
1109                     !mdd_capable(uc, CFS_CAP_FOWNER))
1110                         RETURN(-EPERM);
1111
1112                 if (la->la_mode == (umode_t) -1)
1113                         la->la_mode = tmp_la->la_mode;
1114                 else
1115                         la->la_mode = (la->la_mode & S_IALLUGO) |
1116                                       (tmp_la->la_mode & ~S_IALLUGO);
1117
1118                 /* Also check the setgid bit! */
1119                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1120                                        la->la_gid : tmp_la->la_gid) &&
1121                     !mdd_capable(uc, CFS_CAP_FSETID))
1122                         la->la_mode &= ~S_ISGID;
1123         } else {
1124                la->la_mode = tmp_la->la_mode;
1125         }
1126
1127         /* Make sure a caller can chown. */
1128         if (la->la_valid & LA_UID) {
1129                 if (la->la_uid == (uid_t) -1)
1130                         la->la_uid = tmp_la->la_uid;
1131                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1132                     (la->la_uid != tmp_la->la_uid)) &&
1133                     !mdd_capable(uc, CFS_CAP_CHOWN))
1134                         RETURN(-EPERM);
1135
1136                 /* If the user or group of a non-directory has been
1137                  * changed by a non-root user, remove the setuid bit.
1138                  * 19981026 David C Niemi <niemi@tux.org>
1139                  *
1140                  * Changed this to apply to all users, including root,
1141                  * to avoid some races. This is the behavior we had in
1142                  * 2.0. The check for non-root was definitely wrong
1143                  * for 2.2 anyway, as it should have been using
1144                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1145                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1146                     !S_ISDIR(tmp_la->la_mode)) {
1147                         la->la_mode &= ~S_ISUID;
1148                         la->la_valid |= LA_MODE;
1149                 }
1150         }
1151
1152         /* Make sure caller can chgrp. */
1153         if (la->la_valid & LA_GID) {
1154                 if (la->la_gid == (gid_t) -1)
1155                         la->la_gid = tmp_la->la_gid;
1156                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1157                     ((la->la_gid != tmp_la->la_gid) &&
1158                     !lustre_in_group_p(uc, la->la_gid))) &&
1159                     !mdd_capable(uc, CFS_CAP_CHOWN))
1160                         RETURN(-EPERM);
1161
1162                 /* Likewise, if the user or group of a non-directory
1163                  * has been changed by a non-root user, remove the
1164                  * setgid bit UNLESS there is no group execute bit
1165                  * (this would be a file marked for mandatory
1166                  * locking).  19981026 David C Niemi <niemi@tux.org>
1167                  *
1168                  * Removed the fsuid check (see the comment above) --
1169                  * 19990830 SD. */
1170                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1171                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1172                         la->la_mode &= ~S_ISGID;
1173                         la->la_valid |= LA_MODE;
1174                 }
1175         }
1176
1177         /* For both Size-on-MDS case and truncate case,
1178          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1179          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1180          * For SOM case, it is true, the MAY_WRITE perm has been checked
1181          * when open, no need check again. For truncate case, it is false,
1182          * the MAY_WRITE perm should be checked here. */
1183         if (ma->ma_attr_flags & MDS_SOM) {
1184                 /* For the "Size-on-MDS" setattr update, merge coming
1185                  * attributes with the set in the inode. BUG 10641 */
1186                 if ((la->la_valid & LA_ATIME) &&
1187                     (la->la_atime <= tmp_la->la_atime))
1188                         la->la_valid &= ~LA_ATIME;
1189
1190                 /* OST attributes do not have a priority over MDS attributes,
1191                  * so drop times if ctime is equal. */
1192                 if ((la->la_valid & LA_CTIME) &&
1193                     (la->la_ctime <= tmp_la->la_ctime))
1194                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1195         } else {
1196                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1197                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1198                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1199                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1200                                 rc = mdd_permission_internal_locked(env, obj,
1201                                                             tmp_la, MAY_WRITE,
1202                                                             MOR_TGT_CHILD);
1203                                 if (rc)
1204                                         RETURN(rc);
1205                         }
1206                 }
1207                 if (la->la_valid & LA_CTIME) {
1208                         /* The pure setattr, it has the priority over what is
1209                          * already set, do not drop it if ctime is equal. */
1210                         if (la->la_ctime < tmp_la->la_ctime)
1211                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1212                                                   LA_CTIME);
1213                 }
1214         }
1215
1216         RETURN(0);
1217 }
1218
1219 /** Store a data change changelog record
1220  * If this fails, we must fail the whole transaction; we don't
1221  * want the change to commit without the log entry.
1222  * \param mdd_obj - mdd_object of change
1223  * \param handle - transacion handle
1224  */
1225 static int mdd_changelog_data_store(const struct lu_env     *env,
1226                                     struct mdd_device       *mdd,
1227                                     enum changelog_rec_type type,
1228                                     struct mdd_object       *mdd_obj,
1229                                     struct thandle          *handle)
1230 {
1231         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1232         struct llog_changelog_rec *rec;
1233         struct lu_buf *buf;
1234         int reclen;
1235         int rc;
1236
1237         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1238                 RETURN(0);
1239
1240         LASSERT(handle != NULL);
1241         LASSERT(mdd_obj != NULL);
1242
1243         if ((type == CL_SETATTR) &&
1244             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1245                 /* Don't need multiple updates in this log */
1246                 /* Don't check under lock - no big deal if we get an extra
1247                    entry */
1248                 RETURN(0);
1249         }
1250
1251         reclen = llog_data_len(sizeof(*rec));
1252         buf = mdd_buf_alloc(env, reclen);
1253         if (buf->lb_buf == NULL)
1254                 RETURN(-ENOMEM);
1255         rec = (struct llog_changelog_rec *)buf->lb_buf;
1256
1257         rec->cr.cr_flags = CLF_VERSION;
1258         rec->cr.cr_type = (__u32)type;
1259         rec->cr.cr_tfid = *tfid;
1260         rec->cr.cr_namelen = 0;
1261         mdd_obj->mod_cltime = cfs_time_current_64();
1262
1263         rc = mdd_changelog_llog_write(mdd, rec, handle);
1264         if (rc < 0) {
1265                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1266                        rc, type, PFID(tfid));
1267                 return -EFAULT;
1268         }
1269
1270         return 0;
1271 }
1272
1273 /**
1274  * Should be called with write lock held.
1275  *
1276  * \see mdd_lma_set_locked().
1277  */
1278 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1279                        const struct md_attr *ma, struct thandle *handle)
1280 {
1281         struct mdd_thread_info *info = mdd_env_info(env);
1282         struct lu_buf *buf;
1283         struct lustre_mdt_attrs *lma =
1284                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1285         int lmasize = sizeof(struct lustre_mdt_attrs);
1286         int rc = 0;
1287
1288         ENTRY;
1289
1290         memset(lma, 0, lmasize);
1291
1292         /* Either HSM or SOM part is not valid, we need to read it before */
1293         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1294                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1295                 if (rc)
1296                         RETURN(rc);
1297
1298                 lustre_lma_swab(lma);
1299         }
1300
1301         /* Copy HSM data */
1302         if (ma->ma_valid & MA_HSM) {
1303                 lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1304                 lma->lma_compat |= LMAC_HSM;
1305         }
1306         /* XXX: Copy SOM data */
1307         if (ma->ma_valid & MA_SOM) {
1308                 /*
1309                 lma->lma_compat |= LMAC_SOM;
1310                 */
1311                 LASSERT(!(ma->ma_valid & MA_SOM));
1312         }
1313
1314         /* Copy FID */
1315         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1316
1317         lustre_lma_swab(lma);
1318         buf = mdd_buf_get(env, lma, lmasize);
1319         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1320
1321         RETURN(rc);
1322 }
1323
1324 /**
1325  * Save LMA extended attributes with data from \a ma.
1326  *
1327  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1328  * not, LMA EA will be first read from disk, modified and write back.
1329  *
1330  */
1331 static int mdd_lma_set_locked(const struct lu_env *env,
1332                               struct mdd_object *mdd_obj,
1333                               const struct md_attr *ma, struct thandle *handle)
1334 {
1335         int rc;
1336
1337         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1338         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1339         mdd_write_unlock(env, mdd_obj);
1340         return rc;
1341 }
1342
1343 /* set attr and LOV EA at once, return updated attr */
1344 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1345                         const struct md_attr *ma)
1346 {
1347         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1348         struct mdd_device *mdd = mdo2mdd(obj);
1349         struct thandle *handle;
1350         struct lov_mds_md *lmm = NULL;
1351         struct llog_cookie *logcookies = NULL;
1352         int  rc, lmm_size = 0, cookie_size = 0;
1353         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1354 #ifdef HAVE_QUOTA_SUPPORT
1355         struct obd_device *obd = mdd->mdd_obd_dev;
1356         struct obd_export *exp = md_quota(env)->mq_exp;
1357         struct mds_obd *mds = &obd->u.mds;
1358         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1359         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1360         int quota_opc = 0, block_count = 0;
1361         int inode_pending[MAXQUOTAS] = { 0, 0 };
1362         int block_pending[MAXQUOTAS] = { 0, 0 };
1363 #endif
1364         ENTRY;
1365
1366         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1367                                     MDD_TXN_ATTR_SET_OP);
1368         handle = mdd_trans_start(env, mdd);
1369         if (IS_ERR(handle))
1370                 RETURN(PTR_ERR(handle));
1371         /*TODO: add lock here*/
1372         /* start a log jounal handle if needed */
1373         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1374             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1375                 lmm_size = mdd_lov_mdsize(env, mdd);
1376                 lmm = mdd_max_lmm_get(env, mdd);
1377                 if (lmm == NULL)
1378                         GOTO(cleanup, rc = -ENOMEM);
1379
1380                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1381                                 XATTR_NAME_LOV);
1382
1383                 if (rc < 0)
1384                         GOTO(cleanup, rc);
1385         }
1386
1387         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1388                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1389                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1390
1391         *la_copy = ma->ma_attr;
1392         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1393         if (rc)
1394                 GOTO(cleanup, rc);
1395
1396 #ifdef HAVE_QUOTA_SUPPORT
1397         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1398                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1399
1400                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1401                 if (!rc) {
1402                         quota_opc = FSFILT_OP_SETATTR;
1403                         mdd_quota_wrapper(la_copy, qnids);
1404                         mdd_quota_wrapper(la_tmp, qoids);
1405                         /* get file quota for new owner */
1406                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1407                                         qnids, inode_pending, 1, NULL, 0,
1408                                         NULL, 0);
1409                         block_count = (la_tmp->la_blocks + 7) >> 3;
1410                         if (block_count) {
1411                                 void *data = NULL;
1412                                 mdd_data_get(env, mdd_obj, &data);
1413                                 /* get block quota for new owner */
1414                                 lquota_chkquota(mds_quota_interface_ref, obd,
1415                                                 exp, qnids, block_pending,
1416                                                 block_count, NULL,
1417                                                 LQUOTA_FLAGS_BLK, data, 1);
1418                         }
1419                 }
1420         }
1421 #endif
1422
1423         if (la_copy->la_valid & LA_FLAGS) {
1424                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1425                                                   handle, 1);
1426                 if (rc == 0)
1427                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1428         } else if (la_copy->la_valid) {            /* setattr */
1429                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1430                                                   handle, 1);
1431                 /* journal chown/chgrp in llog, just like unlink */
1432                 if (rc == 0 && lmm_size){
1433                         cookie_size = mdd_lov_cookiesize(env, mdd);
1434                         logcookies = mdd_max_cookie_get(env, mdd);
1435                         if (logcookies == NULL)
1436                                 GOTO(cleanup, rc = -ENOMEM);
1437
1438                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1439                                             logcookies, cookie_size) <= 0)
1440                                 logcookies = NULL;
1441                 }
1442         }
1443
1444         if (rc == 0 && ma->ma_valid & MA_LOV) {
1445                 umode_t mode;
1446
1447                 mode = mdd_object_type(mdd_obj);
1448                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1449                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1450                         if (rc)
1451                                 GOTO(cleanup, rc);
1452
1453                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1454                                             ma->ma_lmm_size, handle, 1);
1455                 }
1456
1457         }
1458         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1459                 umode_t mode;
1460
1461                 mode = mdd_object_type(mdd_obj);
1462                 if (S_ISREG(mode))
1463                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1464
1465         }
1466 cleanup:
1467         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1468                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1469                                               handle);
1470         mdd_trans_stop(env, mdd, rc, handle);
1471         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1472                 /*set obd attr, if needed*/
1473                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1474                                            logcookies);
1475         }
1476 #ifdef HAVE_QUOTA_SUPPORT
1477         if (quota_opc) {
1478                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1479                                       inode_pending, 0);
1480                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1481                                       block_pending, 1);
1482                 /* Trigger dqrel/dqacq for original owner and new owner.
1483                  * If failed, the next call for lquota_chkquota will
1484                  * process it. */
1485                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1486                               quota_opc);
1487         }
1488 #endif
1489         RETURN(rc);
1490 }
1491
1492 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1493                       const struct lu_buf *buf, const char *name, int fl,
1494                       struct thandle *handle)
1495 {
1496         int  rc;
1497         ENTRY;
1498
1499         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1500         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1501         mdd_write_unlock(env, obj);
1502
1503         RETURN(rc);
1504 }
1505
1506 static int mdd_xattr_sanity_check(const struct lu_env *env,
1507                                   struct mdd_object *obj)
1508 {
1509         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1510         struct md_ucred *uc     = md_ucred(env);
1511         int rc;
1512         ENTRY;
1513
1514         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1515                 RETURN(-EPERM);
1516
1517         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1518         if (rc)
1519                 RETURN(rc);
1520
1521         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1522             !mdd_capable(uc, CFS_CAP_FOWNER))
1523                 RETURN(-EPERM);
1524
1525         RETURN(rc);
1526 }
1527
1528 /**
1529  * The caller should guarantee to update the object ctime
1530  * after xattr_set if needed.
1531  */
1532 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1533                          const struct lu_buf *buf, const char *name,
1534                          int fl)
1535 {
1536         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1537         struct mdd_device *mdd = mdo2mdd(obj);
1538         struct thandle *handle;
1539         int  rc;
1540         ENTRY;
1541
1542         rc = mdd_xattr_sanity_check(env, mdd_obj);
1543         if (rc)
1544                 RETURN(rc);
1545
1546         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1547         /* security-replated changes may require sync */
1548         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1549             mdd->mdd_sync_permission == 1)
1550                 txn_param_sync(&mdd_env_info(env)->mti_param);
1551
1552         handle = mdd_trans_start(env, mdd);
1553         if (IS_ERR(handle))
1554                 RETURN(PTR_ERR(handle));
1555
1556         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1557
1558         /* Only record user xattr changes */
1559         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1560             (strncmp("user.", name, 5) == 0))
1561                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1562                                               handle);
1563         mdd_trans_stop(env, mdd, rc, handle);
1564
1565         RETURN(rc);
1566 }
1567
1568 /**
1569  * The caller should guarantee to update the object ctime
1570  * after xattr_set if needed.
1571  */
1572 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1573                   const char *name)
1574 {
1575         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1576         struct mdd_device *mdd = mdo2mdd(obj);
1577         struct thandle *handle;
1578         int  rc;
1579         ENTRY;
1580
1581         rc = mdd_xattr_sanity_check(env, mdd_obj);
1582         if (rc)
1583                 RETURN(rc);
1584
1585         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1586         handle = mdd_trans_start(env, mdd);
1587         if (IS_ERR(handle))
1588                 RETURN(PTR_ERR(handle));
1589
1590         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1591         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1592                            mdd_object_capa(env, mdd_obj));
1593         mdd_write_unlock(env, mdd_obj);
1594
1595         /* Only record user xattr changes */
1596         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1597             (strncmp("user.", name, 5) != 0))
1598                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1599                                               handle);
1600
1601         mdd_trans_stop(env, mdd, rc, handle);
1602
1603         RETURN(rc);
1604 }
1605
1606 /* partial unlink */
1607 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1608                        struct md_attr *ma)
1609 {
1610         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1611         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1612         struct mdd_device *mdd = mdo2mdd(obj);
1613         struct thandle *handle;
1614 #ifdef HAVE_QUOTA_SUPPORT
1615         struct obd_device *obd = mdd->mdd_obd_dev;
1616         struct mds_obd *mds = &obd->u.mds;
1617         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1618         int quota_opc = 0;
1619 #endif
1620         int rc;
1621         ENTRY;
1622
1623         /*
1624          * Check -ENOENT early here because we need to get object type
1625          * to calculate credits before transaction start
1626          */
1627         if (!mdd_object_exists(mdd_obj))
1628                 RETURN(-ENOENT);
1629
1630         LASSERT(mdd_object_exists(mdd_obj) > 0);
1631
1632         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1633         if (rc)
1634                 RETURN(rc);
1635
1636         handle = mdd_trans_start(env, mdd);
1637         if (IS_ERR(handle))
1638                 RETURN(-ENOMEM);
1639
1640         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1641
1642         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1643         if (rc)
1644                 GOTO(cleanup, rc);
1645
1646         __mdd_ref_del(env, mdd_obj, handle, 0);
1647
1648         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1649                 /* unlink dot */
1650                 __mdd_ref_del(env, mdd_obj, handle, 1);
1651         }
1652
1653         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1654         la_copy->la_ctime = ma->ma_attr.la_ctime;
1655
1656         la_copy->la_valid = LA_CTIME;
1657         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1658         if (rc)
1659                 GOTO(cleanup, rc);
1660
1661         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1662 #ifdef HAVE_QUOTA_SUPPORT
1663         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1664             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1665                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1666                 mdd_quota_wrapper(&ma->ma_attr, qids);
1667         }
1668 #endif
1669
1670
1671         EXIT;
1672 cleanup:
1673         mdd_write_unlock(env, mdd_obj);
1674         mdd_trans_stop(env, mdd, rc, handle);
1675 #ifdef HAVE_QUOTA_SUPPORT
1676         if (quota_opc)
1677                 /* Trigger dqrel on the owner of child. If failed,
1678                  * the next call for lquota_chkquota will process it */
1679                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1680                               quota_opc);
1681 #endif
1682         return rc;
1683 }
1684
1685 /* partial operation */
1686 static int mdd_oc_sanity_check(const struct lu_env *env,
1687                                struct mdd_object *obj,
1688                                struct md_attr *ma)
1689 {
1690         int rc;
1691         ENTRY;
1692
1693         switch (ma->ma_attr.la_mode & S_IFMT) {
1694         case S_IFREG:
1695         case S_IFDIR:
1696         case S_IFLNK:
1697         case S_IFCHR:
1698         case S_IFBLK:
1699         case S_IFIFO:
1700         case S_IFSOCK:
1701                 rc = 0;
1702                 break;
1703         default:
1704                 rc = -EINVAL;
1705                 break;
1706         }
1707         RETURN(rc);
1708 }
1709
1710 static int mdd_object_create(const struct lu_env *env,
1711                              struct md_object *obj,
1712                              const struct md_op_spec *spec,
1713                              struct md_attr *ma)
1714 {
1715
1716         struct mdd_device *mdd = mdo2mdd(obj);
1717         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1718         const struct lu_fid *pfid = spec->u.sp_pfid;
1719         struct thandle *handle;
1720 #ifdef HAVE_QUOTA_SUPPORT
1721         struct obd_device *obd = mdd->mdd_obd_dev;
1722         struct obd_export *exp = md_quota(env)->mq_exp;
1723         struct mds_obd *mds = &obd->u.mds;
1724         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1725         int quota_opc = 0, block_count = 0;
1726         int inode_pending[MAXQUOTAS] = { 0, 0 };
1727         int block_pending[MAXQUOTAS] = { 0, 0 };
1728 #endif
1729         int rc = 0;
1730         ENTRY;
1731
1732 #ifdef HAVE_QUOTA_SUPPORT
1733         if (mds->mds_quota) {
1734                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1735                 mdd_quota_wrapper(&ma->ma_attr, qids);
1736                 /* get file quota for child */
1737                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1738                                 qids, inode_pending, 1, NULL, 0,
1739                                 NULL, 0);
1740                 switch (ma->ma_attr.la_mode & S_IFMT) {
1741                 case S_IFLNK:
1742                 case S_IFDIR:
1743                         block_count = 2;
1744                         break;
1745                 case S_IFREG:
1746                         block_count = 1;
1747                         break;
1748                 }
1749                 /* get block quota for child */
1750                 if (block_count)
1751                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1752                                         qids, block_pending, block_count,
1753                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1754         }
1755 #endif
1756
1757         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1758         handle = mdd_trans_start(env, mdd);
1759         if (IS_ERR(handle))
1760                 GOTO(out_pending, rc = PTR_ERR(handle));
1761
1762         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1763         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1764         if (rc)
1765                 GOTO(unlock, rc);
1766
1767         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1768         if (rc)
1769                 GOTO(unlock, rc);
1770
1771         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1772                 /* If creating the slave object, set slave EA here. */
1773                 int lmv_size = spec->u.sp_ea.eadatalen;
1774                 struct lmv_stripe_md *lmv;
1775
1776                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1777                 LASSERT(lmv != NULL && lmv_size > 0);
1778
1779                 rc = __mdd_xattr_set(env, mdd_obj,
1780                                      mdd_buf_get_const(env, lmv, lmv_size),
1781                                      XATTR_NAME_LMV, 0, handle);
1782                 if (rc)
1783                         GOTO(unlock, rc);
1784
1785                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1786                                            handle, 0);
1787         } else {
1788 #ifdef CONFIG_FS_POSIX_ACL
1789                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1790                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1791
1792                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1793                         buf->lb_len = spec->u.sp_ea.eadatalen;
1794                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1795                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1796                                                     &ma->ma_attr.la_mode,
1797                                                     handle);
1798                                 if (rc)
1799                                         GOTO(unlock, rc);
1800                                 else
1801                                         ma->ma_attr.la_valid |= LA_MODE;
1802                         }
1803
1804                         pfid = spec->u.sp_ea.fid;
1805                 }
1806 #endif
1807                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1808                                            spec);
1809         }
1810         EXIT;
1811 unlock:
1812         if (rc == 0)
1813                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1814         mdd_write_unlock(env, mdd_obj);
1815
1816         mdd_trans_stop(env, mdd, rc, handle);
1817 out_pending:
1818 #ifdef HAVE_QUOTA_SUPPORT
1819         if (quota_opc) {
1820                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1821                                       inode_pending, 0);
1822                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1823                                       block_pending, 1);
1824                 /* Trigger dqacq on the owner of child. If failed,
1825                  * the next call for lquota_chkquota will process it. */
1826                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1827                               quota_opc);
1828         }
1829 #endif
1830         return rc;
1831 }
1832
1833 /* partial link */
1834 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1835                        const struct md_attr *ma)
1836 {
1837         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1838         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1839         struct mdd_device *mdd = mdo2mdd(obj);
1840         struct thandle *handle;
1841         int rc;
1842         ENTRY;
1843
1844         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1845         handle = mdd_trans_start(env, mdd);
1846         if (IS_ERR(handle))
1847                 RETURN(-ENOMEM);
1848
1849         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1850         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1851         if (rc == 0)
1852                 __mdd_ref_add(env, mdd_obj, handle);
1853         mdd_write_unlock(env, mdd_obj);
1854         if (rc == 0) {
1855                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1856                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1857
1858                 la_copy->la_valid = LA_CTIME;
1859                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1860                                                         handle, 0);
1861         }
1862         mdd_trans_stop(env, mdd, 0, handle);
1863
1864         RETURN(rc);
1865 }
1866
1867 /*
1868  * do NOT or the MAY_*'s, you'll get the weakest
1869  */
1870 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1871 {
1872         int res = 0;
1873
1874         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1875          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1876          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1877          * owner can write to a file even if it is marked readonly to hide
1878          * its brokenness. (bug 5781) */
1879         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1880                 struct md_ucred *uc = md_ucred(env);
1881
1882                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1883                     (la->la_uid == uc->mu_fsuid))
1884                         return 0;
1885         }
1886
1887         if (flags & FMODE_READ)
1888                 res |= MAY_READ;
1889         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1890                 res |= MAY_WRITE;
1891         if (flags & MDS_FMODE_EXEC)
1892                 res |= MAY_EXEC;
1893         return res;
1894 }
1895
1896 static int mdd_open_sanity_check(const struct lu_env *env,
1897                                  struct mdd_object *obj, int flag)
1898 {
1899         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1900         int mode, rc;
1901         ENTRY;
1902
1903         /* EEXIST check */
1904         if (mdd_is_dead_obj(obj))
1905                 RETURN(-ENOENT);
1906
1907         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1908         if (rc)
1909                RETURN(rc);
1910
1911         if (S_ISLNK(tmp_la->la_mode))
1912                 RETURN(-ELOOP);
1913
1914         mode = accmode(env, tmp_la, flag);
1915
1916         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1917                 RETURN(-EISDIR);
1918
1919         if (!(flag & MDS_OPEN_CREATED)) {
1920                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1921                 if (rc)
1922                         RETURN(rc);
1923         }
1924
1925         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1926             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1927                 flag &= ~MDS_OPEN_TRUNC;
1928
1929         /* For writing append-only file must open it with append mode. */
1930         if (mdd_is_append(obj)) {
1931                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1932                         RETURN(-EPERM);
1933                 if (flag & MDS_OPEN_TRUNC)
1934                         RETURN(-EPERM);
1935         }
1936
1937 #if 0
1938         /*
1939          * Now, flag -- O_NOATIME does not be packed by client.
1940          */
1941         if (flag & O_NOATIME) {
1942                 struct md_ucred *uc = md_ucred(env);
1943
1944                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1945                     (uc->mu_valid == UCRED_NEW)) &&
1946                     (uc->mu_fsuid != tmp_la->la_uid) &&
1947                     !mdd_capable(uc, CFS_CAP_FOWNER))
1948                         RETURN(-EPERM);
1949         }
1950 #endif
1951
1952         RETURN(0);
1953 }
1954
1955 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1956                     int flags)
1957 {
1958         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1959         int rc = 0;
1960
1961         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1962
1963         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1964         if (rc == 0)
1965                 mdd_obj->mod_count++;
1966
1967         mdd_write_unlock(env, mdd_obj);
1968         return rc;
1969 }
1970
1971 /* return md_attr back,
1972  * if it is last unlink then return lov ea + llog cookie*/
1973 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1974                     struct md_attr *ma)
1975 {
1976         int rc = 0;
1977         ENTRY;
1978
1979         if (S_ISREG(mdd_object_type(obj))) {
1980                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1981                  * Caller must be ready for that. */
1982
1983                 rc = __mdd_lmm_get(env, obj, ma);
1984                 if ((ma->ma_valid & MA_LOV))
1985                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1986                                             obj, ma);
1987         }
1988         RETURN(rc);
1989 }
1990
1991 /*
1992  * No permission check is needed.
1993  */
1994 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1995                      struct md_attr *ma)
1996 {
1997         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1998         struct mdd_device *mdd = mdo2mdd(obj);
1999         struct thandle    *handle;
2000         int rc;
2001         int reset = 1;
2002
2003 #ifdef HAVE_QUOTA_SUPPORT
2004         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2005         struct mds_obd *mds = &obd->u.mds;
2006         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2007         int quota_opc = 0;
2008 #endif
2009         ENTRY;
2010
2011         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2012         if (rc)
2013                 RETURN(rc);
2014         handle = mdd_trans_start(env, mdo2mdd(obj));
2015         if (IS_ERR(handle))
2016                 RETURN(PTR_ERR(handle));
2017
2018         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2019         /* release open count */
2020         mdd_obj->mod_count --;
2021
2022         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2023                 /* remove link to object from orphan index */
2024                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2025                 if (rc == 0) {
2026                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2027                                "list, OSS objects to be destroyed.\n",
2028                                PFID(mdd_object_fid(mdd_obj)));
2029                 } else {
2030                         CERROR("Object "DFID" can not be deleted from orphan "
2031                                 "list, maybe cause OST objects can not be "
2032                                 "destroyed (err: %d).\n",
2033                                 PFID(mdd_object_fid(mdd_obj)), rc);
2034                         /* If object was not deleted from orphan list, do not
2035                          * destroy OSS objects, which will be done when next
2036                          * recovery. */
2037                         GOTO(out, rc);
2038                 }
2039         }
2040
2041         rc = mdd_iattr_get(env, mdd_obj, ma);
2042         /* Object maybe not in orphan list originally, it is rare case for
2043          * mdd_finish_unlink() failure. */
2044         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2045 #ifdef HAVE_QUOTA_SUPPORT
2046                 if (mds->mds_quota) {
2047                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2048                         mdd_quota_wrapper(&ma->ma_attr, qids);
2049                 }
2050 #endif
2051                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2052                 if (ma->ma_valid & MA_FLAGS &&
2053                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2054                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2055                 } else {
2056                         rc = mdd_object_kill(env, mdd_obj, ma);
2057                                 if (rc == 0)
2058                                         reset = 0;
2059                 }
2060
2061                 if (rc != 0)
2062                         CERROR("Error when prepare to delete Object "DFID" , "
2063                                "which will cause OST objects can not be "
2064                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2065         }
2066         EXIT;
2067
2068 out:
2069         if (reset)
2070                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2071
2072         mdd_write_unlock(env, mdd_obj);
2073         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2074 #ifdef HAVE_QUOTA_SUPPORT
2075         if (quota_opc)
2076                 /* Trigger dqrel on the owner of child. If failed,
2077                  * the next call for lquota_chkquota will process it */
2078                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2079                               quota_opc);
2080 #endif
2081         return rc;
2082 }
2083
2084 /*
2085  * Permission check is done when open,
2086  * no need check again.
2087  */
2088 static int mdd_readpage_sanity_check(const struct lu_env *env,
2089                                      struct mdd_object *obj)
2090 {
2091         struct dt_object *next = mdd_object_child(obj);
2092         int rc;
2093         ENTRY;
2094
2095         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2096                 rc = 0;
2097         else
2098                 rc = -ENOTDIR;
2099
2100         RETURN(rc);
2101 }
2102
2103 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2104                               int first, void *area, int nob,
2105                               const struct dt_it_ops *iops, struct dt_it *it,
2106                               __u64 *start, __u64 *end,
2107                               struct lu_dirent **last, __u32 attr)
2108 {
2109         int                     result;
2110         __u64                   hash = 0;
2111         struct lu_dirent       *ent;
2112
2113         if (first) {
2114                 memset(area, 0, sizeof (struct lu_dirpage));
2115                 area += sizeof (struct lu_dirpage);
2116                 nob  -= sizeof (struct lu_dirpage);
2117         }
2118
2119         ent  = area;
2120         do {
2121                 int    len;
2122                 int    recsize;
2123
2124                 len  = iops->key_size(env, it);
2125
2126                 /* IAM iterator can return record with zero len. */
2127                 if (len == 0)
2128                         goto next;
2129
2130                 hash = iops->store(env, it);
2131                 if (unlikely(first)) {
2132                         first = 0;
2133                         *start = hash;
2134                 }
2135
2136                 /* calculate max space required for lu_dirent */
2137                 recsize = lu_dirent_calc_size(len, attr);
2138
2139                 if (nob >= recsize) {
2140                         result = iops->rec(env, it, ent, attr);
2141                         if (result == -ESTALE)
2142                                 goto next;
2143                         if (result != 0)
2144                                 goto out;
2145
2146                         /* osd might not able to pack all attributes,
2147                          * so recheck rec length */
2148                         recsize = le16_to_cpu(ent->lde_reclen);
2149                 } else {
2150                         /*
2151                          * record doesn't fit into page, enlarge previous one.
2152                          */
2153                         if (*last) {
2154                                 (*last)->lde_reclen =
2155                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2156                                                         nob);
2157                                 result = 0;
2158                         } else
2159                                 result = -EINVAL;
2160
2161                         goto out;
2162                 }
2163                 *last = ent;
2164                 ent = (void *)ent + recsize;
2165                 nob -= recsize;
2166
2167 next:
2168                 result = iops->next(env, it);
2169                 if (result == -ESTALE)
2170                         goto next;
2171         } while (result == 0);
2172
2173 out:
2174         *end = hash;
2175         return result;
2176 }
2177
2178 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2179                           const struct lu_rdpg *rdpg)
2180 {
2181         struct dt_it      *it;
2182         struct dt_object  *next = mdd_object_child(obj);
2183         const struct dt_it_ops  *iops;
2184         struct page       *pg;
2185         struct lu_dirent  *last = NULL;
2186         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2187         int i;
2188         int rc;
2189         int nob;
2190         __u64 hash_start;
2191         __u64 hash_end = 0;
2192
2193         LASSERT(rdpg->rp_pages != NULL);
2194         LASSERT(next->do_index_ops != NULL);
2195
2196         if (rdpg->rp_count <= 0)
2197                 return -EFAULT;
2198
2199         /*
2200          * iterate through directory and fill pages from @rdpg
2201          */
2202         iops = &next->do_index_ops->dio_it;
2203         it = iops->init(env, next, mdd_object_capa(env, obj));
2204         if (IS_ERR(it))
2205                 return PTR_ERR(it);
2206
2207         rc = iops->load(env, it, rdpg->rp_hash);
2208
2209         if (rc == 0){
2210                 /*
2211                  * Iterator didn't find record with exactly the key requested.
2212                  *
2213                  * It is currently either
2214                  *
2215                  *     - positioned above record with key less than
2216                  *     requested---skip it.
2217                  *
2218                  *     - or not positioned at all (is in IAM_IT_SKEWED
2219                  *     state)---position it on the next item.
2220                  */
2221                 rc = iops->next(env, it);
2222         } else if (rc > 0)
2223                 rc = 0;
2224
2225         /*
2226          * At this point and across for-loop:
2227          *
2228          *  rc == 0 -> ok, proceed.
2229          *  rc >  0 -> end of directory.
2230          *  rc <  0 -> error.
2231          */
2232         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2233              i++, nob -= CFS_PAGE_SIZE) {
2234                 LASSERT(i < rdpg->rp_npages);
2235                 pg = rdpg->rp_pages[i];
2236                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2237                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2238                                         it, &hash_start, &hash_end, &last,
2239                                         rdpg->rp_attrs);
2240                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2241                         if (last)
2242                                 last->lde_reclen = 0;
2243                 }
2244                 cfs_kunmap(pg);
2245         }
2246         if (rc > 0) {
2247                 /*
2248                  * end of directory.
2249                  */
2250                 hash_end = DIR_END_OFF;
2251                 rc = 0;
2252         }
2253         if (rc == 0) {
2254                 struct lu_dirpage *dp;
2255
2256                 dp = cfs_kmap(rdpg->rp_pages[0]);
2257                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2258                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2259                 if (i == 0)
2260                         /*
2261                          * No pages were processed, mark this.
2262                          */
2263                         dp->ldp_flags |= LDF_EMPTY;
2264
2265                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2266                 cfs_kunmap(rdpg->rp_pages[0]);
2267         }
2268         iops->put(env, it);
2269         iops->fini(env, it);
2270
2271         return rc;
2272 }
2273
2274 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2275                  const struct lu_rdpg *rdpg)
2276 {
2277         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2278         int rc;
2279         ENTRY;
2280
2281         LASSERT(mdd_object_exists(mdd_obj));
2282
2283         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2284         rc = mdd_readpage_sanity_check(env, mdd_obj);
2285         if (rc)
2286                 GOTO(out_unlock, rc);
2287
2288         if (mdd_is_dead_obj(mdd_obj)) {
2289                 struct page *pg;
2290                 struct lu_dirpage *dp;
2291
2292                 /*
2293                  * According to POSIX, please do not return any entry to client:
2294                  * even dot and dotdot should not be returned.
2295                  */
2296                 CWARN("readdir from dead object: "DFID"\n",
2297                         PFID(mdd_object_fid(mdd_obj)));
2298
2299                 if (rdpg->rp_count <= 0)
2300                         GOTO(out_unlock, rc = -EFAULT);
2301                 LASSERT(rdpg->rp_pages != NULL);
2302
2303                 pg = rdpg->rp_pages[0];
2304                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2305                 memset(dp, 0 , sizeof(struct lu_dirpage));
2306                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2307                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2308                 dp->ldp_flags |= LDF_EMPTY;
2309                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2310                 cfs_kunmap(pg);
2311                 GOTO(out_unlock, rc = 0);
2312         }
2313
2314         rc = __mdd_readpage(env, mdd_obj, rdpg);
2315
2316         EXIT;
2317 out_unlock:
2318         mdd_read_unlock(env, mdd_obj);
2319         return rc;
2320 }
2321
2322 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2323 {
2324         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2325         struct dt_object *next;
2326
2327         LASSERT(mdd_object_exists(mdd_obj));
2328         next = mdd_object_child(mdd_obj);
2329         return next->do_ops->do_object_sync(env, next);
2330 }
2331
2332 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2333                                         struct md_object *obj)
2334 {
2335         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2336
2337         LASSERT(mdd_object_exists(mdd_obj));
2338         return do_version_get(env, mdd_object_child(mdd_obj));
2339 }
2340
2341 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2342                             dt_obj_version_t version)
2343 {
2344         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2345
2346         LASSERT(mdd_object_exists(mdd_obj));
2347         return do_version_set(env, mdd_object_child(mdd_obj), version);
2348 }
2349
2350 const struct md_object_operations mdd_obj_ops = {
2351         .moo_permission    = mdd_permission,
2352         .moo_attr_get      = mdd_attr_get,
2353         .moo_attr_set      = mdd_attr_set,
2354         .moo_xattr_get     = mdd_xattr_get,
2355         .moo_xattr_set     = mdd_xattr_set,
2356         .moo_xattr_list    = mdd_xattr_list,
2357         .moo_xattr_del     = mdd_xattr_del,
2358         .moo_object_create = mdd_object_create,
2359         .moo_ref_add       = mdd_ref_add,
2360         .moo_ref_del       = mdd_ref_del,
2361         .moo_open          = mdd_open,
2362         .moo_close         = mdd_close,
2363         .moo_readpage      = mdd_readpage,
2364         .moo_readlink      = mdd_readlink,
2365         .moo_capa_get      = mdd_capa_get,
2366         .moo_object_sync   = mdd_object_sync,
2367         .moo_version_get   = mdd_version_get,
2368         .moo_version_set   = mdd_version_set,
2369         .moo_path          = mdd_path,
2370 };