Whamcloud - gitweb
ace6de2880bd9b2b050b6b2fee8aa21bdb862595
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
58 #include <obd_lov.h>
59
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
63
64 #include "mdd_internal.h"
65
66 static const struct lu_object_operations mdd_lu_obj_ops;
67
68 static int mdd_xattr_get(const struct lu_env *env,
69                          struct md_object *obj, struct lu_buf *buf,
70                          const char *name);
71
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
73                  void **data)
74 {
75         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
76                  PFID(mdd_object_fid(obj)));
77         mdo_data_get(env, obj, data);
78         return 0;
79 }
80
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82                struct lu_attr *la, struct lustre_capa *capa)
83 {
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                  PFID(mdd_object_fid(obj)));
86         return mdo_attr_get(env, obj, la, capa);
87 }
88
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
90 {
91         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
92
93         if (flags & LUSTRE_APPEND_FL)
94                 obj->mod_flags |= APPEND_OBJ;
95
96         if (flags & LUSTRE_IMMUTABLE_FL)
97                 obj->mod_flags |= IMMUTE_OBJ;
98 }
99
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
101 {
102         struct mdd_thread_info *info;
103
104         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105         LASSERT(info != NULL);
106         return info;
107 }
108
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
110 {
111         struct lu_buf *buf;
112
113         buf = &mdd_env_info(env)->mti_buf;
114         buf->lb_buf = area;
115         buf->lb_len = len;
116         return buf;
117 }
118
119 void mdd_buf_put(struct lu_buf *buf)
120 {
121         if (buf == NULL || buf->lb_buf == NULL)
122                 return;
123         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
124         buf->lb_buf = NULL;
125         buf->lb_len = 0;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 buf->lb_buf = NULL;
146         }
147         if (buf->lb_buf == NULL) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         buf->lb_len = 0;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183                                        struct mdd_device *mdd)
184 {
185         struct mdd_thread_info *mti = mdd_env_info(env);
186         int                     max_cookie_size;
187
188         max_cookie_size = mdd_lov_cookiesize(env, mdd);
189         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190                 if (mti->mti_max_cookie)
191                         OBD_FREE_LARGE(mti->mti_max_cookie,
192                                        mti->mti_max_cookie_size);
193                 mti->mti_max_cookie = NULL;
194                 mti->mti_max_cookie_size = 0;
195         }
196         if (unlikely(mti->mti_max_cookie == NULL)) {
197                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198                 if (likely(mti->mti_max_cookie != NULL))
199                         mti->mti_max_cookie_size = max_cookie_size;
200         }
201         if (likely(mti->mti_max_cookie != NULL))
202                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203         return mti->mti_max_cookie;
204 }
205
206 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
207                                    struct mdd_device *mdd)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210         int                     max_lmm_size;
211
212         max_lmm_size = mdd_lov_mdsize(env, mdd);
213         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
214                 if (mti->mti_max_lmm)
215                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
216                 mti->mti_max_lmm = NULL;
217                 mti->mti_max_lmm_size = 0;
218         }
219         if (unlikely(mti->mti_max_lmm == NULL)) {
220                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
221                 if (likely(mti->mti_max_lmm != NULL))
222                         mti->mti_max_lmm_size = max_lmm_size;
223         }
224         return mti->mti_max_lmm;
225 }
226
227 struct lu_object *mdd_object_alloc(const struct lu_env *env,
228                                    const struct lu_object_header *hdr,
229                                    struct lu_device *d)
230 {
231         struct mdd_object *mdd_obj;
232
233         OBD_ALLOC_PTR(mdd_obj);
234         if (mdd_obj != NULL) {
235                 struct lu_object *o;
236
237                 o = mdd2lu_obj(mdd_obj);
238                 lu_object_init(o, NULL, d);
239                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
240                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
241                 mdd_obj->mod_count = 0;
242                 o->lo_ops = &mdd_lu_obj_ops;
243                 return o;
244         } else {
245                 return NULL;
246         }
247 }
248
249 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
250                            const struct lu_object_conf *unused)
251 {
252         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
253         struct mdd_object *mdd_obj = lu2mdd_obj(o);
254         struct lu_object  *below;
255         struct lu_device  *under;
256         ENTRY;
257
258         mdd_obj->mod_cltime = 0;
259         under = &d->mdd_child->dd_lu_dev;
260         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
261         mdd_pdlock_init(mdd_obj);
262         if (below == NULL)
263                 RETURN(-ENOMEM);
264
265         lu_object_add(o, below);
266
267         RETURN(0);
268 }
269
270 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
271 {
272         if (lu_object_exists(o))
273                 return mdd_get_flags(env, lu2mdd_obj(o));
274         else
275                 return 0;
276 }
277
278 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
279 {
280         struct mdd_object *mdd = lu2mdd_obj(o);
281
282         lu_object_fini(o);
283         OBD_FREE_PTR(mdd);
284 }
285
286 static int mdd_object_print(const struct lu_env *env, void *cookie,
287                             lu_printer_t p, const struct lu_object *o)
288 {
289         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
290         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
291                     "valid=%x, cltime="LPU64", flags=%lx)",
292                     mdd, mdd->mod_count, mdd->mod_valid,
293                     mdd->mod_cltime, mdd->mod_flags);
294 }
295
296 static const struct lu_object_operations mdd_lu_obj_ops = {
297         .loo_object_init    = mdd_object_init,
298         .loo_object_start   = mdd_object_start,
299         .loo_object_free    = mdd_object_free,
300         .loo_object_print   = mdd_object_print,
301 };
302
303 struct mdd_object *mdd_object_find(const struct lu_env *env,
304                                    struct mdd_device *d,
305                                    const struct lu_fid *f)
306 {
307         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
308 }
309
310 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
311                         const char *path, struct lu_fid *fid)
312 {
313         struct lu_buf *buf;
314         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
315         struct mdd_object *obj;
316         struct lu_name *lname = &mdd_env_info(env)->mti_name;
317         char *name;
318         int rc = 0;
319         ENTRY;
320
321         /* temp buffer for path element */
322         buf = mdd_buf_alloc(env, PATH_MAX);
323         if (buf->lb_buf == NULL)
324                 RETURN(-ENOMEM);
325
326         lname->ln_name = name = buf->lb_buf;
327         lname->ln_namelen = 0;
328         *f = mdd->mdd_root_fid;
329
330         while(1) {
331                 while (*path == '/')
332                         path++;
333                 if (*path == '\0')
334                         break;
335                 while (*path != '/' && *path != '\0') {
336                         *name = *path;
337                         path++;
338                         name++;
339                         lname->ln_namelen++;
340                 }
341
342                 *name = '\0';
343                 /* find obj corresponding to fid */
344                 obj = mdd_object_find(env, mdd, f);
345                 if (obj == NULL)
346                         GOTO(out, rc = -EREMOTE);
347                 if (IS_ERR(obj))
348                         GOTO(out, rc = PTR_ERR(obj));
349                 /* get child fid from parent and name */
350                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
351                 mdd_object_put(env, obj);
352                 if (rc)
353                         break;
354
355                 name = buf->lb_buf;
356                 lname->ln_namelen = 0;
357         }
358
359         if (!rc)
360                 *fid = *f;
361 out:
362         RETURN(rc);
363 }
364
365 /** The maximum depth that fid2path() will search.
366  * This is limited only because we want to store the fids for
367  * historical path lookup purposes.
368  */
369 #define MAX_PATH_DEPTH 100
370
371 /** mdd_path() lookup structure. */
372 struct path_lookup_info {
373         __u64                pli_recno;        /**< history point */
374         __u64                pli_currec;       /**< current record */
375         struct lu_fid        pli_fid;
376         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377         struct mdd_object   *pli_mdd_obj;
378         char                *pli_path;         /**< full path */
379         int                  pli_pathlen;
380         int                  pli_linkno;       /**< which hardlink to follow */
381         int                  pli_fidcount;     /**< number of \a pli_fids */
382 };
383
384 static int mdd_path_current(const struct lu_env *env,
385                             struct path_lookup_info *pli)
386 {
387         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388         struct mdd_object *mdd_obj;
389         struct lu_buf     *buf = NULL;
390         struct link_ea_header *leh;
391         struct link_ea_entry  *lee;
392         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
394         char *ptr;
395         int reclen;
396         int rc;
397         ENTRY;
398
399         ptr = pli->pli_path + pli->pli_pathlen - 1;
400         *ptr = 0;
401         --ptr;
402         pli->pli_fidcount = 0;
403         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
404
405         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406                 mdd_obj = mdd_object_find(env, mdd,
407                                           &pli->pli_fids[pli->pli_fidcount]);
408                 if (mdd_obj == NULL)
409                         GOTO(out, rc = -EREMOTE);
410                 if (IS_ERR(mdd_obj))
411                         GOTO(out, rc = PTR_ERR(mdd_obj));
412                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
413                 if (rc <= 0) {
414                         mdd_object_put(env, mdd_obj);
415                         if (rc == -1)
416                                 rc = -EREMOTE;
417                         else if (rc == 0)
418                                 /* Do I need to error out here? */
419                                 rc = -ENOENT;
420                         GOTO(out, rc);
421                 }
422
423                 /* Get parent fid and object name */
424                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425                 buf = mdd_links_get(env, mdd_obj);
426                 mdd_read_unlock(env, mdd_obj);
427                 mdd_object_put(env, mdd_obj);
428                 if (IS_ERR(buf))
429                         GOTO(out, rc = PTR_ERR(buf));
430
431                 leh = buf->lb_buf;
432                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
433                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
434
435                 /* If set, use link #linkno for path lookup, otherwise use
436                    link #0.  Only do this for the final path element. */
437                 if ((pli->pli_fidcount == 0) &&
438                     (pli->pli_linkno < leh->leh_reccount)) {
439                         int count;
440                         for (count = 0; count < pli->pli_linkno; count++) {
441                                 lee = (struct link_ea_entry *)
442                                      ((char *)lee + reclen);
443                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444                         }
445                         if (pli->pli_linkno < leh->leh_reccount - 1)
446                                 /* indicate to user there are more links */
447                                 pli->pli_linkno++;
448                 }
449
450                 /* Pack the name in the end of the buffer */
451                 ptr -= tmpname->ln_namelen;
452                 if (ptr - 1 <= pli->pli_path)
453                         GOTO(out, rc = -EOVERFLOW);
454                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
455                 *(--ptr) = '/';
456
457                 /* Store the parent fid for historic lookup */
458                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
459                         GOTO(out, rc = -EOVERFLOW);
460                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
461         }
462
463         /* Verify that our path hasn't changed since we started the lookup.
464            Record the current index, and verify the path resolves to the
465            same fid. If it does, then the path is correct as of this index. */
466         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
467         pli->pli_currec = mdd->mdd_cl.mc_index;
468         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
469         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
470         if (rc) {
471                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
472                 GOTO (out, rc = -EAGAIN);
473         }
474         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
475                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
476                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
477                        PFID(&pli->pli_fid));
478                 GOTO(out, rc = -EAGAIN);
479         }
480         ptr++; /* skip leading / */
481         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
482
483         EXIT;
484 out:
485         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
486                 /* if we vmalloced a large buffer drop it */
487                 mdd_buf_put(buf);
488
489         return rc;
490 }
491
492 static int mdd_path_historic(const struct lu_env *env,
493                              struct path_lookup_info *pli)
494 {
495         return 0;
496 }
497
498 /* Returns the full path to this fid, as of changelog record recno. */
499 static int mdd_path(const struct lu_env *env, struct md_object *obj,
500                     char *path, int pathlen, __u64 *recno, int *linkno)
501 {
502         struct path_lookup_info *pli;
503         int tries = 3;
504         int rc = -EAGAIN;
505         ENTRY;
506
507         if (pathlen < 3)
508                 RETURN(-EOVERFLOW);
509
510         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
511                 path[0] = '\0';
512                 RETURN(0);
513         }
514
515         OBD_ALLOC_PTR(pli);
516         if (pli == NULL)
517                 RETURN(-ENOMEM);
518
519         pli->pli_mdd_obj = md2mdd_obj(obj);
520         pli->pli_recno = *recno;
521         pli->pli_path = path;
522         pli->pli_pathlen = pathlen;
523         pli->pli_linkno = *linkno;
524
525         /* Retry multiple times in case file is being moved */
526         while (tries-- && rc == -EAGAIN)
527                 rc = mdd_path_current(env, pli);
528
529         /* For historical path lookup, the current links may not have existed
530          * at "recno" time.  We must switch over to earlier links/parents
531          * by using the changelog records.  If the earlier parent doesn't
532          * exist, we must search back through the changelog to reconstruct
533          * its parents, then check if it exists, etc.
534          * We may ignore this problem for the initial implementation and
535          * state that an "original" hardlink must still exist for us to find
536          * historic path name. */
537         if (pli->pli_recno != -1) {
538                 rc = mdd_path_historic(env, pli);
539         } else {
540                 *recno = pli->pli_currec;
541                 /* Return next link index to caller */
542                 *linkno = pli->pli_linkno;
543         }
544
545         OBD_FREE_PTR(pli);
546
547         RETURN (rc);
548 }
549
550 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
551 {
552         struct lu_attr *la = &mdd_env_info(env)->mti_la;
553         int rc;
554
555         ENTRY;
556         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
557         if (rc == 0) {
558                 mdd_flags_xlate(obj, la->la_flags);
559                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
560                         obj->mod_flags |= MNLINK_OBJ;
561         }
562         RETURN(rc);
563 }
564
565 /* get only inode attributes */
566 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
567                   struct md_attr *ma)
568 {
569         int rc = 0;
570         ENTRY;
571
572         if (ma->ma_valid & MA_INODE)
573                 RETURN(0);
574
575         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
576                           mdd_object_capa(env, mdd_obj));
577         if (rc == 0)
578                 ma->ma_valid |= MA_INODE;
579         RETURN(rc);
580 }
581
582 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
583 {
584         struct lov_desc *ldesc;
585         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
586         struct lov_user_md *lum = (struct lov_user_md*)lmm;
587         ENTRY;
588
589         if (!lum)
590                 RETURN(0);
591
592         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
593         LASSERT(ldesc != NULL);
594
595         lum->lmm_magic = LOV_MAGIC_V1;
596         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
597         lum->lmm_pattern = ldesc->ld_pattern;
598         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
599         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
600         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
601
602         RETURN(sizeof(*lum));
603 }
604
605 static int is_rootdir(struct mdd_object *mdd_obj)
606 {
607         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
608         const struct lu_fid *fid = mdo2fid(mdd_obj);
609
610         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
611 }
612
613 /* get lov EA only */
614 static int __mdd_lmm_get(const struct lu_env *env,
615                          struct mdd_object *mdd_obj, struct md_attr *ma)
616 {
617         int rc;
618         ENTRY;
619
620         if (ma->ma_valid & MA_LOV)
621                 RETURN(0);
622
623         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
624                         XATTR_NAME_LOV);
625         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
626                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
627         if (rc > 0) {
628                 ma->ma_lmm_size = rc;
629                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
630                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
631                 rc = 0;
632         }
633         RETURN(rc);
634 }
635
636 /* get the first parent fid from link EA */
637 static int mdd_pfid_get(const struct lu_env *env,
638                         struct mdd_object *mdd_obj, struct md_attr *ma)
639 {
640         struct lu_buf *buf;
641         struct link_ea_header *leh;
642         struct link_ea_entry *lee;
643         struct lu_fid *pfid = &ma->ma_pfid;
644         ENTRY;
645
646         if (ma->ma_valid & MA_PFID)
647                 RETURN(0);
648
649         buf = mdd_links_get(env, mdd_obj);
650         if (IS_ERR(buf))
651                 RETURN(PTR_ERR(buf));
652
653         leh = buf->lb_buf;
654         lee = (struct link_ea_entry *)(leh + 1);
655         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
656         fid_be_to_cpu(pfid, pfid);
657         ma->ma_valid |= MA_PFID;
658         if (buf->lb_len > OBD_ALLOC_BIG)
659                 /* if we vmalloced a large buffer drop it */
660                 mdd_buf_put(buf);
661         RETURN(0);
662 }
663
664 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
665                        struct md_attr *ma)
666 {
667         int rc;
668         ENTRY;
669
670         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
671         rc = __mdd_lmm_get(env, mdd_obj, ma);
672         mdd_read_unlock(env, mdd_obj);
673         RETURN(rc);
674 }
675
676 /* get lmv EA only*/
677 static int __mdd_lmv_get(const struct lu_env *env,
678                          struct mdd_object *mdd_obj, struct md_attr *ma)
679 {
680         int rc;
681         ENTRY;
682
683         if (ma->ma_valid & MA_LMV)
684                 RETURN(0);
685
686         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
687                         XATTR_NAME_LMV);
688         if (rc > 0) {
689                 ma->ma_valid |= MA_LMV;
690                 rc = 0;
691         }
692         RETURN(rc);
693 }
694
695 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
696                          struct md_attr *ma)
697 {
698         struct mdd_thread_info *info = mdd_env_info(env);
699         struct lustre_mdt_attrs *lma =
700                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
701         int lma_size;
702         int rc;
703         ENTRY;
704
705         /* If all needed data are already valid, nothing to do */
706         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
707             (ma->ma_need & (MA_HSM | MA_SOM)))
708                 RETURN(0);
709
710         /* Read LMA from disk EA */
711         lma_size = sizeof(info->mti_xattr_buf);
712         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
713         if (rc <= 0)
714                 RETURN(rc);
715
716         /* Useless to check LMA incompatibility because this is already done in
717          * osd_ea_fid_get(), and this will fail long before this code is
718          * called.
719          * So, if we are here, LMA is compatible.
720          */
721
722         lustre_lma_swab(lma);
723
724         /* Swab and copy LMA */
725         if (ma->ma_need & MA_HSM) {
726                 if (lma->lma_compat & LMAC_HSM)
727                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
728                 else
729                         ma->ma_hsm.mh_flags = 0;
730                 ma->ma_valid |= MA_HSM;
731         }
732
733         /* Copy SOM */
734         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
735                 LASSERT(ma->ma_som != NULL);
736                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
737                 ma->ma_som->msd_size    = lma->lma_som_size;
738                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
739                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
740                 ma->ma_valid |= MA_SOM;
741         }
742
743         RETURN(0);
744 }
745
746 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
747                                  struct md_attr *ma)
748 {
749         int rc = 0;
750         ENTRY;
751
752         if (ma->ma_need & MA_INODE)
753                 rc = mdd_iattr_get(env, mdd_obj, ma);
754
755         if (rc == 0 && ma->ma_need & MA_LOV) {
756                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
757                     S_ISDIR(mdd_object_type(mdd_obj)))
758                         rc = __mdd_lmm_get(env, mdd_obj, ma);
759         }
760         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
761                 if (S_ISREG(mdd_object_type(mdd_obj)))
762                         rc = mdd_pfid_get(env, mdd_obj, ma);
763         }
764         if (rc == 0 && ma->ma_need & MA_LMV) {
765                 if (S_ISDIR(mdd_object_type(mdd_obj)))
766                         rc = __mdd_lmv_get(env, mdd_obj, ma);
767         }
768         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
769                 if (S_ISREG(mdd_object_type(mdd_obj)))
770                         rc = __mdd_lma_get(env, mdd_obj, ma);
771         }
772 #ifdef CONFIG_FS_POSIX_ACL
773         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
774                 if (S_ISDIR(mdd_object_type(mdd_obj)))
775                         rc = mdd_def_acl_get(env, mdd_obj, ma);
776         }
777 #endif
778         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
779                rc, ma->ma_valid, ma->ma_lmm);
780         RETURN(rc);
781 }
782
783 int mdd_attr_get_internal_locked(const struct lu_env *env,
784                                  struct mdd_object *mdd_obj, struct md_attr *ma)
785 {
786         int rc;
787         int needlock = ma->ma_need &
788                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
789
790         if (needlock)
791                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
792         rc = mdd_attr_get_internal(env, mdd_obj, ma);
793         if (needlock)
794                 mdd_read_unlock(env, mdd_obj);
795         return rc;
796 }
797
798 /*
799  * No permission check is needed.
800  */
801 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
802                         struct md_attr *ma)
803 {
804         struct mdd_object *mdd_obj = md2mdd_obj(obj);
805         int                rc;
806
807         ENTRY;
808         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
809         RETURN(rc);
810 }
811
812 /*
813  * No permission check is needed.
814  */
815 static int mdd_xattr_get(const struct lu_env *env,
816                          struct md_object *obj, struct lu_buf *buf,
817                          const char *name)
818 {
819         struct mdd_object *mdd_obj = md2mdd_obj(obj);
820         int rc;
821
822         ENTRY;
823
824         LASSERT(mdd_object_exists(mdd_obj));
825
826         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
827         rc = mdo_xattr_get(env, mdd_obj, buf, name,
828                            mdd_object_capa(env, mdd_obj));
829         mdd_read_unlock(env, mdd_obj);
830
831         RETURN(rc);
832 }
833
834 /*
835  * Permission check is done when open,
836  * no need check again.
837  */
838 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
839                         struct lu_buf *buf)
840 {
841         struct mdd_object *mdd_obj = md2mdd_obj(obj);
842         struct dt_object  *next;
843         loff_t             pos = 0;
844         int                rc;
845         ENTRY;
846
847         LASSERT(mdd_object_exists(mdd_obj));
848
849         next = mdd_object_child(mdd_obj);
850         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
851         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
852                                          mdd_object_capa(env, mdd_obj));
853         mdd_read_unlock(env, mdd_obj);
854         RETURN(rc);
855 }
856
857 /*
858  * No permission check is needed.
859  */
860 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
861                           struct lu_buf *buf)
862 {
863         struct mdd_object *mdd_obj = md2mdd_obj(obj);
864         int rc;
865
866         ENTRY;
867
868         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
869         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
870         mdd_read_unlock(env, mdd_obj);
871
872         RETURN(rc);
873 }
874
875 int mdd_declare_object_create_internal(const struct lu_env *env,
876                                        struct mdd_object *p,
877                                        struct mdd_object *c,
878                                        struct md_attr *ma,
879                                        struct thandle *handle,
880                                        const struct md_op_spec *spec)
881 {
882         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
883         const struct dt_index_features *feat = spec->sp_feat;
884         int rc;
885         ENTRY;
886
887         if (feat != &dt_directory_features && feat != NULL)
888                 dof->dof_type = DFT_INDEX;
889         else
890                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
891
892         dof->u.dof_idx.di_feat = feat;
893
894         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
895
896         RETURN(rc);
897 }
898
899 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
900                                struct mdd_object *c, struct md_attr *ma,
901                                struct thandle *handle,
902                                const struct md_op_spec *spec)
903 {
904         struct lu_attr *attr = &ma->ma_attr;
905         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
906         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
907         const struct dt_index_features *feat = spec->sp_feat;
908         int rc;
909         ENTRY;
910
911         if (!mdd_object_exists(c)) {
912                 struct dt_object *next = mdd_object_child(c);
913                 LASSERT(next);
914
915                 if (feat != &dt_directory_features && feat != NULL)
916                         dof->dof_type = DFT_INDEX;
917                 else
918                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
919
920                 dof->u.dof_idx.di_feat = feat;
921
922                 /* @hint will be initialized by underlying device. */
923                 next->do_ops->do_ah_init(env, hint,
924                                          p ? mdd_object_child(p) : NULL,
925                                          attr->la_mode & S_IFMT);
926
927                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
928                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
929         } else
930                 rc = -EEXIST;
931
932         RETURN(rc);
933 }
934
935 /**
936  * Make sure the ctime is increased only.
937  */
938 static inline int mdd_attr_check(const struct lu_env *env,
939                                  struct mdd_object *obj,
940                                  struct lu_attr *attr)
941 {
942         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
943         int rc;
944         ENTRY;
945
946         if (attr->la_valid & LA_CTIME) {
947                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
948                 if (rc)
949                         RETURN(rc);
950
951                 if (attr->la_ctime < tmp_la->la_ctime)
952                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
953                 else if (attr->la_valid == LA_CTIME &&
954                          attr->la_ctime == tmp_la->la_ctime)
955                         attr->la_valid &= ~LA_CTIME;
956         }
957         RETURN(0);
958 }
959
960 int mdd_attr_set_internal(const struct lu_env *env,
961                           struct mdd_object *obj,
962                           struct lu_attr *attr,
963                           struct thandle *handle,
964                           int needacl)
965 {
966         int rc;
967         ENTRY;
968
969         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
970 #ifdef CONFIG_FS_POSIX_ACL
971         if (!rc && (attr->la_valid & LA_MODE) && needacl)
972                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
973 #endif
974         RETURN(rc);
975 }
976
977 int mdd_attr_check_set_internal(const struct lu_env *env,
978                                 struct mdd_object *obj,
979                                 struct lu_attr *attr,
980                                 struct thandle *handle,
981                                 int needacl)
982 {
983         int rc;
984         ENTRY;
985
986         rc = mdd_attr_check(env, obj, attr);
987         if (rc)
988                 RETURN(rc);
989
990         if (attr->la_valid)
991                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
992         RETURN(rc);
993 }
994
995 static int mdd_attr_set_internal_locked(const struct lu_env *env,
996                                         struct mdd_object *obj,
997                                         struct lu_attr *attr,
998                                         struct thandle *handle,
999                                         int needacl)
1000 {
1001         int rc;
1002         ENTRY;
1003
1004         needacl = needacl && (attr->la_valid & LA_MODE);
1005         if (needacl)
1006                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1007         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1008         if (needacl)
1009                 mdd_write_unlock(env, obj);
1010         RETURN(rc);
1011 }
1012
1013 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1014                                        struct mdd_object *obj,
1015                                        struct lu_attr *attr,
1016                                        struct thandle *handle,
1017                                        int needacl)
1018 {
1019         int rc;
1020         ENTRY;
1021
1022         needacl = needacl && (attr->la_valid & LA_MODE);
1023         if (needacl)
1024                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1025         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1026         if (needacl)
1027                 mdd_write_unlock(env, obj);
1028         RETURN(rc);
1029 }
1030
1031 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1032                     const struct lu_buf *buf, const char *name,
1033                     int fl, struct thandle *handle)
1034 {
1035         struct lustre_capa *capa = mdd_object_capa(env, obj);
1036         int rc = -EINVAL;
1037         ENTRY;
1038
1039         if (buf->lb_buf && buf->lb_len > 0)
1040                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1041         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1042                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1043
1044         RETURN(rc);
1045 }
1046
1047 /*
1048  * This gives the same functionality as the code between
1049  * sys_chmod and inode_setattr
1050  * chown_common and inode_setattr
1051  * utimes and inode_setattr
1052  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1053  */
1054 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1055                         struct lu_attr *la, const struct md_attr *ma)
1056 {
1057         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1058         struct md_ucred  *uc;
1059         int               rc;
1060         ENTRY;
1061
1062         if (!la->la_valid)
1063                 RETURN(0);
1064
1065         /* Do not permit change file type */
1066         if (la->la_valid & LA_TYPE)
1067                 RETURN(-EPERM);
1068
1069         /* They should not be processed by setattr */
1070         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1071                 RETURN(-EPERM);
1072
1073         /* export destroy does not have ->le_ses, but we may want
1074          * to drop LUSTRE_SOM_FL. */
1075         if (!env->le_ses)
1076                 RETURN(0);
1077
1078         uc = md_ucred(env);
1079
1080         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1081         if (rc)
1082                 RETURN(rc);
1083
1084         if (la->la_valid == LA_CTIME) {
1085                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1086                         /* This is only for set ctime when rename's source is
1087                          * on remote MDS. */
1088                         rc = mdd_may_delete(env, NULL, obj,
1089                                             (struct md_attr *)ma, 1, 0);
1090                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1091                         la->la_valid &= ~LA_CTIME;
1092                 RETURN(rc);
1093         }
1094
1095         if (la->la_valid == LA_ATIME) {
1096                 /* This is atime only set for read atime update on close. */
1097                 if (la->la_atime >= tmp_la->la_atime &&
1098                     la->la_atime < (tmp_la->la_atime +
1099                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1100                         la->la_valid &= ~LA_ATIME;
1101                 RETURN(0);
1102         }
1103
1104         /* Check if flags change. */
1105         if (la->la_valid & LA_FLAGS) {
1106                 unsigned int oldflags = 0;
1107                 unsigned int newflags = la->la_flags &
1108                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1109
1110                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1111                     !mdd_capable(uc, CFS_CAP_FOWNER))
1112                         RETURN(-EPERM);
1113
1114                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1115                  * only be changed by the relevant capability. */
1116                 if (mdd_is_immutable(obj))
1117                         oldflags |= LUSTRE_IMMUTABLE_FL;
1118                 if (mdd_is_append(obj))
1119                         oldflags |= LUSTRE_APPEND_FL;
1120                 if ((oldflags ^ newflags) &&
1121                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1122                         RETURN(-EPERM);
1123
1124                 if (!S_ISDIR(tmp_la->la_mode))
1125                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1126         }
1127
1128         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1129             (la->la_valid & ~LA_FLAGS) &&
1130             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1131                 RETURN(-EPERM);
1132
1133         /* Check for setting the obj time. */
1134         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1135             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1136                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1137                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1138                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1139                                                             MAY_WRITE,
1140                                                             MOR_TGT_CHILD);
1141                         if (rc)
1142                                 RETURN(rc);
1143                 }
1144         }
1145
1146         if (la->la_valid & LA_KILL_SUID) {
1147                 la->la_valid &= ~LA_KILL_SUID;
1148                 if ((tmp_la->la_mode & S_ISUID) &&
1149                     !(la->la_valid & LA_MODE)) {
1150                         la->la_mode = tmp_la->la_mode;
1151                         la->la_valid |= LA_MODE;
1152                 }
1153                 la->la_mode &= ~S_ISUID;
1154         }
1155
1156         if (la->la_valid & LA_KILL_SGID) {
1157                 la->la_valid &= ~LA_KILL_SGID;
1158                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1159                                         (S_ISGID | S_IXGRP)) &&
1160                     !(la->la_valid & LA_MODE)) {
1161                         la->la_mode = tmp_la->la_mode;
1162                         la->la_valid |= LA_MODE;
1163                 }
1164                 la->la_mode &= ~S_ISGID;
1165         }
1166
1167         /* Make sure a caller can chmod. */
1168         if (la->la_valid & LA_MODE) {
1169                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1170                     (uc->mu_fsuid != tmp_la->la_uid) &&
1171                     !mdd_capable(uc, CFS_CAP_FOWNER))
1172                         RETURN(-EPERM);
1173
1174                 if (la->la_mode == (cfs_umode_t) -1)
1175                         la->la_mode = tmp_la->la_mode;
1176                 else
1177                         la->la_mode = (la->la_mode & S_IALLUGO) |
1178                                       (tmp_la->la_mode & ~S_IALLUGO);
1179
1180                 /* Also check the setgid bit! */
1181                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1182                                        la->la_gid : tmp_la->la_gid) &&
1183                     !mdd_capable(uc, CFS_CAP_FSETID))
1184                         la->la_mode &= ~S_ISGID;
1185         } else {
1186                la->la_mode = tmp_la->la_mode;
1187         }
1188
1189         /* Make sure a caller can chown. */
1190         if (la->la_valid & LA_UID) {
1191                 if (la->la_uid == (uid_t) -1)
1192                         la->la_uid = tmp_la->la_uid;
1193                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1194                     (la->la_uid != tmp_la->la_uid)) &&
1195                     !mdd_capable(uc, CFS_CAP_CHOWN))
1196                         RETURN(-EPERM);
1197
1198                 /* If the user or group of a non-directory has been
1199                  * changed by a non-root user, remove the setuid bit.
1200                  * 19981026 David C Niemi <niemi@tux.org>
1201                  *
1202                  * Changed this to apply to all users, including root,
1203                  * to avoid some races. This is the behavior we had in
1204                  * 2.0. The check for non-root was definitely wrong
1205                  * for 2.2 anyway, as it should have been using
1206                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1207                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1208                     !S_ISDIR(tmp_la->la_mode)) {
1209                         la->la_mode &= ~S_ISUID;
1210                         la->la_valid |= LA_MODE;
1211                 }
1212         }
1213
1214         /* Make sure caller can chgrp. */
1215         if (la->la_valid & LA_GID) {
1216                 if (la->la_gid == (gid_t) -1)
1217                         la->la_gid = tmp_la->la_gid;
1218                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1219                     ((la->la_gid != tmp_la->la_gid) &&
1220                     !lustre_in_group_p(uc, la->la_gid))) &&
1221                     !mdd_capable(uc, CFS_CAP_CHOWN))
1222                         RETURN(-EPERM);
1223
1224                 /* Likewise, if the user or group of a non-directory
1225                  * has been changed by a non-root user, remove the
1226                  * setgid bit UNLESS there is no group execute bit
1227                  * (this would be a file marked for mandatory
1228                  * locking).  19981026 David C Niemi <niemi@tux.org>
1229                  *
1230                  * Removed the fsuid check (see the comment above) --
1231                  * 19990830 SD. */
1232                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1233                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1234                         la->la_mode &= ~S_ISGID;
1235                         la->la_valid |= LA_MODE;
1236                 }
1237         }
1238
1239         /* For both Size-on-MDS case and truncate case,
1240          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1241          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1242          * For SOM case, it is true, the MAY_WRITE perm has been checked
1243          * when open, no need check again. For truncate case, it is false,
1244          * the MAY_WRITE perm should be checked here. */
1245         if (ma->ma_attr_flags & MDS_SOM) {
1246                 /* For the "Size-on-MDS" setattr update, merge coming
1247                  * attributes with the set in the inode. BUG 10641 */
1248                 if ((la->la_valid & LA_ATIME) &&
1249                     (la->la_atime <= tmp_la->la_atime))
1250                         la->la_valid &= ~LA_ATIME;
1251
1252                 /* OST attributes do not have a priority over MDS attributes,
1253                  * so drop times if ctime is equal. */
1254                 if ((la->la_valid & LA_CTIME) &&
1255                     (la->la_ctime <= tmp_la->la_ctime))
1256                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1257         } else {
1258                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1259                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1260                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1261                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1262                                 rc = mdd_permission_internal_locked(env, obj,
1263                                                             tmp_la, MAY_WRITE,
1264                                                             MOR_TGT_CHILD);
1265                                 if (rc)
1266                                         RETURN(rc);
1267                         }
1268                 }
1269                 if (la->la_valid & LA_CTIME) {
1270                         /* The pure setattr, it has the priority over what is
1271                          * already set, do not drop it if ctime is equal. */
1272                         if (la->la_ctime < tmp_la->la_ctime)
1273                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1274                                                   LA_CTIME);
1275                 }
1276         }
1277
1278         RETURN(0);
1279 }
1280
1281 /** Store a data change changelog record
1282  * If this fails, we must fail the whole transaction; we don't
1283  * want the change to commit without the log entry.
1284  * \param mdd_obj - mdd_object of change
1285  * \param handle - transacion handle
1286  */
1287 static int mdd_changelog_data_store(const struct lu_env     *env,
1288                                     struct mdd_device       *mdd,
1289                                     enum changelog_rec_type type,
1290                                     int                     flags,
1291                                     struct mdd_object       *mdd_obj,
1292                                     struct thandle          *handle)
1293 {
1294         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1295         struct llog_changelog_rec *rec;
1296         struct thandle *th = NULL;
1297         struct lu_buf *buf;
1298         int reclen;
1299         int rc;
1300
1301         /* Not recording */
1302         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1303                 RETURN(0);
1304         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1305                 RETURN(0);
1306
1307         LASSERT(mdd_obj != NULL);
1308         LASSERT(handle != NULL);
1309
1310         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1311             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1312                 /* Don't need multiple updates in this log */
1313                 /* Don't check under lock - no big deal if we get an extra
1314                    entry */
1315                 RETURN(0);
1316         }
1317
1318         reclen = llog_data_len(sizeof(*rec));
1319         buf = mdd_buf_alloc(env, reclen);
1320         if (buf->lb_buf == NULL)
1321                 RETURN(-ENOMEM);
1322         rec = (struct llog_changelog_rec *)buf->lb_buf;
1323
1324         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1325         rec->cr.cr_type = (__u32)type;
1326         rec->cr.cr_tfid = *tfid;
1327         rec->cr.cr_namelen = 0;
1328         mdd_obj->mod_cltime = cfs_time_current_64();
1329
1330         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1331
1332         if (th)
1333                 mdd_trans_stop(env, mdd, rc, th);
1334
1335         if (rc < 0) {
1336                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1337                        rc, type, PFID(tfid));
1338                 return -EFAULT;
1339         }
1340
1341         return 0;
1342 }
1343
1344 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1345                   int flags, struct md_object *obj)
1346 {
1347         struct thandle *handle;
1348         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1349         struct mdd_device *mdd = mdo2mdd(obj);
1350         int rc;
1351         ENTRY;
1352
1353         handle = mdd_trans_create(env, mdd);
1354         if (IS_ERR(handle))
1355                 return(PTR_ERR(handle));
1356
1357         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1358         if (rc)
1359                 GOTO(stop, rc);
1360
1361         rc = mdd_trans_start(env, mdd, handle);
1362         if (rc)
1363                 GOTO(stop, rc);
1364
1365         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1366                                       handle);
1367
1368 stop:
1369         mdd_trans_stop(env, mdd, rc, handle);
1370
1371         RETURN(rc);
1372 }
1373
1374 /**
1375  * Should be called with write lock held.
1376  *
1377  * \see mdd_lma_set_locked().
1378  */
1379 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1380                        const struct md_attr *ma, struct thandle *handle)
1381 {
1382         struct mdd_thread_info *info = mdd_env_info(env);
1383         struct lu_buf *buf;
1384         struct lustre_mdt_attrs *lma =
1385                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1386         int lmasize = sizeof(struct lustre_mdt_attrs);
1387         int rc = 0;
1388
1389         ENTRY;
1390
1391         /* Either HSM or SOM part is not valid, we need to read it before */
1392         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1393                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1394                 if (rc <= 0)
1395                         RETURN(rc);
1396
1397                 lustre_lma_swab(lma);
1398         } else {
1399                 memset(lma, 0, lmasize);
1400         }
1401
1402         /* Copy HSM data */
1403         if (ma->ma_valid & MA_HSM) {
1404                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1405                 lma->lma_compat |= LMAC_HSM;
1406         }
1407
1408         /* Copy SOM data */
1409         if (ma->ma_valid & MA_SOM) {
1410                 LASSERT(ma->ma_som != NULL);
1411                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1412                         lma->lma_compat     &= ~LMAC_SOM;
1413                 } else {
1414                         lma->lma_compat     |= LMAC_SOM;
1415                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1416                         lma->lma_som_size    = ma->ma_som->msd_size;
1417                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1418                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1419                 }
1420         }
1421
1422         /* Copy FID */
1423         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1424
1425         lustre_lma_swab(lma);
1426         buf = mdd_buf_get(env, lma, lmasize);
1427         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1428
1429         RETURN(rc);
1430 }
1431
1432 /**
1433  * Save LMA extended attributes with data from \a ma.
1434  *
1435  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1436  * not, LMA EA will be first read from disk, modified and write back.
1437  *
1438  */
1439 static int mdd_lma_set_locked(const struct lu_env *env,
1440                               struct mdd_object *mdd_obj,
1441                               const struct md_attr *ma, struct thandle *handle)
1442 {
1443         int rc;
1444
1445         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1446         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1447         mdd_write_unlock(env, mdd_obj);
1448         return rc;
1449 }
1450
1451 /* Precedence for choosing record type when multiple
1452  * attributes change: setattr > mtime > ctime > atime
1453  * (ctime changes when mtime does, plus chmod/chown.
1454  * atime and ctime are independent.) */
1455 static int mdd_attr_set_changelog(const struct lu_env *env,
1456                                   struct md_object *obj, struct thandle *handle,
1457                                   __u64 valid)
1458 {
1459         struct mdd_device *mdd = mdo2mdd(obj);
1460         int bits, type = 0;
1461
1462         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1463         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1464         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1465         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1466         bits = bits & mdd->mdd_cl.mc_mask;
1467         if (bits == 0)
1468                 return 0;
1469
1470         /* The record type is the lowest non-masked set bit */
1471         while (bits && ((bits & 1) == 0)) {
1472                 bits = bits >> 1;
1473                 type++;
1474         }
1475
1476         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1477         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1478                                         md2mdd_obj(obj), handle);
1479 }
1480
1481 static int mdd_declare_attr_set(const struct lu_env *env,
1482                                 struct mdd_device *mdd,
1483                                 struct mdd_object *obj,
1484                                 const struct md_attr *ma,
1485                                 struct lov_mds_md *lmm,
1486                                 struct thandle *handle)
1487 {
1488         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1489         int             rc, i;
1490
1491         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1492         if (rc)
1493                 return rc;
1494
1495         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1496         if (rc)
1497                 return rc;
1498
1499         if (ma->ma_valid & MA_LOV) {
1500                 buf->lb_buf = NULL;
1501                 buf->lb_len = ma->ma_lmm_size;
1502                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1503                                            0, handle);
1504                 if (rc)
1505                         return rc;
1506         }
1507
1508         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1509                 buf->lb_buf = NULL;
1510                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1511                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1512                                            0, handle);
1513                 if (rc)
1514                         return rc;
1515         }
1516
1517         /* basically the log is the same as in unlink case */
1518         if (lmm) {
1519                 __u16 stripe;
1520
1521                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1522                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1523                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1524                                mdd->mdd_obd_dev->obd_name,
1525                                le32_to_cpu(lmm->lmm_magic),
1526                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1527                         return -EINVAL;
1528                 }
1529
1530                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1531                 if (stripe == LOV_ALL_STRIPES) {
1532                         struct lov_desc *ldesc;
1533
1534                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1535                         LASSERT(ldesc != NULL);
1536                         stripe = ldesc->ld_tgt_count;
1537                 }
1538
1539                 for (i = 0; i < stripe; i++) {
1540                         rc = mdd_declare_llog_record(env, mdd,
1541                                         sizeof(struct llog_unlink_rec),
1542                                         handle);
1543                         if (rc)
1544                                 return rc;
1545                 }
1546         }
1547
1548         return rc;
1549 }
1550
1551 /* set attr and LOV EA at once, return updated attr */
1552 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1553                         const struct md_attr *ma)
1554 {
1555         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1556         struct mdd_device *mdd = mdo2mdd(obj);
1557         struct thandle *handle;
1558         struct lov_mds_md *lmm = NULL;
1559         struct llog_cookie *logcookies = NULL;
1560         int  rc, lmm_size = 0, cookie_size = 0;
1561         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1562         struct obd_device *obd = mdd->mdd_obd_dev;
1563         struct mds_obd *mds = &obd->u.mds;
1564 #ifdef HAVE_QUOTA_SUPPORT
1565         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1566         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1567         int quota_opc = 0, block_count = 0;
1568         int inode_pending[MAXQUOTAS] = { 0, 0 };
1569         int block_pending[MAXQUOTAS] = { 0, 0 };
1570 #endif
1571         ENTRY;
1572
1573         *la_copy = ma->ma_attr;
1574         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1575         if (rc != 0)
1576                 RETURN(rc);
1577
1578         /* setattr on "close" only change atime, or do nothing */
1579         if (ma->ma_valid == MA_INODE &&
1580             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1581                 RETURN(0);
1582
1583         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1584             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1585                 lmm_size = mdd_lov_mdsize(env, mdd);
1586                 lmm = mdd_max_lmm_get(env, mdd);
1587                 if (lmm == NULL)
1588                         RETURN(-ENOMEM);
1589
1590                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1591                                 XATTR_NAME_LOV);
1592
1593                 if (rc < 0)
1594                         RETURN(rc);
1595         }
1596
1597         handle = mdd_trans_create(env, mdd);
1598         if (IS_ERR(handle))
1599                 RETURN(PTR_ERR(handle));
1600
1601         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1602                                   lmm_size > 0 ? lmm : NULL, handle);
1603         if (rc)
1604                 GOTO(stop, rc);
1605
1606         /* permission changes may require sync operation */
1607         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1608                 handle->th_sync = !!mdd->mdd_sync_permission;
1609
1610         rc = mdd_trans_start(env, mdd, handle);
1611         if (rc)
1612                 GOTO(stop, rc);
1613
1614         /* permission changes may require sync operation */
1615         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1616                 handle->th_sync |= mdd->mdd_sync_permission;
1617
1618         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1619                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1620                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1621
1622 #ifdef HAVE_QUOTA_SUPPORT
1623         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1624                 struct obd_export *exp = md_quota(env)->mq_exp;
1625                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1626
1627                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1628                 if (!rc) {
1629                         quota_opc = FSFILT_OP_SETATTR;
1630                         mdd_quota_wrapper(la_copy, qnids);
1631                         mdd_quota_wrapper(la_tmp, qoids);
1632                         /* get file quota for new owner */
1633                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1634                                         qnids, inode_pending, 1, NULL, 0,
1635                                         NULL, 0);
1636                         block_count = (la_tmp->la_blocks + 7) >> 3;
1637                         if (block_count) {
1638                                 void *data = NULL;
1639                                 mdd_data_get(env, mdd_obj, &data);
1640                                 /* get block quota for new owner */
1641                                 lquota_chkquota(mds_quota_interface_ref, obd,
1642                                                 exp, qnids, block_pending,
1643                                                 block_count, NULL,
1644                                                 LQUOTA_FLAGS_BLK, data, 1);
1645                         }
1646                 }
1647         }
1648 #endif
1649
1650         if (la_copy->la_valid & LA_FLAGS) {
1651                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1652                                                   handle, 1);
1653                 if (rc == 0)
1654                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1655         } else if (la_copy->la_valid) {            /* setattr */
1656                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1657                                                   handle, 1);
1658                 /* journal chown/chgrp in llog, just like unlink */
1659                 if (rc == 0 && lmm_size){
1660                         cookie_size = mdd_lov_cookiesize(env, mdd);
1661                         logcookies = mdd_max_cookie_get(env, mdd);
1662                         if (logcookies == NULL)
1663                                 GOTO(cleanup, rc = -ENOMEM);
1664
1665                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1666                                             logcookies, cookie_size) <= 0)
1667                                 logcookies = NULL;
1668                 }
1669         }
1670
1671         if (rc == 0 && ma->ma_valid & MA_LOV) {
1672                 cfs_umode_t mode;
1673
1674                 mode = mdd_object_type(mdd_obj);
1675                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1676                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1677                         if (rc)
1678                                 GOTO(cleanup, rc);
1679
1680                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1681                                             ma->ma_lmm_size, handle, 1);
1682                 }
1683
1684         }
1685         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1686                 cfs_umode_t mode;
1687
1688                 mode = mdd_object_type(mdd_obj);
1689                 if (S_ISREG(mode))
1690                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1691
1692         }
1693 cleanup:
1694         if (rc == 0)
1695                 rc = mdd_attr_set_changelog(env, obj, handle,
1696                                             ma->ma_attr.la_valid);
1697 stop:
1698         mdd_trans_stop(env, mdd, rc, handle);
1699         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1700                 /*set obd attr, if needed*/
1701                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1702                                            logcookies);
1703         }
1704 #ifdef HAVE_QUOTA_SUPPORT
1705         if (quota_opc) {
1706                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1707                                       inode_pending, 0);
1708                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1709                                       block_pending, 1);
1710                 /* Trigger dqrel/dqacq for original owner and new owner.
1711                  * If failed, the next call for lquota_chkquota will
1712                  * process it. */
1713                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1714                               quota_opc);
1715         }
1716 #endif
1717         RETURN(rc);
1718 }
1719
1720 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1721                       const struct lu_buf *buf, const char *name, int fl,
1722                       struct thandle *handle)
1723 {
1724         int  rc;
1725         ENTRY;
1726
1727         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1728         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1729         mdd_write_unlock(env, obj);
1730
1731         RETURN(rc);
1732 }
1733
1734 static int mdd_xattr_sanity_check(const struct lu_env *env,
1735                                   struct mdd_object *obj)
1736 {
1737         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1738         struct md_ucred *uc     = md_ucred(env);
1739         int rc;
1740         ENTRY;
1741
1742         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1743                 RETURN(-EPERM);
1744
1745         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1746         if (rc)
1747                 RETURN(rc);
1748
1749         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1750             !mdd_capable(uc, CFS_CAP_FOWNER))
1751                 RETURN(-EPERM);
1752
1753         RETURN(rc);
1754 }
1755
1756 static int mdd_declare_xattr_set(const struct lu_env *env,
1757                                  struct mdd_device *mdd,
1758                                  struct mdd_object *obj,
1759                                  const struct lu_buf *buf,
1760                                  const char *name,
1761                                  struct thandle *handle)
1762
1763 {
1764         int rc;
1765
1766         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1767         if (rc)
1768                 return rc;
1769
1770         /* Only record user xattr changes */
1771         if ((strncmp("user.", name, 5) == 0))
1772                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1773
1774         return rc;
1775 }
1776
1777 /**
1778  * The caller should guarantee to update the object ctime
1779  * after xattr_set if needed.
1780  */
1781 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1782                          const struct lu_buf *buf, const char *name,
1783                          int fl)
1784 {
1785         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1786         struct mdd_device *mdd = mdo2mdd(obj);
1787         struct thandle *handle;
1788         int  rc;
1789         ENTRY;
1790
1791         rc = mdd_xattr_sanity_check(env, mdd_obj);
1792         if (rc)
1793                 RETURN(rc);
1794
1795         handle = mdd_trans_create(env, mdd);
1796         if (IS_ERR(handle))
1797                 RETURN(PTR_ERR(handle));
1798
1799         /* security-replated changes may require sync */
1800         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1801             mdd->mdd_sync_permission == 1)
1802                 handle->th_sync = 1;
1803
1804         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1805         if (rc)
1806                 GOTO(stop, rc);
1807
1808         rc = mdd_trans_start(env, mdd, handle);
1809         if (rc)
1810                 GOTO(stop, rc);
1811
1812         /* security-replated changes may require sync */
1813         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1814                 handle->th_sync |= mdd->mdd_sync_permission;
1815
1816         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1817
1818         /* Only record system & user xattr changes */
1819         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1820                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1821                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1822                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1823                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1824                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1825                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1826                                               handle);
1827
1828 stop:
1829         mdd_trans_stop(env, mdd, rc, handle);
1830
1831         RETURN(rc);
1832 }
1833
1834 static int mdd_declare_xattr_del(const struct lu_env *env,
1835                                  struct mdd_device *mdd,
1836                                  struct mdd_object *obj,
1837                                  const char *name,
1838                                  struct thandle *handle)
1839 {
1840         int rc;
1841
1842         rc = mdo_declare_xattr_del(env, obj, name, handle);
1843         if (rc)
1844                 return rc;
1845
1846         /* Only record user xattr changes */
1847         if ((strncmp("user.", name, 5) == 0))
1848                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1849
1850         return rc;
1851 }
1852
1853 /**
1854  * The caller should guarantee to update the object ctime
1855  * after xattr_set if needed.
1856  */
1857 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1858                   const char *name)
1859 {
1860         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1861         struct mdd_device *mdd = mdo2mdd(obj);
1862         struct thandle *handle;
1863         int  rc;
1864         ENTRY;
1865
1866         rc = mdd_xattr_sanity_check(env, mdd_obj);
1867         if (rc)
1868                 RETURN(rc);
1869
1870         handle = mdd_trans_create(env, mdd);
1871         if (IS_ERR(handle))
1872                 RETURN(PTR_ERR(handle));
1873
1874         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1875         if (rc)
1876                 GOTO(stop, rc);
1877
1878         rc = mdd_trans_start(env, mdd, handle);
1879         if (rc)
1880                 GOTO(stop, rc);
1881
1882         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1883         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1884                            mdd_object_capa(env, mdd_obj));
1885         mdd_write_unlock(env, mdd_obj);
1886
1887         /* Only record system & user xattr changes */
1888         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1889                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1890                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1891                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1892                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1893                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1894                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1895                                               handle);
1896
1897 stop:
1898         mdd_trans_stop(env, mdd, rc, handle);
1899
1900         RETURN(rc);
1901 }
1902
1903 /* partial unlink */
1904 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1905                        struct md_attr *ma)
1906 {
1907         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1908         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1909         struct mdd_device *mdd = mdo2mdd(obj);
1910         struct thandle *handle;
1911 #ifdef HAVE_QUOTA_SUPPORT
1912         struct obd_device *obd = mdd->mdd_obd_dev;
1913         struct mds_obd *mds = &obd->u.mds;
1914         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1915         int quota_opc = 0;
1916 #endif
1917         int rc;
1918         ENTRY;
1919
1920         /* XXX: this code won't be used ever:
1921          * DNE uses slightly different approach */
1922         LBUG();
1923
1924         /*
1925          * Check -ENOENT early here because we need to get object type
1926          * to calculate credits before transaction start
1927          */
1928         if (!mdd_object_exists(mdd_obj))
1929                 RETURN(-ENOENT);
1930
1931         LASSERT(mdd_object_exists(mdd_obj) > 0);
1932
1933         handle = mdd_trans_create(env, mdd);
1934         if (IS_ERR(handle))
1935                 RETURN(-ENOMEM);
1936
1937         rc = mdd_trans_start(env, mdd, handle);
1938
1939         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1940
1941         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1942         if (rc)
1943                 GOTO(cleanup, rc);
1944
1945         __mdd_ref_del(env, mdd_obj, handle, 0);
1946
1947         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1948                 /* unlink dot */
1949                 __mdd_ref_del(env, mdd_obj, handle, 1);
1950         }
1951
1952         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1953         la_copy->la_ctime = ma->ma_attr.la_ctime;
1954
1955         la_copy->la_valid = LA_CTIME;
1956         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1957         if (rc)
1958                 GOTO(cleanup, rc);
1959
1960         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1961 #ifdef HAVE_QUOTA_SUPPORT
1962         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1963             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1964                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1965                 mdd_quota_wrapper(&ma->ma_attr, qids);
1966         }
1967 #endif
1968
1969
1970         EXIT;
1971 cleanup:
1972         mdd_write_unlock(env, mdd_obj);
1973         mdd_trans_stop(env, mdd, rc, handle);
1974 #ifdef HAVE_QUOTA_SUPPORT
1975         if (quota_opc)
1976                 /* Trigger dqrel on the owner of child. If failed,
1977                  * the next call for lquota_chkquota will process it */
1978                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1979                               quota_opc);
1980 #endif
1981         return rc;
1982 }
1983
1984 /* partial operation */
1985 static int mdd_oc_sanity_check(const struct lu_env *env,
1986                                struct mdd_object *obj,
1987                                struct md_attr *ma)
1988 {
1989         int rc;
1990         ENTRY;
1991
1992         switch (ma->ma_attr.la_mode & S_IFMT) {
1993         case S_IFREG:
1994         case S_IFDIR:
1995         case S_IFLNK:
1996         case S_IFCHR:
1997         case S_IFBLK:
1998         case S_IFIFO:
1999         case S_IFSOCK:
2000                 rc = 0;
2001                 break;
2002         default:
2003                 rc = -EINVAL;
2004                 break;
2005         }
2006         RETURN(rc);
2007 }
2008
2009 static int mdd_object_create(const struct lu_env *env,
2010                              struct md_object *obj,
2011                              const struct md_op_spec *spec,
2012                              struct md_attr *ma)
2013 {
2014
2015         struct mdd_device *mdd = mdo2mdd(obj);
2016         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2017         const struct lu_fid *pfid = spec->u.sp_pfid;
2018         struct thandle *handle;
2019 #ifdef HAVE_QUOTA_SUPPORT
2020         struct obd_device *obd = mdd->mdd_obd_dev;
2021         struct obd_export *exp = md_quota(env)->mq_exp;
2022         struct mds_obd *mds = &obd->u.mds;
2023         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2024         int quota_opc = 0, block_count = 0;
2025         int inode_pending[MAXQUOTAS] = { 0, 0 };
2026         int block_pending[MAXQUOTAS] = { 0, 0 };
2027 #endif
2028         int rc = 0;
2029         ENTRY;
2030
2031         /* XXX: this code won't be used ever:
2032          * DNE uses slightly different approach */
2033         LBUG();
2034
2035 #ifdef HAVE_QUOTA_SUPPORT
2036         if (mds->mds_quota) {
2037                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2038                 mdd_quota_wrapper(&ma->ma_attr, qids);
2039                 /* get file quota for child */
2040                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2041                                 qids, inode_pending, 1, NULL, 0,
2042                                 NULL, 0);
2043                 switch (ma->ma_attr.la_mode & S_IFMT) {
2044                 case S_IFLNK:
2045                 case S_IFDIR:
2046                         block_count = 2;
2047                         break;
2048                 case S_IFREG:
2049                         block_count = 1;
2050                         break;
2051                 }
2052                 /* get block quota for child */
2053                 if (block_count)
2054                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2055                                         qids, block_pending, block_count,
2056                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2057         }
2058 #endif
2059
2060         handle = mdd_trans_create(env, mdd);
2061         if (IS_ERR(handle))
2062                 GOTO(out_pending, rc = PTR_ERR(handle));
2063
2064         rc = mdd_trans_start(env, mdd, handle);
2065
2066         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2067         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2068         if (rc)
2069                 GOTO(unlock, rc);
2070
2071         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2072         if (rc)
2073                 GOTO(unlock, rc);
2074
2075         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2076                 /* If creating the slave object, set slave EA here. */
2077                 int lmv_size = spec->u.sp_ea.eadatalen;
2078                 struct lmv_stripe_md *lmv;
2079
2080                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2081                 LASSERT(lmv != NULL && lmv_size > 0);
2082
2083                 rc = __mdd_xattr_set(env, mdd_obj,
2084                                      mdd_buf_get_const(env, lmv, lmv_size),
2085                                      XATTR_NAME_LMV, 0, handle);
2086                 if (rc)
2087                         GOTO(unlock, rc);
2088
2089                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2090                                            handle, 0);
2091         } else {
2092 #ifdef CONFIG_FS_POSIX_ACL
2093                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2094                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2095
2096                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2097                         buf->lb_len = spec->u.sp_ea.eadatalen;
2098                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2099                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2100                                                     &ma->ma_attr.la_mode,
2101                                                     handle);
2102                                 if (rc)
2103                                         GOTO(unlock, rc);
2104                                 else
2105                                         ma->ma_attr.la_valid |= LA_MODE;
2106                         }
2107
2108                         pfid = spec->u.sp_ea.fid;
2109                 }
2110 #endif
2111                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2112                                            spec);
2113         }
2114         EXIT;
2115 unlock:
2116         if (rc == 0)
2117                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2118         mdd_write_unlock(env, mdd_obj);
2119
2120         mdd_trans_stop(env, mdd, rc, handle);
2121 out_pending:
2122 #ifdef HAVE_QUOTA_SUPPORT
2123         if (quota_opc) {
2124                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2125                                       inode_pending, 0);
2126                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2127                                       block_pending, 1);
2128                 /* Trigger dqacq on the owner of child. If failed,
2129                  * the next call for lquota_chkquota will process it. */
2130                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2131                               quota_opc);
2132         }
2133 #endif
2134         return rc;
2135 }
2136
2137 /* partial link */
2138 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2139                        const struct md_attr *ma)
2140 {
2141         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2142         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2143         struct mdd_device *mdd = mdo2mdd(obj);
2144         struct thandle *handle;
2145         int rc;
2146         ENTRY;
2147
2148         /* XXX: this code won't be used ever:
2149          * DNE uses slightly different approach */
2150         LBUG();
2151
2152         handle = mdd_trans_create(env, mdd);
2153         if (IS_ERR(handle))
2154                 RETURN(-ENOMEM);
2155
2156         rc = mdd_trans_start(env, mdd, handle);
2157
2158         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2159         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2160         if (rc == 0)
2161                 __mdd_ref_add(env, mdd_obj, handle);
2162         mdd_write_unlock(env, mdd_obj);
2163         if (rc == 0) {
2164                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2165                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2166
2167                 la_copy->la_valid = LA_CTIME;
2168                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2169                                                         handle, 0);
2170         }
2171         mdd_trans_stop(env, mdd, 0, handle);
2172
2173         RETURN(rc);
2174 }
2175
2176 /*
2177  * do NOT or the MAY_*'s, you'll get the weakest
2178  */
2179 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2180 {
2181         int res = 0;
2182
2183         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2184          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2185          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2186          * owner can write to a file even if it is marked readonly to hide
2187          * its brokenness. (bug 5781) */
2188         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2189                 struct md_ucred *uc = md_ucred(env);
2190
2191                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2192                     (la->la_uid == uc->mu_fsuid))
2193                         return 0;
2194         }
2195
2196         if (flags & FMODE_READ)
2197                 res |= MAY_READ;
2198         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2199                 res |= MAY_WRITE;
2200         if (flags & MDS_FMODE_EXEC)
2201                 res = MAY_EXEC;
2202         return res;
2203 }
2204
2205 static int mdd_open_sanity_check(const struct lu_env *env,
2206                                  struct mdd_object *obj, int flag)
2207 {
2208         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2209         int mode, rc;
2210         ENTRY;
2211
2212         /* EEXIST check */
2213         if (mdd_is_dead_obj(obj))
2214                 RETURN(-ENOENT);
2215
2216         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2217         if (rc)
2218                RETURN(rc);
2219
2220         if (S_ISLNK(tmp_la->la_mode))
2221                 RETURN(-ELOOP);
2222
2223         mode = accmode(env, tmp_la, flag);
2224
2225         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2226                 RETURN(-EISDIR);
2227
2228         if (!(flag & MDS_OPEN_CREATED)) {
2229                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2230                 if (rc)
2231                         RETURN(rc);
2232         }
2233
2234         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2235             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2236                 flag &= ~MDS_OPEN_TRUNC;
2237
2238         /* For writing append-only file must open it with append mode. */
2239         if (mdd_is_append(obj)) {
2240                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2241                         RETURN(-EPERM);
2242                 if (flag & MDS_OPEN_TRUNC)
2243                         RETURN(-EPERM);
2244         }
2245
2246 #if 0
2247         /*
2248          * Now, flag -- O_NOATIME does not be packed by client.
2249          */
2250         if (flag & O_NOATIME) {
2251                 struct md_ucred *uc = md_ucred(env);
2252
2253                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2254                     (uc->mu_valid == UCRED_NEW)) &&
2255                     (uc->mu_fsuid != tmp_la->la_uid) &&
2256                     !mdd_capable(uc, CFS_CAP_FOWNER))
2257                         RETURN(-EPERM);
2258         }
2259 #endif
2260
2261         RETURN(0);
2262 }
2263
2264 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2265                     int flags)
2266 {
2267         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2268         int rc = 0;
2269
2270         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2271
2272         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2273         if (rc == 0)
2274                 mdd_obj->mod_count++;
2275
2276         mdd_write_unlock(env, mdd_obj);
2277         return rc;
2278 }
2279
2280 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2281                             struct md_attr *ma, struct thandle *handle)
2282 {
2283         int rc;
2284
2285         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2286         if (rc)
2287                 return rc;
2288
2289         return mdo_declare_destroy(env, obj, handle);
2290 }
2291
2292 /* return md_attr back,
2293  * if it is last unlink then return lov ea + llog cookie*/
2294 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2295                     struct md_attr *ma, struct thandle *handle)
2296 {
2297         int rc = 0;
2298         ENTRY;
2299
2300         if (S_ISREG(mdd_object_type(obj))) {
2301                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2302                  * Caller must be ready for that. */
2303
2304                 rc = __mdd_lmm_get(env, obj, ma);
2305                 if ((ma->ma_valid & MA_LOV))
2306                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2307                                             obj, ma);
2308         }
2309
2310         if (rc == 0)
2311                 rc = mdo_destroy(env, obj, handle);
2312
2313         RETURN(rc);
2314 }
2315
2316 static int mdd_declare_close(const struct lu_env *env,
2317                              struct mdd_object *obj,
2318                              struct md_attr *ma,
2319                              struct thandle *handle)
2320 {
2321         int rc;
2322
2323         rc = orph_declare_index_delete(env, obj, handle);
2324         if (rc)
2325                 return rc;
2326
2327         return mdd_declare_object_kill(env, obj, ma, handle);
2328 }
2329
2330 /*
2331  * No permission check is needed.
2332  */
2333 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2334                      struct md_attr *ma, int mode)
2335 {
2336         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2337         struct mdd_device *mdd = mdo2mdd(obj);
2338         struct thandle    *handle = NULL;
2339         int rc;
2340         int reset = 1;
2341
2342 #ifdef HAVE_QUOTA_SUPPORT
2343         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2344         struct mds_obd *mds = &obd->u.mds;
2345         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2346         int quota_opc = 0;
2347 #endif
2348         ENTRY;
2349
2350         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2351                 mdd_obj->mod_count--;
2352
2353                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2354                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2355                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2356                 RETURN(0);
2357         }
2358
2359         /* check without any lock */
2360         if (mdd_obj->mod_count == 1 &&
2361             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2362  again:
2363                 handle = mdd_trans_create(env, mdo2mdd(obj));
2364                 if (IS_ERR(handle))
2365                         RETURN(PTR_ERR(handle));
2366
2367                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2368                 if (rc)
2369                         GOTO(stop, rc);
2370
2371                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2372                 if (rc)
2373                         GOTO(stop, rc);
2374
2375                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2376                 if (rc)
2377                         GOTO(stop, rc);
2378         }
2379
2380         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2381         if (handle == NULL && mdd_obj->mod_count == 1 &&
2382             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2383                 mdd_write_unlock(env, mdd_obj);
2384                 goto again;
2385         }
2386
2387         /* release open count */
2388         mdd_obj->mod_count --;
2389
2390         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2391                 /* remove link to object from orphan index */
2392                 LASSERT(handle != NULL);
2393                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2394                 if (rc == 0) {
2395                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2396                                "list, OSS objects to be destroyed.\n",
2397                                PFID(mdd_object_fid(mdd_obj)));
2398                 } else {
2399                         CERROR("Object "DFID" can not be deleted from orphan "
2400                                 "list, maybe cause OST objects can not be "
2401                                 "destroyed (err: %d).\n",
2402                                 PFID(mdd_object_fid(mdd_obj)), rc);
2403                         /* If object was not deleted from orphan list, do not
2404                          * destroy OSS objects, which will be done when next
2405                          * recovery. */
2406                         GOTO(out, rc);
2407                 }
2408         }
2409
2410         rc = mdd_iattr_get(env, mdd_obj, ma);
2411         /* Object maybe not in orphan list originally, it is rare case for
2412          * mdd_finish_unlink() failure. */
2413         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2414 #ifdef HAVE_QUOTA_SUPPORT
2415                 if (mds->mds_quota) {
2416                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2417                         mdd_quota_wrapper(&ma->ma_attr, qids);
2418                 }
2419 #endif
2420                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2421                 if (ma->ma_valid & MA_FLAGS &&
2422                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2423                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2424                 } else {
2425                         if (handle == NULL) {
2426                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2427                                 if (IS_ERR(handle))
2428                                         GOTO(out, rc = PTR_ERR(handle));
2429
2430                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2431                                                              handle);
2432                                 if (rc)
2433                                         GOTO(out, rc);
2434
2435                                 rc = mdd_declare_changelog_store(env, mdd,
2436                                                                  NULL, handle);
2437                                 if (rc)
2438                                         GOTO(stop, rc);
2439
2440                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2441                                 if (rc)
2442                                         GOTO(out, rc);
2443                         }
2444
2445                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2446                         if (rc == 0)
2447                                 reset = 0;
2448                 }
2449
2450                 if (rc != 0)
2451                         CERROR("Error when prepare to delete Object "DFID" , "
2452                                "which will cause OST objects can not be "
2453                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2454         }
2455         EXIT;
2456
2457 out:
2458         if (reset)
2459                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2460
2461         mdd_write_unlock(env, mdd_obj);
2462
2463         if (rc == 0 &&
2464             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2465             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2466                 if (handle == NULL) {
2467                         handle = mdd_trans_create(env, mdo2mdd(obj));
2468                         if (IS_ERR(handle))
2469                                 GOTO(stop, rc = IS_ERR(handle));
2470
2471                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2472                                                          handle);
2473                         if (rc)
2474                                 GOTO(stop, rc);
2475
2476                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2477                         if (rc)
2478                                 GOTO(stop, rc);
2479                 }
2480
2481                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2482                                          mdd_obj, handle);
2483         }
2484
2485 stop:
2486         if (handle != NULL)
2487                 mdd_trans_stop(env, mdd, rc, handle);
2488 #ifdef HAVE_QUOTA_SUPPORT
2489         if (quota_opc)
2490                 /* Trigger dqrel on the owner of child. If failed,
2491                  * the next call for lquota_chkquota will process it */
2492                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2493                               quota_opc);
2494 #endif
2495         return rc;
2496 }
2497
2498 /*
2499  * Permission check is done when open,
2500  * no need check again.
2501  */
2502 static int mdd_readpage_sanity_check(const struct lu_env *env,
2503                                      struct mdd_object *obj)
2504 {
2505         struct dt_object *next = mdd_object_child(obj);
2506         int rc;
2507         ENTRY;
2508
2509         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2510                 rc = 0;
2511         else
2512                 rc = -ENOTDIR;
2513
2514         RETURN(rc);
2515 }
2516
2517 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2518                               struct lu_dirpage *dp, int nob,
2519                               const struct dt_it_ops *iops, struct dt_it *it,
2520                               __u32 attr)
2521 {
2522         void                   *area = dp;
2523         int                     result;
2524         __u64                   hash = 0;
2525         struct lu_dirent       *ent;
2526         struct lu_dirent       *last = NULL;
2527         int                     first = 1;
2528
2529         memset(area, 0, sizeof (*dp));
2530         area += sizeof (*dp);
2531         nob  -= sizeof (*dp);
2532
2533         ent  = area;
2534         do {
2535                 int    len;
2536                 int    recsize;
2537
2538                 len = iops->key_size(env, it);
2539
2540                 /* IAM iterator can return record with zero len. */
2541                 if (len == 0)
2542                         goto next;
2543
2544                 hash = iops->store(env, it);
2545                 if (unlikely(first)) {
2546                         first = 0;
2547                         dp->ldp_hash_start = cpu_to_le64(hash);
2548                 }
2549
2550                 /* calculate max space required for lu_dirent */
2551                 recsize = lu_dirent_calc_size(len, attr);
2552
2553                 if (nob >= recsize) {
2554                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2555                         if (result == -ESTALE)
2556                                 goto next;
2557                         if (result != 0)
2558                                 goto out;
2559
2560                         /* osd might not able to pack all attributes,
2561                          * so recheck rec length */
2562                         recsize = le16_to_cpu(ent->lde_reclen);
2563                 } else {
2564                         result = (last != NULL) ? 0 :-EINVAL;
2565                         goto out;
2566                 }
2567                 last = ent;
2568                 ent = (void *)ent + recsize;
2569                 nob -= recsize;
2570
2571 next:
2572                 result = iops->next(env, it);
2573                 if (result == -ESTALE)
2574                         goto next;
2575         } while (result == 0);
2576
2577 out:
2578         dp->ldp_hash_end = cpu_to_le64(hash);
2579         if (last != NULL) {
2580                 if (last->lde_hash == dp->ldp_hash_end)
2581                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2582                 last->lde_reclen = 0; /* end mark */
2583         }
2584         return result;
2585 }
2586
2587 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2588                           const struct lu_rdpg *rdpg)
2589 {
2590         struct dt_it      *it;
2591         struct dt_object  *next = mdd_object_child(obj);
2592         const struct dt_it_ops  *iops;
2593         struct page       *pg;
2594         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2595         int i;
2596         int nlupgs = 0;
2597         int rc;
2598         int nob;
2599
2600         LASSERT(rdpg->rp_pages != NULL);
2601         LASSERT(next->do_index_ops != NULL);
2602
2603         if (rdpg->rp_count <= 0)
2604                 return -EFAULT;
2605
2606         /*
2607          * iterate through directory and fill pages from @rdpg
2608          */
2609         iops = &next->do_index_ops->dio_it;
2610         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2611         if (IS_ERR(it))
2612                 return PTR_ERR(it);
2613
2614         rc = iops->load(env, it, rdpg->rp_hash);
2615
2616         if (rc == 0) {
2617                 /*
2618                  * Iterator didn't find record with exactly the key requested.
2619                  *
2620                  * It is currently either
2621                  *
2622                  *     - positioned above record with key less than
2623                  *     requested---skip it.
2624                  *
2625                  *     - or not positioned at all (is in IAM_IT_SKEWED
2626                  *     state)---position it on the next item.
2627                  */
2628                 rc = iops->next(env, it);
2629         } else if (rc > 0)
2630                 rc = 0;
2631
2632         /*
2633          * At this point and across for-loop:
2634          *
2635          *  rc == 0 -> ok, proceed.
2636          *  rc >  0 -> end of directory.
2637          *  rc <  0 -> error.
2638          */
2639         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2640              i++, nob -= CFS_PAGE_SIZE) {
2641                 struct lu_dirpage *dp;
2642
2643                 LASSERT(i < rdpg->rp_npages);
2644                 pg = rdpg->rp_pages[i];
2645                 dp = cfs_kmap(pg);
2646 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2647 repeat:
2648 #endif
2649                 rc = mdd_dir_page_build(env, mdd, dp,
2650                                         min_t(int, nob, LU_PAGE_SIZE),
2651                                         iops, it, rdpg->rp_attrs);
2652                 if (rc > 0) {
2653                         /*
2654                          * end of directory.
2655                          */
2656                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2657                         nlupgs++;
2658                 } else if (rc < 0) {
2659                         CWARN("build page failed: %d!\n", rc);
2660                 } else {
2661                         nlupgs++;
2662 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2663                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2664                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2665                                 goto repeat;
2666 #endif
2667                 }
2668                 cfs_kunmap(pg);
2669         }
2670         if (rc >= 0) {
2671                 struct lu_dirpage *dp;
2672
2673                 dp = cfs_kmap(rdpg->rp_pages[0]);
2674                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2675                 if (nlupgs == 0) {
2676                         /*
2677                          * No pages were processed, mark this for first page
2678                          * and send back.
2679                          */
2680                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2681                         nlupgs = 1;
2682                 }
2683                 cfs_kunmap(rdpg->rp_pages[0]);
2684
2685                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2686         }
2687         iops->put(env, it);
2688         iops->fini(env, it);
2689
2690         return rc;
2691 }
2692
2693 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2694                  const struct lu_rdpg *rdpg)
2695 {
2696         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2697         int rc;
2698         ENTRY;
2699
2700         LASSERT(mdd_object_exists(mdd_obj));
2701
2702         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2703         rc = mdd_readpage_sanity_check(env, mdd_obj);
2704         if (rc)
2705                 GOTO(out_unlock, rc);
2706
2707         if (mdd_is_dead_obj(mdd_obj)) {
2708                 struct page *pg;
2709                 struct lu_dirpage *dp;
2710
2711                 /*
2712                  * According to POSIX, please do not return any entry to client:
2713                  * even dot and dotdot should not be returned.
2714                  */
2715                 CWARN("readdir from dead object: "DFID"\n",
2716                         PFID(mdd_object_fid(mdd_obj)));
2717
2718                 if (rdpg->rp_count <= 0)
2719                         GOTO(out_unlock, rc = -EFAULT);
2720                 LASSERT(rdpg->rp_pages != NULL);
2721
2722                 pg = rdpg->rp_pages[0];
2723                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2724                 memset(dp, 0 , sizeof(struct lu_dirpage));
2725                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2726                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2727                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2728                 cfs_kunmap(pg);
2729                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2730         }
2731
2732         rc = __mdd_readpage(env, mdd_obj, rdpg);
2733
2734         EXIT;
2735 out_unlock:
2736         mdd_read_unlock(env, mdd_obj);
2737         return rc;
2738 }
2739
2740 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2741 {
2742         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2743         struct dt_object *next;
2744
2745         LASSERT(mdd_object_exists(mdd_obj));
2746         next = mdd_object_child(mdd_obj);
2747         return next->do_ops->do_object_sync(env, next);
2748 }
2749
2750 const struct md_object_operations mdd_obj_ops = {
2751         .moo_permission    = mdd_permission,
2752         .moo_attr_get      = mdd_attr_get,
2753         .moo_attr_set      = mdd_attr_set,
2754         .moo_xattr_get     = mdd_xattr_get,
2755         .moo_xattr_set     = mdd_xattr_set,
2756         .moo_xattr_list    = mdd_xattr_list,
2757         .moo_xattr_del     = mdd_xattr_del,
2758         .moo_object_create = mdd_object_create,
2759         .moo_ref_add       = mdd_ref_add,
2760         .moo_ref_del       = mdd_ref_del,
2761         .moo_open          = mdd_open,
2762         .moo_close         = mdd_close,
2763         .moo_readpage      = mdd_readpage,
2764         .moo_readlink      = mdd_readlink,
2765         .moo_changelog     = mdd_changelog,
2766         .moo_capa_get      = mdd_capa_get,
2767         .moo_object_sync   = mdd_object_sync,
2768         .moo_path          = mdd_path,
2769         .moo_file_lock     = mdd_file_lock,
2770         .moo_file_unlock   = mdd_file_unlock,
2771 };