Whamcloud - gitweb
LU-1518 mdd: Fixup mdd_{obf,dot_lustre}_obj_ops.
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61
62 static int mdd_xattr_get(const struct lu_env *env,
63                          struct md_object *obj, struct lu_buf *buf,
64                          const char *name);
65
66 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
67                  void **data)
68 {
69         if (mdd_object_exists(obj) == 0) {
70                 CERROR("%s: object "DFID" not found: rc = -2\n",
71                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
72                 return -ENOENT;
73         }
74         mdo_data_get(env, obj, data);
75         return 0;
76 }
77
78 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
79                struct lu_attr *la, struct lustre_capa *capa)
80 {
81         if (mdd_object_exists(obj) == 0) {
82                 CERROR("%s: object "DFID" not found: rc = -2\n",
83                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
84                 return -ENOENT;
85         }
86         return mdo_attr_get(env, obj, la, capa);
87 }
88
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
90 {
91         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
92
93         if (flags & LUSTRE_APPEND_FL)
94                 obj->mod_flags |= APPEND_OBJ;
95
96         if (flags & LUSTRE_IMMUTABLE_FL)
97                 obj->mod_flags |= IMMUTE_OBJ;
98 }
99
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
101 {
102         struct mdd_thread_info *info;
103
104         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105         LASSERT(info != NULL);
106         return info;
107 }
108
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
110 {
111         struct lu_buf *buf;
112
113         buf = &mdd_env_info(env)->mti_buf;
114         buf->lb_buf = area;
115         buf->lb_len = len;
116         return buf;
117 }
118
119 void mdd_buf_put(struct lu_buf *buf)
120 {
121         if (buf == NULL || buf->lb_buf == NULL)
122                 return;
123         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
124         buf->lb_buf = NULL;
125         buf->lb_len = 0;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 buf->lb_buf = NULL;
146         }
147         if (buf->lb_buf == NULL) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         buf->lb_len = 0;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183                                        struct mdd_device *mdd)
184 {
185         struct mdd_thread_info *mti = mdd_env_info(env);
186         int                     max_cookie_size;
187
188         max_cookie_size = mdd_lov_cookiesize(env, mdd);
189         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190                 if (mti->mti_max_cookie)
191                         OBD_FREE_LARGE(mti->mti_max_cookie,
192                                        mti->mti_max_cookie_size);
193                 mti->mti_max_cookie = NULL;
194                 mti->mti_max_cookie_size = 0;
195         }
196         if (unlikely(mti->mti_max_cookie == NULL)) {
197                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198                 if (likely(mti->mti_max_cookie != NULL))
199                         mti->mti_max_cookie_size = max_cookie_size;
200         }
201         if (likely(mti->mti_max_cookie != NULL))
202                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203         return mti->mti_max_cookie;
204 }
205
206 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
207 {
208         struct mdd_thread_info *mti = mdd_env_info(env);
209
210         if (unlikely(mti->mti_max_lmm_size < size)) {
211                 int rsize = size_roundup_power2(size);
212
213                 if (mti->mti_max_lmm_size > 0) {
214                         LASSERT(mti->mti_max_lmm);
215                         OBD_FREE_LARGE(mti->mti_max_lmm,
216                                        mti->mti_max_lmm_size);
217                         mti->mti_max_lmm = NULL;
218                         mti->mti_max_lmm_size = 0;
219                 }
220
221                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
222                 if (likely(mti->mti_max_lmm != NULL))
223                         mti->mti_max_lmm_size = rsize;
224         }
225         return mti->mti_max_lmm;
226 }
227
228 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
229                                    struct mdd_device *mdd)
230 {
231         int max_lmm_size;
232
233         max_lmm_size = mdd_lov_mdsize(env, mdd);
234         return mdd_max_lmm_buffer(env, max_lmm_size);
235 }
236
237 struct lu_object *mdd_object_alloc(const struct lu_env *env,
238                                    const struct lu_object_header *hdr,
239                                    struct lu_device *d)
240 {
241         struct mdd_object *mdd_obj;
242
243         OBD_ALLOC_PTR(mdd_obj);
244         if (mdd_obj != NULL) {
245                 struct lu_object *o;
246
247                 o = mdd2lu_obj(mdd_obj);
248                 lu_object_init(o, NULL, d);
249                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
250                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
251                 mdd_obj->mod_count = 0;
252                 o->lo_ops = &mdd_lu_obj_ops;
253                 return o;
254         } else {
255                 return NULL;
256         }
257 }
258
259 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
260                            const struct lu_object_conf *unused)
261 {
262         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
263         struct mdd_object *mdd_obj = lu2mdd_obj(o);
264         struct lu_object  *below;
265         struct lu_device  *under;
266         ENTRY;
267
268         mdd_obj->mod_cltime = 0;
269         under = &d->mdd_child->dd_lu_dev;
270         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
271         mdd_pdlock_init(mdd_obj);
272         if (below == NULL)
273                 RETURN(-ENOMEM);
274
275         lu_object_add(o, below);
276
277         RETURN(0);
278 }
279
280 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
281 {
282         if (lu_object_exists(o))
283                 return mdd_get_flags(env, lu2mdd_obj(o));
284         else
285                 return 0;
286 }
287
288 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
289 {
290         struct mdd_object *mdd = lu2mdd_obj(o);
291
292         lu_object_fini(o);
293         OBD_FREE_PTR(mdd);
294 }
295
296 static int mdd_object_print(const struct lu_env *env, void *cookie,
297                             lu_printer_t p, const struct lu_object *o)
298 {
299         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
300         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
301                     "valid=%x, cltime="LPU64", flags=%lx)",
302                     mdd, mdd->mod_count, mdd->mod_valid,
303                     mdd->mod_cltime, mdd->mod_flags);
304 }
305
306 static const struct lu_object_operations mdd_lu_obj_ops = {
307         .loo_object_init    = mdd_object_init,
308         .loo_object_start   = mdd_object_start,
309         .loo_object_free    = mdd_object_free,
310         .loo_object_print   = mdd_object_print,
311 };
312
313 struct mdd_object *mdd_object_find(const struct lu_env *env,
314                                    struct mdd_device *d,
315                                    const struct lu_fid *f)
316 {
317         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
318 }
319
320 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
321                         const char *path, struct lu_fid *fid)
322 {
323         struct lu_buf *buf;
324         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
325         struct mdd_object *obj;
326         struct lu_name *lname = &mdd_env_info(env)->mti_name;
327         char *name;
328         int rc = 0;
329         ENTRY;
330
331         /* temp buffer for path element */
332         buf = mdd_buf_alloc(env, PATH_MAX);
333         if (buf->lb_buf == NULL)
334                 RETURN(-ENOMEM);
335
336         lname->ln_name = name = buf->lb_buf;
337         lname->ln_namelen = 0;
338         *f = mdd->mdd_root_fid;
339
340         while(1) {
341                 while (*path == '/')
342                         path++;
343                 if (*path == '\0')
344                         break;
345                 while (*path != '/' && *path != '\0') {
346                         *name = *path;
347                         path++;
348                         name++;
349                         lname->ln_namelen++;
350                 }
351
352                 *name = '\0';
353                 /* find obj corresponding to fid */
354                 obj = mdd_object_find(env, mdd, f);
355                 if (obj == NULL)
356                         GOTO(out, rc = -EREMOTE);
357                 if (IS_ERR(obj))
358                         GOTO(out, rc = PTR_ERR(obj));
359                 /* get child fid from parent and name */
360                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
361                 mdd_object_put(env, obj);
362                 if (rc)
363                         break;
364
365                 name = buf->lb_buf;
366                 lname->ln_namelen = 0;
367         }
368
369         if (!rc)
370                 *fid = *f;
371 out:
372         RETURN(rc);
373 }
374
375 /** The maximum depth that fid2path() will search.
376  * This is limited only because we want to store the fids for
377  * historical path lookup purposes.
378  */
379 #define MAX_PATH_DEPTH 100
380
381 /** mdd_path() lookup structure. */
382 struct path_lookup_info {
383         __u64                pli_recno;        /**< history point */
384         __u64                pli_currec;       /**< current record */
385         struct lu_fid        pli_fid;
386         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
387         struct mdd_object   *pli_mdd_obj;
388         char                *pli_path;         /**< full path */
389         int                  pli_pathlen;
390         int                  pli_linkno;       /**< which hardlink to follow */
391         int                  pli_fidcount;     /**< number of \a pli_fids */
392 };
393
394 static int mdd_path_current(const struct lu_env *env,
395                             struct path_lookup_info *pli)
396 {
397         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
398         struct mdd_object *mdd_obj;
399         struct lu_buf     *buf = NULL;
400         struct link_ea_header *leh;
401         struct link_ea_entry  *lee;
402         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
403         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
404         char *ptr;
405         int reclen;
406         int rc;
407         ENTRY;
408
409         ptr = pli->pli_path + pli->pli_pathlen - 1;
410         *ptr = 0;
411         --ptr;
412         pli->pli_fidcount = 0;
413         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
414
415         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
416                 mdd_obj = mdd_object_find(env, mdd,
417                                           &pli->pli_fids[pli->pli_fidcount]);
418                 if (mdd_obj == NULL)
419                         GOTO(out, rc = -EREMOTE);
420                 if (IS_ERR(mdd_obj))
421                         GOTO(out, rc = PTR_ERR(mdd_obj));
422                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
423                 if (rc <= 0) {
424                         mdd_object_put(env, mdd_obj);
425                         if (rc == -1)
426                                 rc = -EREMOTE;
427                         else if (rc == 0)
428                                 /* Do I need to error out here? */
429                                 rc = -ENOENT;
430                         GOTO(out, rc);
431                 }
432
433                 /* Get parent fid and object name */
434                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
435                 buf = mdd_links_get(env, mdd_obj);
436                 mdd_read_unlock(env, mdd_obj);
437                 mdd_object_put(env, mdd_obj);
438                 if (IS_ERR(buf))
439                         GOTO(out, rc = PTR_ERR(buf));
440
441                 leh = buf->lb_buf;
442                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
443                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444
445                 /* If set, use link #linkno for path lookup, otherwise use
446                    link #0.  Only do this for the final path element. */
447                 if ((pli->pli_fidcount == 0) &&
448                     (pli->pli_linkno < leh->leh_reccount)) {
449                         int count;
450                         for (count = 0; count < pli->pli_linkno; count++) {
451                                 lee = (struct link_ea_entry *)
452                                      ((char *)lee + reclen);
453                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
454                         }
455                         if (pli->pli_linkno < leh->leh_reccount - 1)
456                                 /* indicate to user there are more links */
457                                 pli->pli_linkno++;
458                 }
459
460                 /* Pack the name in the end of the buffer */
461                 ptr -= tmpname->ln_namelen;
462                 if (ptr - 1 <= pli->pli_path)
463                         GOTO(out, rc = -EOVERFLOW);
464                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
465                 *(--ptr) = '/';
466
467                 /* Store the parent fid for historic lookup */
468                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
469                         GOTO(out, rc = -EOVERFLOW);
470                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
471         }
472
473         /* Verify that our path hasn't changed since we started the lookup.
474            Record the current index, and verify the path resolves to the
475            same fid. If it does, then the path is correct as of this index. */
476         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
477         pli->pli_currec = mdd->mdd_cl.mc_index;
478         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
479         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
480         if (rc) {
481                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
482                 GOTO (out, rc = -EAGAIN);
483         }
484         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
485                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
486                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
487                        PFID(&pli->pli_fid));
488                 GOTO(out, rc = -EAGAIN);
489         }
490         ptr++; /* skip leading / */
491         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
492
493         EXIT;
494 out:
495         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
496                 /* if we vmalloced a large buffer drop it */
497                 mdd_buf_put(buf);
498
499         return rc;
500 }
501
502 static int mdd_path_historic(const struct lu_env *env,
503                              struct path_lookup_info *pli)
504 {
505         return 0;
506 }
507
508 /* Returns the full path to this fid, as of changelog record recno. */
509 static int mdd_path(const struct lu_env *env, struct md_object *obj,
510                     char *path, int pathlen, __u64 *recno, int *linkno)
511 {
512         struct path_lookup_info *pli;
513         int tries = 3;
514         int rc = -EAGAIN;
515         ENTRY;
516
517         if (pathlen < 3)
518                 RETURN(-EOVERFLOW);
519
520         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
521                 path[0] = '\0';
522                 RETURN(0);
523         }
524
525         OBD_ALLOC_PTR(pli);
526         if (pli == NULL)
527                 RETURN(-ENOMEM);
528
529         pli->pli_mdd_obj = md2mdd_obj(obj);
530         pli->pli_recno = *recno;
531         pli->pli_path = path;
532         pli->pli_pathlen = pathlen;
533         pli->pli_linkno = *linkno;
534
535         /* Retry multiple times in case file is being moved */
536         while (tries-- && rc == -EAGAIN)
537                 rc = mdd_path_current(env, pli);
538
539         /* For historical path lookup, the current links may not have existed
540          * at "recno" time.  We must switch over to earlier links/parents
541          * by using the changelog records.  If the earlier parent doesn't
542          * exist, we must search back through the changelog to reconstruct
543          * its parents, then check if it exists, etc.
544          * We may ignore this problem for the initial implementation and
545          * state that an "original" hardlink must still exist for us to find
546          * historic path name. */
547         if (pli->pli_recno != -1) {
548                 rc = mdd_path_historic(env, pli);
549         } else {
550                 *recno = pli->pli_currec;
551                 /* Return next link index to caller */
552                 *linkno = pli->pli_linkno;
553         }
554
555         OBD_FREE_PTR(pli);
556
557         RETURN (rc);
558 }
559
560 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
561 {
562         struct lu_attr *la = &mdd_env_info(env)->mti_la;
563         int rc;
564
565         ENTRY;
566         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
567         if (rc == 0) {
568                 mdd_flags_xlate(obj, la->la_flags);
569         }
570         RETURN(rc);
571 }
572
573 /* get only inode attributes */
574 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
575                   struct md_attr *ma)
576 {
577         int rc = 0;
578         ENTRY;
579
580         if (ma->ma_valid & MA_INODE)
581                 RETURN(0);
582
583         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
584                           mdd_object_capa(env, mdd_obj));
585         if (rc == 0)
586                 ma->ma_valid |= MA_INODE;
587         RETURN(rc);
588 }
589
590 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
591 {
592         struct lov_desc *ldesc;
593         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
594         struct lov_user_md *lum = (struct lov_user_md*)lmm;
595         ENTRY;
596
597         if (!lum)
598                 RETURN(0);
599
600         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
601         LASSERT(ldesc != NULL);
602
603         lum->lmm_magic = LOV_MAGIC_V1;
604         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
605         lum->lmm_pattern = ldesc->ld_pattern;
606         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
607         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
608         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
609
610         RETURN(sizeof(*lum));
611 }
612
613 static int is_rootdir(struct mdd_object *mdd_obj)
614 {
615         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
616         const struct lu_fid *fid = mdo2fid(mdd_obj);
617
618         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
619 }
620
621 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
622                     struct md_attr *ma)
623 {
624         struct mdd_thread_info *info = mdd_env_info(env);
625         int                     size;
626         int                     rc   = -EINVAL;
627         ENTRY;
628
629         LASSERT(info != NULL);
630         LASSERT(ma->ma_big_lmm_used == 0);
631
632         if (ma->ma_lmm_size == 0) {
633                 CERROR("No buffer to hold %s xattr of object "DFID"\n",
634                        XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
635                 RETURN(rc);
636         }
637
638         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
639                            mdd_object_capa(env, obj));
640         if (rc < 0)
641                 RETURN(rc);
642
643         /* big_lmm may need to grow */
644         size = rc;
645         mdd_max_lmm_buffer(env, size);
646         if (info->mti_max_lmm == NULL)
647                 RETURN(-ENOMEM);
648
649         LASSERT(info->mti_max_lmm_size >= size);
650         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
651                         XATTR_NAME_LOV);
652         if (rc < 0)
653                 RETURN(rc);
654
655         ma->ma_big_lmm_used = 1;
656         ma->ma_valid |= MA_LOV;
657         ma->ma_lmm = info->mti_max_lmm;
658         ma->ma_lmm_size = size;
659         LASSERT(size == rc);
660         RETURN(rc);
661 }
662
663 /* get lov EA only */
664 static int __mdd_lmm_get(const struct lu_env *env,
665                          struct mdd_object *mdd_obj, struct md_attr *ma)
666 {
667         int rc;
668         ENTRY;
669
670         if (ma->ma_valid & MA_LOV)
671                 RETURN(0);
672
673         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
674                         XATTR_NAME_LOV);
675         if (rc == -ERANGE)
676                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
677         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
678                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
679
680         if (rc > 0) {
681                 ma->ma_lmm_size = rc;
682                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
683                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
684                 rc = 0;
685         }
686         RETURN(rc);
687 }
688
689 /* get the first parent fid from link EA */
690 static int mdd_pfid_get(const struct lu_env *env,
691                         struct mdd_object *mdd_obj, struct md_attr *ma)
692 {
693         struct lu_buf *buf;
694         struct link_ea_header *leh;
695         struct link_ea_entry *lee;
696         struct lu_fid *pfid = &ma->ma_pfid;
697         ENTRY;
698
699         if (ma->ma_valid & MA_PFID)
700                 RETURN(0);
701
702         buf = mdd_links_get(env, mdd_obj);
703         if (IS_ERR(buf))
704                 RETURN(PTR_ERR(buf));
705
706         leh = buf->lb_buf;
707         lee = (struct link_ea_entry *)(leh + 1);
708         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
709         fid_be_to_cpu(pfid, pfid);
710         ma->ma_valid |= MA_PFID;
711         if (buf->lb_len > OBD_ALLOC_BIG)
712                 /* if we vmalloced a large buffer drop it */
713                 mdd_buf_put(buf);
714         RETURN(0);
715 }
716
717 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
718                        struct md_attr *ma)
719 {
720         int rc;
721         ENTRY;
722
723         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
724         rc = __mdd_lmm_get(env, mdd_obj, ma);
725         mdd_read_unlock(env, mdd_obj);
726         RETURN(rc);
727 }
728
729 /* get lmv EA only*/
730 static int __mdd_lmv_get(const struct lu_env *env,
731                          struct mdd_object *mdd_obj, struct md_attr *ma)
732 {
733         int rc;
734         ENTRY;
735
736         if (ma->ma_valid & MA_LMV)
737                 RETURN(0);
738
739         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
740                         XATTR_NAME_LMV);
741         if (rc > 0) {
742                 ma->ma_valid |= MA_LMV;
743                 rc = 0;
744         }
745         RETURN(rc);
746 }
747
748 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
749                          struct md_attr *ma)
750 {
751         struct mdd_thread_info *info = mdd_env_info(env);
752         struct lustre_mdt_attrs *lma =
753                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
754         int lma_size;
755         int rc;
756         ENTRY;
757
758         /* If all needed data are already valid, nothing to do */
759         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
760             (ma->ma_need & (MA_HSM | MA_SOM)))
761                 RETURN(0);
762
763         /* Read LMA from disk EA */
764         lma_size = sizeof(info->mti_xattr_buf);
765         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
766         if (rc <= 0)
767                 RETURN(rc);
768
769         /* Useless to check LMA incompatibility because this is already done in
770          * osd_ea_fid_get(), and this will fail long before this code is
771          * called.
772          * So, if we are here, LMA is compatible.
773          */
774
775         lustre_lma_swab(lma);
776
777         /* Swab and copy LMA */
778         if (ma->ma_need & MA_HSM) {
779                 if (lma->lma_compat & LMAC_HSM)
780                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
781                 else
782                         ma->ma_hsm.mh_flags = 0;
783                 ma->ma_valid |= MA_HSM;
784         }
785
786         /* Copy SOM */
787         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
788                 LASSERT(ma->ma_som != NULL);
789                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
790                 ma->ma_som->msd_size    = lma->lma_som_size;
791                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
792                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
793                 ma->ma_valid |= MA_SOM;
794         }
795
796         RETURN(0);
797 }
798
799 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
800                                  struct md_attr *ma)
801 {
802         int rc = 0;
803         ENTRY;
804
805         if (ma->ma_need & MA_INODE)
806                 rc = mdd_iattr_get(env, mdd_obj, ma);
807
808         if (rc == 0 && ma->ma_need & MA_LOV) {
809                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
810                     S_ISDIR(mdd_object_type(mdd_obj)))
811                         rc = __mdd_lmm_get(env, mdd_obj, ma);
812         }
813         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
814                 if (S_ISREG(mdd_object_type(mdd_obj)))
815                         rc = mdd_pfid_get(env, mdd_obj, ma);
816         }
817         if (rc == 0 && ma->ma_need & MA_LMV) {
818                 if (S_ISDIR(mdd_object_type(mdd_obj)))
819                         rc = __mdd_lmv_get(env, mdd_obj, ma);
820         }
821         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
822                 if (S_ISREG(mdd_object_type(mdd_obj)))
823                         rc = __mdd_lma_get(env, mdd_obj, ma);
824         }
825 #ifdef CONFIG_FS_POSIX_ACL
826         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
827                 if (S_ISDIR(mdd_object_type(mdd_obj)))
828                         rc = mdd_def_acl_get(env, mdd_obj, ma);
829         }
830 #endif
831         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
832                rc, ma->ma_valid, ma->ma_lmm);
833         RETURN(rc);
834 }
835
836 int mdd_attr_get_internal_locked(const struct lu_env *env,
837                                  struct mdd_object *mdd_obj, struct md_attr *ma)
838 {
839         int rc;
840         int needlock = ma->ma_need &
841                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
842
843         if (needlock)
844                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
845         rc = mdd_attr_get_internal(env, mdd_obj, ma);
846         if (needlock)
847                 mdd_read_unlock(env, mdd_obj);
848         return rc;
849 }
850
851 /*
852  * No permission check is needed.
853  */
854 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
855                  struct md_attr *ma)
856 {
857         struct mdd_object *mdd_obj = md2mdd_obj(obj);
858         int                rc;
859
860         ENTRY;
861         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
862         RETURN(rc);
863 }
864
865 /*
866  * No permission check is needed.
867  */
868 static int mdd_xattr_get(const struct lu_env *env,
869                          struct md_object *obj, struct lu_buf *buf,
870                          const char *name)
871 {
872         struct mdd_object *mdd_obj = md2mdd_obj(obj);
873         int rc;
874
875         ENTRY;
876
877         if (mdd_object_exists(mdd_obj) == 0) {
878                 CERROR("%s: object "DFID" not found: rc = -2\n",
879                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
880                 return -ENOENT;
881         }
882
883         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
884         rc = mdo_xattr_get(env, mdd_obj, buf, name,
885                            mdd_object_capa(env, mdd_obj));
886         mdd_read_unlock(env, mdd_obj);
887
888         RETURN(rc);
889 }
890
891 /*
892  * Permission check is done when open,
893  * no need check again.
894  */
895 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
896                         struct lu_buf *buf)
897 {
898         struct mdd_object *mdd_obj = md2mdd_obj(obj);
899         struct dt_object  *next;
900         loff_t             pos = 0;
901         int                rc;
902         ENTRY;
903
904         if (mdd_object_exists(mdd_obj) == 0) {
905                 CERROR("%s: object "DFID" not found: rc = -2\n",
906                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
907                 return -ENOENT;
908         }
909
910         next = mdd_object_child(mdd_obj);
911         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
912         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
913                                          mdd_object_capa(env, mdd_obj));
914         mdd_read_unlock(env, mdd_obj);
915         RETURN(rc);
916 }
917
918 /*
919  * No permission check is needed.
920  */
921 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
922                           struct lu_buf *buf)
923 {
924         struct mdd_object *mdd_obj = md2mdd_obj(obj);
925         int rc;
926
927         ENTRY;
928
929         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
930         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
931         mdd_read_unlock(env, mdd_obj);
932
933         RETURN(rc);
934 }
935
936 int mdd_declare_object_create_internal(const struct lu_env *env,
937                                        struct mdd_object *p,
938                                        struct mdd_object *c,
939                                        struct md_attr *ma,
940                                        struct thandle *handle,
941                                        const struct md_op_spec *spec)
942 {
943         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
944         const struct dt_index_features *feat = spec->sp_feat;
945         int rc;
946         ENTRY;
947
948         if (feat != &dt_directory_features && feat != NULL)
949                 dof->dof_type = DFT_INDEX;
950         else
951                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
952
953         dof->u.dof_idx.di_feat = feat;
954
955         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
956
957         RETURN(rc);
958 }
959
960 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
961                                struct mdd_object *c, struct md_attr *ma,
962                                struct thandle *handle,
963                                const struct md_op_spec *spec)
964 {
965         struct lu_attr *attr = &ma->ma_attr;
966         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
967         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
968         const struct dt_index_features *feat = spec->sp_feat;
969         int rc;
970         ENTRY;
971
972         if (!mdd_object_exists(c)) {
973                 struct dt_object *next = mdd_object_child(c);
974                 LASSERT(next);
975
976                 if (feat != &dt_directory_features && feat != NULL)
977                         dof->dof_type = DFT_INDEX;
978                 else
979                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
980
981                 dof->u.dof_idx.di_feat = feat;
982
983                 /* @hint will be initialized by underlying device. */
984                 next->do_ops->do_ah_init(env, hint,
985                                          p ? mdd_object_child(p) : NULL,
986                                          attr->la_mode & S_IFMT);
987
988                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
989                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
990         } else
991                 rc = -EEXIST;
992
993         RETURN(rc);
994 }
995
996 /**
997  * Make sure the ctime is increased only.
998  */
999 static inline int mdd_attr_check(const struct lu_env *env,
1000                                  struct mdd_object *obj,
1001                                  struct lu_attr *attr)
1002 {
1003         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1004         int rc;
1005         ENTRY;
1006
1007         if (attr->la_valid & LA_CTIME) {
1008                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1009                 if (rc)
1010                         RETURN(rc);
1011
1012                 if (attr->la_ctime < tmp_la->la_ctime)
1013                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1014                 else if (attr->la_valid == LA_CTIME &&
1015                          attr->la_ctime == tmp_la->la_ctime)
1016                         attr->la_valid &= ~LA_CTIME;
1017         }
1018         RETURN(0);
1019 }
1020
1021 int mdd_attr_set_internal(const struct lu_env *env,
1022                           struct mdd_object *obj,
1023                           struct lu_attr *attr,
1024                           struct thandle *handle,
1025                           int needacl)
1026 {
1027         int rc;
1028         ENTRY;
1029
1030         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1031 #ifdef CONFIG_FS_POSIX_ACL
1032         if (!rc && (attr->la_valid & LA_MODE) && needacl)
1033                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
1034 #endif
1035         RETURN(rc);
1036 }
1037
1038 int mdd_attr_check_set_internal(const struct lu_env *env,
1039                                 struct mdd_object *obj,
1040                                 struct lu_attr *attr,
1041                                 struct thandle *handle,
1042                                 int needacl)
1043 {
1044         int rc;
1045         ENTRY;
1046
1047         rc = mdd_attr_check(env, obj, attr);
1048         if (rc)
1049                 RETURN(rc);
1050
1051         if (attr->la_valid)
1052                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1053         RETURN(rc);
1054 }
1055
1056 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1057                                         struct mdd_object *obj,
1058                                         struct lu_attr *attr,
1059                                         struct thandle *handle,
1060                                         int needacl)
1061 {
1062         int rc;
1063         ENTRY;
1064
1065         needacl = needacl && (attr->la_valid & LA_MODE);
1066         if (needacl)
1067                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1068         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1069         if (needacl)
1070                 mdd_write_unlock(env, obj);
1071         RETURN(rc);
1072 }
1073
1074 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1075                                        struct mdd_object *obj,
1076                                        struct lu_attr *attr,
1077                                        struct thandle *handle,
1078                                        int needacl)
1079 {
1080         int rc;
1081         ENTRY;
1082
1083         needacl = needacl && (attr->la_valid & LA_MODE);
1084         if (needacl)
1085                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1086         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1087         if (needacl)
1088                 mdd_write_unlock(env, obj);
1089         RETURN(rc);
1090 }
1091
1092 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1093                     const struct lu_buf *buf, const char *name,
1094                     int fl, struct thandle *handle)
1095 {
1096         struct lustre_capa *capa = mdd_object_capa(env, obj);
1097         int rc = -EINVAL;
1098         ENTRY;
1099
1100         if (buf->lb_buf && buf->lb_len > 0)
1101                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1102         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1103                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1104
1105         RETURN(rc);
1106 }
1107
1108 /*
1109  * This gives the same functionality as the code between
1110  * sys_chmod and inode_setattr
1111  * chown_common and inode_setattr
1112  * utimes and inode_setattr
1113  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1114  */
1115 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1116                         struct lu_attr *la, const struct md_attr *ma)
1117 {
1118         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1119         struct md_ucred  *uc;
1120         int               rc;
1121         ENTRY;
1122
1123         if (!la->la_valid)
1124                 RETURN(0);
1125
1126         /* Do not permit change file type */
1127         if (la->la_valid & LA_TYPE)
1128                 RETURN(-EPERM);
1129
1130         /* They should not be processed by setattr */
1131         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1132                 RETURN(-EPERM);
1133
1134         /* export destroy does not have ->le_ses, but we may want
1135          * to drop LUSTRE_SOM_FL. */
1136         if (!env->le_ses)
1137                 RETURN(0);
1138
1139         uc = md_ucred(env);
1140
1141         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1142         if (rc)
1143                 RETURN(rc);
1144
1145         if (la->la_valid == LA_CTIME) {
1146                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1147                         /* This is only for set ctime when rename's source is
1148                          * on remote MDS. */
1149                         rc = mdd_may_delete(env, NULL, obj,
1150                                             (struct md_attr *)ma, 1, 0);
1151                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1152                         la->la_valid &= ~LA_CTIME;
1153                 RETURN(rc);
1154         }
1155
1156         if (la->la_valid == LA_ATIME) {
1157                 /* This is atime only set for read atime update on close. */
1158                 if (la->la_atime >= tmp_la->la_atime &&
1159                     la->la_atime < (tmp_la->la_atime +
1160                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1161                         la->la_valid &= ~LA_ATIME;
1162                 RETURN(0);
1163         }
1164
1165         /* Check if flags change. */
1166         if (la->la_valid & LA_FLAGS) {
1167                 unsigned int oldflags = 0;
1168                 unsigned int newflags = la->la_flags &
1169                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1170
1171                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1172                     !mdd_capable(uc, CFS_CAP_FOWNER))
1173                         RETURN(-EPERM);
1174
1175                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1176                  * only be changed by the relevant capability. */
1177                 if (mdd_is_immutable(obj))
1178                         oldflags |= LUSTRE_IMMUTABLE_FL;
1179                 if (mdd_is_append(obj))
1180                         oldflags |= LUSTRE_APPEND_FL;
1181                 if ((oldflags ^ newflags) &&
1182                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1183                         RETURN(-EPERM);
1184
1185                 if (!S_ISDIR(tmp_la->la_mode))
1186                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1187         }
1188
1189         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1190             (la->la_valid & ~LA_FLAGS) &&
1191             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1192                 RETURN(-EPERM);
1193
1194         /* Check for setting the obj time. */
1195         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1196             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1197                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1198                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1199                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1200                                                             MAY_WRITE,
1201                                                             MOR_TGT_CHILD);
1202                         if (rc)
1203                                 RETURN(rc);
1204                 }
1205         }
1206
1207         if (la->la_valid & LA_KILL_SUID) {
1208                 la->la_valid &= ~LA_KILL_SUID;
1209                 if ((tmp_la->la_mode & S_ISUID) &&
1210                     !(la->la_valid & LA_MODE)) {
1211                         la->la_mode = tmp_la->la_mode;
1212                         la->la_valid |= LA_MODE;
1213                 }
1214                 la->la_mode &= ~S_ISUID;
1215         }
1216
1217         if (la->la_valid & LA_KILL_SGID) {
1218                 la->la_valid &= ~LA_KILL_SGID;
1219                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1220                                         (S_ISGID | S_IXGRP)) &&
1221                     !(la->la_valid & LA_MODE)) {
1222                         la->la_mode = tmp_la->la_mode;
1223                         la->la_valid |= LA_MODE;
1224                 }
1225                 la->la_mode &= ~S_ISGID;
1226         }
1227
1228         /* Make sure a caller can chmod. */
1229         if (la->la_valid & LA_MODE) {
1230                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1231                     (uc->mu_fsuid != tmp_la->la_uid) &&
1232                     !mdd_capable(uc, CFS_CAP_FOWNER))
1233                         RETURN(-EPERM);
1234
1235                 if (la->la_mode == (cfs_umode_t) -1)
1236                         la->la_mode = tmp_la->la_mode;
1237                 else
1238                         la->la_mode = (la->la_mode & S_IALLUGO) |
1239                                       (tmp_la->la_mode & ~S_IALLUGO);
1240
1241                 /* Also check the setgid bit! */
1242                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1243                                        la->la_gid : tmp_la->la_gid) &&
1244                     !mdd_capable(uc, CFS_CAP_FSETID))
1245                         la->la_mode &= ~S_ISGID;
1246         } else {
1247                la->la_mode = tmp_la->la_mode;
1248         }
1249
1250         /* Make sure a caller can chown. */
1251         if (la->la_valid & LA_UID) {
1252                 if (la->la_uid == (uid_t) -1)
1253                         la->la_uid = tmp_la->la_uid;
1254                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1255                     (la->la_uid != tmp_la->la_uid)) &&
1256                     !mdd_capable(uc, CFS_CAP_CHOWN))
1257                         RETURN(-EPERM);
1258
1259                 /* If the user or group of a non-directory has been
1260                  * changed by a non-root user, remove the setuid bit.
1261                  * 19981026 David C Niemi <niemi@tux.org>
1262                  *
1263                  * Changed this to apply to all users, including root,
1264                  * to avoid some races. This is the behavior we had in
1265                  * 2.0. The check for non-root was definitely wrong
1266                  * for 2.2 anyway, as it should have been using
1267                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1268                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1269                     !S_ISDIR(tmp_la->la_mode)) {
1270                         la->la_mode &= ~S_ISUID;
1271                         la->la_valid |= LA_MODE;
1272                 }
1273         }
1274
1275         /* Make sure caller can chgrp. */
1276         if (la->la_valid & LA_GID) {
1277                 if (la->la_gid == (gid_t) -1)
1278                         la->la_gid = tmp_la->la_gid;
1279                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1280                     ((la->la_gid != tmp_la->la_gid) &&
1281                     !lustre_in_group_p(uc, la->la_gid))) &&
1282                     !mdd_capable(uc, CFS_CAP_CHOWN))
1283                         RETURN(-EPERM);
1284
1285                 /* Likewise, if the user or group of a non-directory
1286                  * has been changed by a non-root user, remove the
1287                  * setgid bit UNLESS there is no group execute bit
1288                  * (this would be a file marked for mandatory
1289                  * locking).  19981026 David C Niemi <niemi@tux.org>
1290                  *
1291                  * Removed the fsuid check (see the comment above) --
1292                  * 19990830 SD. */
1293                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1294                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1295                         la->la_mode &= ~S_ISGID;
1296                         la->la_valid |= LA_MODE;
1297                 }
1298         }
1299
1300         /* For both Size-on-MDS case and truncate case,
1301          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1302          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1303          * For SOM case, it is true, the MAY_WRITE perm has been checked
1304          * when open, no need check again. For truncate case, it is false,
1305          * the MAY_WRITE perm should be checked here. */
1306         if (ma->ma_attr_flags & MDS_SOM) {
1307                 /* For the "Size-on-MDS" setattr update, merge coming
1308                  * attributes with the set in the inode. BUG 10641 */
1309                 if ((la->la_valid & LA_ATIME) &&
1310                     (la->la_atime <= tmp_la->la_atime))
1311                         la->la_valid &= ~LA_ATIME;
1312
1313                 /* OST attributes do not have a priority over MDS attributes,
1314                  * so drop times if ctime is equal. */
1315                 if ((la->la_valid & LA_CTIME) &&
1316                     (la->la_ctime <= tmp_la->la_ctime))
1317                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1318         } else {
1319                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1320                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1321                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1322                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1323                                 rc = mdd_permission_internal_locked(env, obj,
1324                                                             tmp_la, MAY_WRITE,
1325                                                             MOR_TGT_CHILD);
1326                                 if (rc)
1327                                         RETURN(rc);
1328                         }
1329                 }
1330                 if (la->la_valid & LA_CTIME) {
1331                         /* The pure setattr, it has the priority over what is
1332                          * already set, do not drop it if ctime is equal. */
1333                         if (la->la_ctime < tmp_la->la_ctime)
1334                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1335                                                   LA_CTIME);
1336                 }
1337         }
1338
1339         RETURN(0);
1340 }
1341
1342 /** Store a data change changelog record
1343  * If this fails, we must fail the whole transaction; we don't
1344  * want the change to commit without the log entry.
1345  * \param mdd_obj - mdd_object of change
1346  * \param handle - transacion handle
1347  */
1348 static int mdd_changelog_data_store(const struct lu_env     *env,
1349                                     struct mdd_device       *mdd,
1350                                     enum changelog_rec_type type,
1351                                     int                     flags,
1352                                     struct mdd_object       *mdd_obj,
1353                                     struct thandle          *handle)
1354 {
1355         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1356         struct llog_changelog_rec *rec;
1357         struct thandle *th = NULL;
1358         struct lu_buf *buf;
1359         int reclen;
1360         int rc;
1361
1362         /* Not recording */
1363         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1364                 RETURN(0);
1365         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1366                 RETURN(0);
1367
1368         LASSERT(mdd_obj != NULL);
1369         LASSERT(handle != NULL);
1370
1371         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1372             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1373                 /* Don't need multiple updates in this log */
1374                 /* Don't check under lock - no big deal if we get an extra
1375                    entry */
1376                 RETURN(0);
1377         }
1378
1379         reclen = llog_data_len(sizeof(*rec));
1380         buf = mdd_buf_alloc(env, reclen);
1381         if (buf->lb_buf == NULL)
1382                 RETURN(-ENOMEM);
1383         rec = (struct llog_changelog_rec *)buf->lb_buf;
1384
1385         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1386         rec->cr.cr_type = (__u32)type;
1387         rec->cr.cr_tfid = *tfid;
1388         rec->cr.cr_namelen = 0;
1389         mdd_obj->mod_cltime = cfs_time_current_64();
1390
1391         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1392
1393         if (th)
1394                 mdd_trans_stop(env, mdd, rc, th);
1395
1396         if (rc < 0) {
1397                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1398                        rc, type, PFID(tfid));
1399                 return -EFAULT;
1400         }
1401
1402         return 0;
1403 }
1404
1405 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1406                   int flags, struct md_object *obj)
1407 {
1408         struct thandle *handle;
1409         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1410         struct mdd_device *mdd = mdo2mdd(obj);
1411         int rc;
1412         ENTRY;
1413
1414         handle = mdd_trans_create(env, mdd);
1415         if (IS_ERR(handle))
1416                 return(PTR_ERR(handle));
1417
1418         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1419         if (rc)
1420                 GOTO(stop, rc);
1421
1422         rc = mdd_trans_start(env, mdd, handle);
1423         if (rc)
1424                 GOTO(stop, rc);
1425
1426         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1427                                       handle);
1428
1429 stop:
1430         mdd_trans_stop(env, mdd, rc, handle);
1431
1432         RETURN(rc);
1433 }
1434
1435 /**
1436  * Should be called with write lock held.
1437  *
1438  * \see mdd_lma_set_locked().
1439  */
1440 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1441                        const struct md_attr *ma, struct thandle *handle)
1442 {
1443         struct mdd_thread_info *info = mdd_env_info(env);
1444         struct lu_buf *buf;
1445         struct lustre_mdt_attrs *lma =
1446                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1447         int lmasize = sizeof(struct lustre_mdt_attrs);
1448         int rc = 0;
1449
1450         ENTRY;
1451
1452         /* Either HSM or SOM part is not valid, we need to read it before */
1453         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1454                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1455                 if (rc <= 0)
1456                         RETURN(rc);
1457
1458                 lustre_lma_swab(lma);
1459         } else {
1460                 memset(lma, 0, lmasize);
1461         }
1462
1463         /* Copy HSM data */
1464         if (ma->ma_valid & MA_HSM) {
1465                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1466                 lma->lma_compat |= LMAC_HSM;
1467         }
1468
1469         /* Copy SOM data */
1470         if (ma->ma_valid & MA_SOM) {
1471                 LASSERT(ma->ma_som != NULL);
1472                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1473                         lma->lma_compat     &= ~LMAC_SOM;
1474                 } else {
1475                         lma->lma_compat     |= LMAC_SOM;
1476                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1477                         lma->lma_som_size    = ma->ma_som->msd_size;
1478                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1479                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1480                 }
1481         }
1482
1483         /* Copy FID */
1484         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1485
1486         lustre_lma_swab(lma);
1487         buf = mdd_buf_get(env, lma, lmasize);
1488         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1489
1490         RETURN(rc);
1491 }
1492
1493 /**
1494  * Save LMA extended attributes with data from \a ma.
1495  *
1496  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1497  * not, LMA EA will be first read from disk, modified and write back.
1498  *
1499  */
1500 static int mdd_lma_set_locked(const struct lu_env *env,
1501                               struct mdd_object *mdd_obj,
1502                               const struct md_attr *ma, struct thandle *handle)
1503 {
1504         int rc;
1505
1506         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1507         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1508         mdd_write_unlock(env, mdd_obj);
1509         return rc;
1510 }
1511
1512 /* Precedence for choosing record type when multiple
1513  * attributes change: setattr > mtime > ctime > atime
1514  * (ctime changes when mtime does, plus chmod/chown.
1515  * atime and ctime are independent.) */
1516 static int mdd_attr_set_changelog(const struct lu_env *env,
1517                                   struct md_object *obj, struct thandle *handle,
1518                                   __u64 valid)
1519 {
1520         struct mdd_device *mdd = mdo2mdd(obj);
1521         int bits, type = 0;
1522
1523         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1524         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1525         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1526         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1527         bits = bits & mdd->mdd_cl.mc_mask;
1528         if (bits == 0)
1529                 return 0;
1530
1531         /* The record type is the lowest non-masked set bit */
1532         while (bits && ((bits & 1) == 0)) {
1533                 bits = bits >> 1;
1534                 type++;
1535         }
1536
1537         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1538         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1539                                         md2mdd_obj(obj), handle);
1540 }
1541
1542 static int mdd_declare_attr_set(const struct lu_env *env,
1543                                 struct mdd_device *mdd,
1544                                 struct mdd_object *obj,
1545                                 const struct md_attr *ma,
1546                                 struct lov_mds_md *lmm,
1547                                 struct thandle *handle)
1548 {
1549         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1550         int             rc, i;
1551
1552         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1553         if (rc)
1554                 return rc;
1555
1556         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1557         if (rc)
1558                 return rc;
1559
1560         if (ma->ma_valid & MA_LOV) {
1561                 buf->lb_buf = NULL;
1562                 buf->lb_len = ma->ma_lmm_size;
1563                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1564                                            0, handle);
1565                 if (rc)
1566                         return rc;
1567         }
1568
1569         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1570                 buf->lb_buf = NULL;
1571                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1572                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1573                                            0, handle);
1574                 if (rc)
1575                         return rc;
1576         }
1577
1578 #ifdef CONFIG_FS_POSIX_ACL
1579         if (ma->ma_attr.la_valid & LA_MODE) {
1580                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1581                 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1582                                    BYPASS_CAPA);
1583                 mdd_read_unlock(env, obj);
1584                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1585                         rc = 0;
1586                 else if (rc < 0)
1587                         return rc;
1588
1589                 if (rc != 0) {
1590                         buf->lb_buf = NULL;
1591                         buf->lb_len = rc;
1592                         rc = mdo_declare_xattr_set(env, obj, buf,
1593                                                    XATTR_NAME_ACL_ACCESS, 0,
1594                                                    handle);
1595                         if (rc)
1596                                 return rc;
1597                 }
1598         }
1599 #endif
1600
1601         /* basically the log is the same as in unlink case */
1602         if (lmm) {
1603                 __u16 stripe;
1604
1605                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1606                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1607                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1608                                mdd->mdd_obd_dev->obd_name,
1609                                le32_to_cpu(lmm->lmm_magic),
1610                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1611                         return -EINVAL;
1612                 }
1613
1614                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1615                 if (stripe == LOV_ALL_STRIPES) {
1616                         struct lov_desc *ldesc;
1617
1618                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1619                         LASSERT(ldesc != NULL);
1620                         stripe = ldesc->ld_tgt_count;
1621                 }
1622
1623                 for (i = 0; i < stripe; i++) {
1624                         rc = mdd_declare_llog_record(env, mdd,
1625                                         sizeof(struct llog_unlink_rec),
1626                                         handle);
1627                         if (rc)
1628                                 return rc;
1629                 }
1630         }
1631
1632         return rc;
1633 }
1634
1635 /* set attr and LOV EA at once, return updated attr */
1636 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1637                  const struct md_attr *ma)
1638 {
1639         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1640         struct mdd_device *mdd = mdo2mdd(obj);
1641         struct thandle *handle;
1642         struct lov_mds_md *lmm = NULL;
1643         struct llog_cookie *logcookies = NULL;
1644         int  rc, lmm_size = 0, cookie_size = 0;
1645         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1646 #ifdef HAVE_QUOTA_SUPPORT
1647         struct obd_device *obd = mdd->mdd_obd_dev;
1648         struct mds_obd *mds = &obd->u.mds;
1649         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1650         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1651         int quota_opc = 0, block_count = 0;
1652         int inode_pending[MAXQUOTAS] = { 0, 0 };
1653         int block_pending[MAXQUOTAS] = { 0, 0 };
1654 #endif
1655         ENTRY;
1656
1657         *la_copy = ma->ma_attr;
1658         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1659         if (rc != 0)
1660                 RETURN(rc);
1661
1662         /* setattr on "close" only change atime, or do nothing */
1663         if (ma->ma_valid == MA_INODE &&
1664             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1665                 RETURN(0);
1666
1667         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1668             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1669                 lmm_size = mdd_lov_mdsize(env, mdd);
1670                 lmm = mdd_max_lmm_get(env, mdd);
1671                 if (lmm == NULL)
1672                         RETURN(-ENOMEM);
1673
1674                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1675                                 XATTR_NAME_LOV);
1676
1677                 if (rc < 0)
1678                         RETURN(rc);
1679         }
1680
1681         handle = mdd_trans_create(env, mdd);
1682         if (IS_ERR(handle))
1683                 RETURN(PTR_ERR(handle));
1684
1685         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1686                                   lmm_size > 0 ? lmm : NULL, handle);
1687         if (rc)
1688                 GOTO(stop, rc);
1689
1690         rc = mdd_trans_start(env, mdd, handle);
1691         if (rc)
1692                 GOTO(stop, rc);
1693
1694         /* permission changes may require sync operation */
1695         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1696                 handle->th_sync |= !!mdd->mdd_sync_permission;
1697
1698         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1699                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1700                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1701
1702 #ifdef HAVE_QUOTA_SUPPORT
1703         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1704                 struct obd_export *exp = md_quota(env)->mq_exp;
1705                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1706
1707                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1708                 if (!rc) {
1709                         quota_opc = FSFILT_OP_SETATTR;
1710                         mdd_quota_wrapper(la_copy, qnids);
1711                         mdd_quota_wrapper(la_tmp, qoids);
1712                         /* get file quota for new owner */
1713                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1714                                         qnids, inode_pending, 1, NULL, 0,
1715                                         NULL, 0);
1716                         block_count = (la_tmp->la_blocks + 7) >> 3;
1717                         if (block_count) {
1718                                 void *data = NULL;
1719                                 mdd_data_get(env, mdd_obj, &data);
1720                                 /* get block quota for new owner */
1721                                 lquota_chkquota(mds_quota_interface_ref, obd,
1722                                                 exp, qnids, block_pending,
1723                                                 block_count, NULL,
1724                                                 LQUOTA_FLAGS_BLK, data, 1);
1725                         }
1726                 }
1727         }
1728 #endif
1729
1730         if (la_copy->la_valid & LA_FLAGS) {
1731                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1732                                                   handle, 1);
1733                 if (rc == 0)
1734                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1735         } else if (la_copy->la_valid) {            /* setattr */
1736                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1737                                                   handle, 1);
1738                 /* journal chown/chgrp in llog, just like unlink */
1739                 if (rc == 0 && lmm_size){
1740                         cookie_size = mdd_lov_cookiesize(env, mdd);
1741                         logcookies = mdd_max_cookie_get(env, mdd);
1742                         if (logcookies == NULL)
1743                                 GOTO(cleanup, rc = -ENOMEM);
1744
1745                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1746                                             logcookies, cookie_size) <= 0)
1747                                 logcookies = NULL;
1748                 }
1749         }
1750
1751         if (rc == 0 && ma->ma_valid & MA_LOV) {
1752                 cfs_umode_t mode;
1753
1754                 mode = mdd_object_type(mdd_obj);
1755                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1756                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1757                         if (rc)
1758                                 GOTO(cleanup, rc);
1759
1760                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1761                                             ma->ma_lmm_size, handle, 1);
1762                 }
1763
1764         }
1765         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1766                 cfs_umode_t mode;
1767
1768                 mode = mdd_object_type(mdd_obj);
1769                 if (S_ISREG(mode))
1770                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1771
1772         }
1773 cleanup:
1774         if (rc == 0)
1775                 rc = mdd_attr_set_changelog(env, obj, handle,
1776                                             ma->ma_attr.la_valid);
1777 stop:
1778         mdd_trans_stop(env, mdd, rc, handle);
1779         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1780                 /*set obd attr, if needed*/
1781                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1782                                            logcookies);
1783         }
1784 #ifdef HAVE_QUOTA_SUPPORT
1785         if (quota_opc) {
1786                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1787                                       inode_pending, 0);
1788                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1789                                       block_pending, 1);
1790                 /* Trigger dqrel/dqacq for original owner and new owner.
1791                  * If failed, the next call for lquota_chkquota will
1792                  * process it. */
1793                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1794                               quota_opc);
1795         }
1796 #endif
1797         RETURN(rc);
1798 }
1799
1800 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1801                       const struct lu_buf *buf, const char *name, int fl,
1802                       struct thandle *handle)
1803 {
1804         int  rc;
1805         ENTRY;
1806
1807         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1808         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1809         mdd_write_unlock(env, obj);
1810
1811         RETURN(rc);
1812 }
1813
1814 static int mdd_xattr_sanity_check(const struct lu_env *env,
1815                                   struct mdd_object *obj)
1816 {
1817         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1818         struct md_ucred *uc     = md_ucred(env);
1819         int rc;
1820         ENTRY;
1821
1822         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1823                 RETURN(-EPERM);
1824
1825         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1826         if (rc)
1827                 RETURN(rc);
1828
1829         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1830             !mdd_capable(uc, CFS_CAP_FOWNER))
1831                 RETURN(-EPERM);
1832
1833         RETURN(rc);
1834 }
1835
1836 static int mdd_declare_xattr_set(const struct lu_env *env,
1837                                  struct mdd_device *mdd,
1838                                  struct mdd_object *obj,
1839                                  const struct lu_buf *buf,
1840                                  const char *name,
1841                                  struct thandle *handle)
1842
1843 {
1844         int rc;
1845
1846         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1847         if (rc)
1848                 return rc;
1849
1850         /* Only record user xattr changes */
1851         if ((strncmp("user.", name, 5) == 0))
1852                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1853
1854         return rc;
1855 }
1856
1857 /**
1858  * The caller should guarantee to update the object ctime
1859  * after xattr_set if needed.
1860  */
1861 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1862                          const struct lu_buf *buf, const char *name,
1863                          int fl)
1864 {
1865         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1866         struct mdd_device *mdd = mdo2mdd(obj);
1867         struct thandle *handle;
1868         int  rc;
1869         ENTRY;
1870
1871         rc = mdd_xattr_sanity_check(env, mdd_obj);
1872         if (rc)
1873                 RETURN(rc);
1874
1875         handle = mdd_trans_create(env, mdd);
1876         if (IS_ERR(handle))
1877                 RETURN(PTR_ERR(handle));
1878
1879         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1880         if (rc)
1881                 GOTO(stop, rc);
1882
1883         rc = mdd_trans_start(env, mdd, handle);
1884         if (rc)
1885                 GOTO(stop, rc);
1886
1887         /* security-replated changes may require sync */
1888         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1889                 handle->th_sync |= !!mdd->mdd_sync_permission;
1890
1891         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1892
1893         /* Only record system & user xattr changes */
1894         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1895                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1896                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1897                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1898                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1899                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1900                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1901                                               handle);
1902
1903 stop:
1904         mdd_trans_stop(env, mdd, rc, handle);
1905
1906         RETURN(rc);
1907 }
1908
1909 static int mdd_declare_xattr_del(const struct lu_env *env,
1910                                  struct mdd_device *mdd,
1911                                  struct mdd_object *obj,
1912                                  const char *name,
1913                                  struct thandle *handle)
1914 {
1915         int rc;
1916
1917         rc = mdo_declare_xattr_del(env, obj, name, handle);
1918         if (rc)
1919                 return rc;
1920
1921         /* Only record user xattr changes */
1922         if ((strncmp("user.", name, 5) == 0))
1923                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1924
1925         return rc;
1926 }
1927
1928 /**
1929  * The caller should guarantee to update the object ctime
1930  * after xattr_set if needed.
1931  */
1932 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1933                   const char *name)
1934 {
1935         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1936         struct mdd_device *mdd = mdo2mdd(obj);
1937         struct thandle *handle;
1938         int  rc;
1939         ENTRY;
1940
1941         rc = mdd_xattr_sanity_check(env, mdd_obj);
1942         if (rc)
1943                 RETURN(rc);
1944
1945         handle = mdd_trans_create(env, mdd);
1946         if (IS_ERR(handle))
1947                 RETURN(PTR_ERR(handle));
1948
1949         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1950         if (rc)
1951                 GOTO(stop, rc);
1952
1953         rc = mdd_trans_start(env, mdd, handle);
1954         if (rc)
1955                 GOTO(stop, rc);
1956
1957         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1958         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1959                            mdd_object_capa(env, mdd_obj));
1960         mdd_write_unlock(env, mdd_obj);
1961
1962         /* Only record system & user xattr changes */
1963         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1964                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1965                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1966                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1967                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1968                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1969                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1970                                               handle);
1971
1972 stop:
1973         mdd_trans_stop(env, mdd, rc, handle);
1974
1975         RETURN(rc);
1976 }
1977
1978 /* partial unlink */
1979 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1980                        struct md_attr *ma)
1981 {
1982         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1983         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1984         struct mdd_device *mdd = mdo2mdd(obj);
1985         struct thandle *handle;
1986 #ifdef HAVE_QUOTA_SUPPORT
1987         struct obd_device *obd = mdd->mdd_obd_dev;
1988         struct mds_obd *mds = &obd->u.mds;
1989         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1990         int quota_opc = 0;
1991 #endif
1992         int rc;
1993         ENTRY;
1994
1995         /* XXX: this code won't be used ever:
1996          * DNE uses slightly different approach */
1997         LBUG();
1998
1999         /*
2000          * Check -ENOENT early here because we need to get object type
2001          * to calculate credits before transaction start
2002          */
2003         if (mdd_object_exists(mdd_obj) == 0) {
2004                 CERROR("%s: object "DFID" not found: rc = -2\n",
2005                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2006                 RETURN(-ENOENT);
2007         }
2008
2009         LASSERT(mdd_object_exists(mdd_obj) > 0);
2010
2011         handle = mdd_trans_create(env, mdd);
2012         if (IS_ERR(handle))
2013                 RETURN(-ENOMEM);
2014
2015         rc = mdd_trans_start(env, mdd, handle);
2016
2017         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2018
2019         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2020         if (rc)
2021                 GOTO(cleanup, rc);
2022
2023         mdo_ref_del(env, mdd_obj, handle);
2024
2025         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2026                 /* unlink dot */
2027                 mdo_ref_del(env, mdd_obj, handle);
2028         }
2029
2030         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2031         la_copy->la_ctime = ma->ma_attr.la_ctime;
2032
2033         la_copy->la_valid = LA_CTIME;
2034         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2035         if (rc)
2036                 GOTO(cleanup, rc);
2037
2038         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2039 #ifdef HAVE_QUOTA_SUPPORT
2040         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2041             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2042                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2043                 mdd_quota_wrapper(&ma->ma_attr, qids);
2044         }
2045 #endif
2046
2047
2048         EXIT;
2049 cleanup:
2050         mdd_write_unlock(env, mdd_obj);
2051         mdd_trans_stop(env, mdd, rc, handle);
2052 #ifdef HAVE_QUOTA_SUPPORT
2053         if (quota_opc)
2054                 /* Trigger dqrel on the owner of child. If failed,
2055                  * the next call for lquota_chkquota will process it */
2056                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2057                               quota_opc);
2058 #endif
2059         return rc;
2060 }
2061
2062 /* partial operation */
2063 static int mdd_oc_sanity_check(const struct lu_env *env,
2064                                struct mdd_object *obj,
2065                                struct md_attr *ma)
2066 {
2067         int rc;
2068         ENTRY;
2069
2070         switch (ma->ma_attr.la_mode & S_IFMT) {
2071         case S_IFREG:
2072         case S_IFDIR:
2073         case S_IFLNK:
2074         case S_IFCHR:
2075         case S_IFBLK:
2076         case S_IFIFO:
2077         case S_IFSOCK:
2078                 rc = 0;
2079                 break;
2080         default:
2081                 rc = -EINVAL;
2082                 break;
2083         }
2084         RETURN(rc);
2085 }
2086
2087 static int mdd_object_create(const struct lu_env *env,
2088                              struct md_object *obj,
2089                              const struct md_op_spec *spec,
2090                              struct md_attr *ma)
2091 {
2092
2093         struct mdd_device *mdd = mdo2mdd(obj);
2094         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2095         const struct lu_fid *pfid = spec->u.sp_pfid;
2096         struct thandle *handle;
2097 #ifdef HAVE_QUOTA_SUPPORT
2098         struct obd_device *obd = mdd->mdd_obd_dev;
2099         struct obd_export *exp = md_quota(env)->mq_exp;
2100         struct mds_obd *mds = &obd->u.mds;
2101         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2102         int quota_opc = 0, block_count = 0;
2103         int inode_pending[MAXQUOTAS] = { 0, 0 };
2104         int block_pending[MAXQUOTAS] = { 0, 0 };
2105 #endif
2106         int rc = 0;
2107         ENTRY;
2108
2109         /* XXX: this code won't be used ever:
2110          * DNE uses slightly different approach */
2111         LBUG();
2112
2113 #ifdef HAVE_QUOTA_SUPPORT
2114         if (mds->mds_quota) {
2115                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2116                 mdd_quota_wrapper(&ma->ma_attr, qids);
2117                 /* get file quota for child */
2118                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2119                                 qids, inode_pending, 1, NULL, 0,
2120                                 NULL, 0);
2121                 switch (ma->ma_attr.la_mode & S_IFMT) {
2122                 case S_IFLNK:
2123                 case S_IFDIR:
2124                         block_count = 2;
2125                         break;
2126                 case S_IFREG:
2127                         block_count = 1;
2128                         break;
2129                 }
2130                 /* get block quota for child */
2131                 if (block_count)
2132                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2133                                         qids, block_pending, block_count,
2134                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2135         }
2136 #endif
2137
2138         handle = mdd_trans_create(env, mdd);
2139         if (IS_ERR(handle))
2140                 GOTO(out_pending, rc = PTR_ERR(handle));
2141
2142         rc = mdd_trans_start(env, mdd, handle);
2143
2144         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2145         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2146         if (rc)
2147                 GOTO(unlock, rc);
2148
2149         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2150         if (rc)
2151                 GOTO(unlock, rc);
2152
2153         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2154                 /* If creating the slave object, set slave EA here. */
2155                 int lmv_size = spec->u.sp_ea.eadatalen;
2156                 struct lmv_stripe_md *lmv;
2157
2158                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2159                 LASSERT(lmv != NULL && lmv_size > 0);
2160
2161                 rc = __mdd_xattr_set(env, mdd_obj,
2162                                      mdd_buf_get_const(env, lmv, lmv_size),
2163                                      XATTR_NAME_LMV, 0, handle);
2164                 if (rc)
2165                         GOTO(unlock, rc);
2166
2167                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2168                                            handle, 0);
2169         } else {
2170 #ifdef CONFIG_FS_POSIX_ACL
2171                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2172                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2173
2174                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2175                         buf->lb_len = spec->u.sp_ea.eadatalen;
2176                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2177                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2178                                                     &ma->ma_attr.la_mode,
2179                                                     handle);
2180                                 if (rc)
2181                                         GOTO(unlock, rc);
2182                                 else
2183                                         ma->ma_attr.la_valid |= LA_MODE;
2184                         }
2185
2186                         pfid = spec->u.sp_ea.fid;
2187                 }
2188 #endif
2189                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2190                                            spec);
2191         }
2192         EXIT;
2193 unlock:
2194         if (rc == 0)
2195                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2196         mdd_write_unlock(env, mdd_obj);
2197
2198         mdd_trans_stop(env, mdd, rc, handle);
2199 out_pending:
2200 #ifdef HAVE_QUOTA_SUPPORT
2201         if (quota_opc) {
2202                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2203                                       inode_pending, 0);
2204                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2205                                       block_pending, 1);
2206                 /* Trigger dqacq on the owner of child. If failed,
2207                  * the next call for lquota_chkquota will process it. */
2208                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2209                               quota_opc);
2210         }
2211 #endif
2212         return rc;
2213 }
2214
2215 /* partial link */
2216 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2217                        const struct md_attr *ma)
2218 {
2219         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2220         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2221         struct mdd_device *mdd = mdo2mdd(obj);
2222         struct thandle *handle;
2223         int rc;
2224         ENTRY;
2225
2226         /* XXX: this code won't be used ever:
2227          * DNE uses slightly different approach */
2228         LBUG();
2229
2230         handle = mdd_trans_create(env, mdd);
2231         if (IS_ERR(handle))
2232                 RETURN(-ENOMEM);
2233
2234         rc = mdd_trans_start(env, mdd, handle);
2235
2236         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2237         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2238         if (rc == 0)
2239                 mdo_ref_add(env, mdd_obj, handle);
2240         mdd_write_unlock(env, mdd_obj);
2241         if (rc == 0) {
2242                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2243                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2244
2245                 la_copy->la_valid = LA_CTIME;
2246                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2247                                                         handle, 0);
2248         }
2249         mdd_trans_stop(env, mdd, 0, handle);
2250
2251         RETURN(rc);
2252 }
2253
2254 /*
2255  * do NOT or the MAY_*'s, you'll get the weakest
2256  */
2257 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2258 {
2259         int res = 0;
2260
2261         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2262          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2263          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2264          * owner can write to a file even if it is marked readonly to hide
2265          * its brokenness. (bug 5781) */
2266         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2267                 struct md_ucred *uc = md_ucred(env);
2268
2269                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2270                     (la->la_uid == uc->mu_fsuid))
2271                         return 0;
2272         }
2273
2274         if (flags & FMODE_READ)
2275                 res |= MAY_READ;
2276         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2277                 res |= MAY_WRITE;
2278         if (flags & MDS_FMODE_EXEC)
2279                 res = MAY_EXEC;
2280         return res;
2281 }
2282
2283 static int mdd_open_sanity_check(const struct lu_env *env,
2284                                  struct mdd_object *obj, int flag)
2285 {
2286         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2287         int mode, rc;
2288         ENTRY;
2289
2290         /* EEXIST check */
2291         if (mdd_is_dead_obj(obj))
2292                 RETURN(-ENOENT);
2293
2294         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2295         if (rc)
2296                RETURN(rc);
2297
2298         if (S_ISLNK(tmp_la->la_mode))
2299                 RETURN(-ELOOP);
2300
2301         mode = accmode(env, tmp_la, flag);
2302
2303         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2304                 RETURN(-EISDIR);
2305
2306         if (!(flag & MDS_OPEN_CREATED)) {
2307                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2308                 if (rc)
2309                         RETURN(rc);
2310         }
2311
2312         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2313             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2314                 flag &= ~MDS_OPEN_TRUNC;
2315
2316         /* For writing append-only file must open it with append mode. */
2317         if (mdd_is_append(obj)) {
2318                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2319                         RETURN(-EPERM);
2320                 if (flag & MDS_OPEN_TRUNC)
2321                         RETURN(-EPERM);
2322         }
2323
2324 #if 0
2325         /*
2326          * Now, flag -- O_NOATIME does not be packed by client.
2327          */
2328         if (flag & O_NOATIME) {
2329                 struct md_ucred *uc = md_ucred(env);
2330
2331                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2332                     (uc->mu_valid == UCRED_NEW)) &&
2333                     (uc->mu_fsuid != tmp_la->la_uid) &&
2334                     !mdd_capable(uc, CFS_CAP_FOWNER))
2335                         RETURN(-EPERM);
2336         }
2337 #endif
2338
2339         RETURN(0);
2340 }
2341
2342 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2343                     int flags)
2344 {
2345         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2346         int rc = 0;
2347
2348         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2349
2350         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2351         if (rc == 0)
2352                 mdd_obj->mod_count++;
2353
2354         mdd_write_unlock(env, mdd_obj);
2355         return rc;
2356 }
2357
2358 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2359                             struct md_attr *ma, struct thandle *handle)
2360 {
2361         int rc;
2362
2363         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2364         if (rc)
2365                 return rc;
2366
2367         return mdo_declare_destroy(env, obj, handle);
2368 }
2369
2370 /* return md_attr back,
2371  * if it is last unlink then return lov ea + llog cookie*/
2372 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2373                     struct md_attr *ma, struct thandle *handle)
2374 {
2375         int rc = 0;
2376         ENTRY;
2377
2378         if (S_ISREG(mdd_object_type(obj))) {
2379                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2380                  * Caller must be ready for that. */
2381                 rc = __mdd_lmm_get(env, obj, ma);
2382                 if ((ma->ma_valid & MA_LOV))
2383                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2384                                             obj, ma);
2385         }
2386
2387         if (rc == 0)
2388                 rc = mdo_destroy(env, obj, handle);
2389
2390         RETURN(rc);
2391 }
2392
2393 static int mdd_declare_close(const struct lu_env *env,
2394                              struct mdd_object *obj,
2395                              struct md_attr *ma,
2396                              struct thandle *handle)
2397 {
2398         int rc;
2399
2400         rc = orph_declare_index_delete(env, obj, handle);
2401         if (rc)
2402                 return rc;
2403
2404         return mdd_declare_object_kill(env, obj, ma, handle);
2405 }
2406
2407 /*
2408  * No permission check is needed.
2409  */
2410 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2411                      struct md_attr *ma, int mode)
2412 {
2413         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2414         struct mdd_device *mdd = mdo2mdd(obj);
2415         struct thandle    *handle = NULL;
2416         int rc;
2417         int is_orphan = 0, reset = 1;
2418
2419 #ifdef HAVE_QUOTA_SUPPORT
2420         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2421         struct mds_obd *mds = &obd->u.mds;
2422         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2423         int quota_opc = 0;
2424 #endif
2425         ENTRY;
2426
2427         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2428                 mdd_obj->mod_count--;
2429
2430                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2431                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2432                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2433                 RETURN(0);
2434         }
2435
2436         /* check without any lock */
2437         if (mdd_obj->mod_count == 1 &&
2438             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2439  again:
2440                 handle = mdd_trans_create(env, mdo2mdd(obj));
2441                 if (IS_ERR(handle))
2442                         RETURN(PTR_ERR(handle));
2443
2444                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2445                 if (rc)
2446                         GOTO(stop, rc);
2447
2448                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2449                 if (rc)
2450                         GOTO(stop, rc);
2451
2452                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2453                 if (rc)
2454                         GOTO(stop, rc);
2455         }
2456
2457         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2458         if (handle == NULL && mdd_obj->mod_count == 1 &&
2459             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2460                 mdd_write_unlock(env, mdd_obj);
2461                 goto again;
2462         }
2463
2464         /* release open count */
2465         mdd_obj->mod_count --;
2466
2467         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2468                 /* remove link to object from orphan index */
2469                 LASSERT(handle != NULL);
2470                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2471                 if (rc == 0) {
2472                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2473                                "list, OSS objects to be destroyed.\n",
2474                                PFID(mdd_object_fid(mdd_obj)));
2475                         is_orphan = 1;
2476                 } else {
2477                         CERROR("Object "DFID" can not be deleted from orphan "
2478                                 "list, maybe cause OST objects can not be "
2479                                 "destroyed (err: %d).\n",
2480                                 PFID(mdd_object_fid(mdd_obj)), rc);
2481                         /* If object was not deleted from orphan list, do not
2482                          * destroy OSS objects, which will be done when next
2483                          * recovery. */
2484                         GOTO(out, rc);
2485                 }
2486         }
2487
2488         rc = mdd_iattr_get(env, mdd_obj, ma);
2489         /* Object maybe not in orphan list originally, it is rare case for
2490          * mdd_finish_unlink() failure. */
2491         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2492 #ifdef HAVE_QUOTA_SUPPORT
2493                 if (mds->mds_quota) {
2494                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2495                         mdd_quota_wrapper(&ma->ma_attr, qids);
2496                 }
2497 #endif
2498                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2499                 if (ma->ma_valid & MA_FLAGS &&
2500                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2501                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2502                 } else {
2503                         if (handle == NULL) {
2504                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2505                                 if (IS_ERR(handle))
2506                                         GOTO(out, rc = PTR_ERR(handle));
2507
2508                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2509                                                              handle);
2510                                 if (rc)
2511                                         GOTO(out, rc);
2512
2513                                 rc = mdd_declare_changelog_store(env, mdd,
2514                                                                  NULL, handle);
2515                                 if (rc)
2516                                         GOTO(stop, rc);
2517
2518                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2519                                 if (rc)
2520                                         GOTO(out, rc);
2521                         }
2522
2523                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2524                         if (rc == 0)
2525                                 reset = 0;
2526                 }
2527
2528                 if (rc != 0)
2529                         CERROR("Error when prepare to delete Object "DFID" , "
2530                                "which will cause OST objects can not be "
2531                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2532         }
2533         EXIT;
2534
2535 out:
2536         if (reset)
2537                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2538
2539         mdd_write_unlock(env, mdd_obj);
2540
2541         if (rc == 0 &&
2542             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2543             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2544                 if (handle == NULL) {
2545                         handle = mdd_trans_create(env, mdo2mdd(obj));
2546                         if (IS_ERR(handle))
2547                                 GOTO(stop, rc = IS_ERR(handle));
2548
2549                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2550                                                          handle);
2551                         if (rc)
2552                                 GOTO(stop, rc);
2553
2554                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2555                         if (rc)
2556                                 GOTO(stop, rc);
2557                 }
2558
2559                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2560                                          mdd_obj, handle);
2561         }
2562
2563 stop:
2564         if (handle != NULL)
2565                 mdd_trans_stop(env, mdd, rc, handle);
2566 #ifdef HAVE_QUOTA_SUPPORT
2567         if (quota_opc)
2568                 /* Trigger dqrel on the owner of child. If failed,
2569                  * the next call for lquota_chkquota will process it */
2570                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2571                               quota_opc);
2572 #endif
2573         return rc;
2574 }
2575
2576 /*
2577  * Permission check is done when open,
2578  * no need check again.
2579  */
2580 static int mdd_readpage_sanity_check(const struct lu_env *env,
2581                                      struct mdd_object *obj)
2582 {
2583         struct dt_object *next = mdd_object_child(obj);
2584         int rc;
2585         ENTRY;
2586
2587         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2588                 rc = 0;
2589         else
2590                 rc = -ENOTDIR;
2591
2592         RETURN(rc);
2593 }
2594
2595 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2596                               struct lu_dirpage *dp, int nob,
2597                               const struct dt_it_ops *iops, struct dt_it *it,
2598                               __u32 attr)
2599 {
2600         void                   *area = dp;
2601         int                     result;
2602         __u64                   hash = 0;
2603         struct lu_dirent       *ent;
2604         struct lu_dirent       *last = NULL;
2605         int                     first = 1;
2606
2607         memset(area, 0, sizeof (*dp));
2608         area += sizeof (*dp);
2609         nob  -= sizeof (*dp);
2610
2611         ent  = area;
2612         do {
2613                 int    len;
2614                 int    recsize;
2615
2616                 len = iops->key_size(env, it);
2617
2618                 /* IAM iterator can return record with zero len. */
2619                 if (len == 0)
2620                         goto next;
2621
2622                 hash = iops->store(env, it);
2623                 if (unlikely(first)) {
2624                         first = 0;
2625                         dp->ldp_hash_start = cpu_to_le64(hash);
2626                 }
2627
2628                 /* calculate max space required for lu_dirent */
2629                 recsize = lu_dirent_calc_size(len, attr);
2630
2631                 if (nob >= recsize) {
2632                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2633                         if (result == -ESTALE)
2634                                 goto next;
2635                         if (result != 0)
2636                                 goto out;
2637
2638                         /* osd might not able to pack all attributes,
2639                          * so recheck rec length */
2640                         recsize = le16_to_cpu(ent->lde_reclen);
2641                 } else {
2642                         result = (last != NULL) ? 0 :-EINVAL;
2643                         goto out;
2644                 }
2645                 last = ent;
2646                 ent = (void *)ent + recsize;
2647                 nob -= recsize;
2648
2649 next:
2650                 result = iops->next(env, it);
2651                 if (result == -ESTALE)
2652                         goto next;
2653         } while (result == 0);
2654
2655 out:
2656         dp->ldp_hash_end = cpu_to_le64(hash);
2657         if (last != NULL) {
2658                 if (last->lde_hash == dp->ldp_hash_end)
2659                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2660                 last->lde_reclen = 0; /* end mark */
2661         }
2662         return result;
2663 }
2664
2665 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2666                           const struct lu_rdpg *rdpg)
2667 {
2668         struct dt_it      *it;
2669         struct dt_object  *next = mdd_object_child(obj);
2670         const struct dt_it_ops  *iops;
2671         struct page       *pg;
2672         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2673         int i;
2674         int nlupgs = 0;
2675         int rc;
2676         int nob;
2677
2678         LASSERT(rdpg->rp_pages != NULL);
2679         LASSERT(next->do_index_ops != NULL);
2680
2681         if (rdpg->rp_count <= 0)
2682                 return -EFAULT;
2683
2684         /*
2685          * iterate through directory and fill pages from @rdpg
2686          */
2687         iops = &next->do_index_ops->dio_it;
2688         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2689         if (IS_ERR(it))
2690                 return PTR_ERR(it);
2691
2692         rc = iops->load(env, it, rdpg->rp_hash);
2693
2694         if (rc == 0) {
2695                 /*
2696                  * Iterator didn't find record with exactly the key requested.
2697                  *
2698                  * It is currently either
2699                  *
2700                  *     - positioned above record with key less than
2701                  *     requested---skip it.
2702                  *
2703                  *     - or not positioned at all (is in IAM_IT_SKEWED
2704                  *     state)---position it on the next item.
2705                  */
2706                 rc = iops->next(env, it);
2707         } else if (rc > 0)
2708                 rc = 0;
2709
2710         /*
2711          * At this point and across for-loop:
2712          *
2713          *  rc == 0 -> ok, proceed.
2714          *  rc >  0 -> end of directory.
2715          *  rc <  0 -> error.
2716          */
2717         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2718              i++, nob -= CFS_PAGE_SIZE) {
2719                 struct lu_dirpage *dp;
2720
2721                 LASSERT(i < rdpg->rp_npages);
2722                 pg = rdpg->rp_pages[i];
2723                 dp = cfs_kmap(pg);
2724 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2725 repeat:
2726 #endif
2727                 rc = mdd_dir_page_build(env, mdd, dp,
2728                                         min_t(int, nob, LU_PAGE_SIZE),
2729                                         iops, it, rdpg->rp_attrs);
2730                 if (rc > 0) {
2731                         /*
2732                          * end of directory.
2733                          */
2734                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2735                         nlupgs++;
2736                 } else if (rc < 0) {
2737                         CWARN("build page failed: %d!\n", rc);
2738                 } else {
2739                         nlupgs++;
2740 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2741                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2742                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2743                                 goto repeat;
2744 #endif
2745                 }
2746                 cfs_kunmap(pg);
2747         }
2748         if (rc >= 0) {
2749                 struct lu_dirpage *dp;
2750
2751                 dp = cfs_kmap(rdpg->rp_pages[0]);
2752                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2753                 if (nlupgs == 0) {
2754                         /*
2755                          * No pages were processed, mark this for first page
2756                          * and send back.
2757                          */
2758                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2759                         nlupgs = 1;
2760                 }
2761                 cfs_kunmap(rdpg->rp_pages[0]);
2762
2763                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2764         }
2765         iops->put(env, it);
2766         iops->fini(env, it);
2767
2768         return rc;
2769 }
2770
2771 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2772                  const struct lu_rdpg *rdpg)
2773 {
2774         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2775         int rc;
2776         ENTRY;
2777
2778         if (mdd_object_exists(mdd_obj) == 0) {
2779                 CERROR("%s: object "DFID" not found: rc = -2\n",
2780                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2781                 return -ENOENT;
2782         }
2783
2784         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2785         rc = mdd_readpage_sanity_check(env, mdd_obj);
2786         if (rc)
2787                 GOTO(out_unlock, rc);
2788
2789         if (mdd_is_dead_obj(mdd_obj)) {
2790                 struct page *pg;
2791                 struct lu_dirpage *dp;
2792
2793                 /*
2794                  * According to POSIX, please do not return any entry to client:
2795                  * even dot and dotdot should not be returned.
2796                  */
2797                 CWARN("readdir from dead object: "DFID"\n",
2798                         PFID(mdd_object_fid(mdd_obj)));
2799
2800                 if (rdpg->rp_count <= 0)
2801                         GOTO(out_unlock, rc = -EFAULT);
2802                 LASSERT(rdpg->rp_pages != NULL);
2803
2804                 pg = rdpg->rp_pages[0];
2805                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2806                 memset(dp, 0 , sizeof(struct lu_dirpage));
2807                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2808                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2809                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2810                 cfs_kunmap(pg);
2811                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2812         }
2813
2814         rc = __mdd_readpage(env, mdd_obj, rdpg);
2815
2816         EXIT;
2817 out_unlock:
2818         mdd_read_unlock(env, mdd_obj);
2819         return rc;
2820 }
2821
2822 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2823 {
2824         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2825
2826         if (mdd_object_exists(mdd_obj) == 0) {
2827                 CERROR("%s: object "DFID" not found: rc = -2\n",
2828                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2829                 return -ENOENT;
2830         }
2831         return dt_object_sync(env, mdd_object_child(mdd_obj));
2832 }
2833
2834 const struct md_object_operations mdd_obj_ops = {
2835         .moo_permission    = mdd_permission,
2836         .moo_attr_get      = mdd_attr_get,
2837         .moo_attr_set      = mdd_attr_set,
2838         .moo_xattr_get     = mdd_xattr_get,
2839         .moo_xattr_set     = mdd_xattr_set,
2840         .moo_xattr_list    = mdd_xattr_list,
2841         .moo_xattr_del     = mdd_xattr_del,
2842         .moo_object_create = mdd_object_create,
2843         .moo_ref_add       = mdd_ref_add,
2844         .moo_ref_del       = mdd_ref_del,
2845         .moo_open          = mdd_open,
2846         .moo_close         = mdd_close,
2847         .moo_readpage      = mdd_readpage,
2848         .moo_readlink      = mdd_readlink,
2849         .moo_changelog     = mdd_changelog,
2850         .moo_capa_get      = mdd_capa_get,
2851         .moo_object_sync   = mdd_object_sync,
2852         .moo_path          = mdd_path,
2853         .moo_file_lock     = mdd_file_lock,
2854         .moo_file_unlock   = mdd_file_unlock,
2855 };