Whamcloud - gitweb
LU-909 osd: changes to osd api
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
58 #include <obd_lov.h>
59
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
63
64 #include "mdd_internal.h"
65
66 static const struct lu_object_operations mdd_lu_obj_ops;
67
68 static int mdd_xattr_get(const struct lu_env *env,
69                          struct md_object *obj, struct lu_buf *buf,
70                          const char *name);
71
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
73                  void **data)
74 {
75         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
76                  PFID(mdd_object_fid(obj)));
77         mdo_data_get(env, obj, data);
78         return 0;
79 }
80
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82                struct lu_attr *la, struct lustre_capa *capa)
83 {
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                  PFID(mdd_object_fid(obj)));
86         return mdo_attr_get(env, obj, la, capa);
87 }
88
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
90 {
91         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
92
93         if (flags & LUSTRE_APPEND_FL)
94                 obj->mod_flags |= APPEND_OBJ;
95
96         if (flags & LUSTRE_IMMUTABLE_FL)
97                 obj->mod_flags |= IMMUTE_OBJ;
98 }
99
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
101 {
102         struct mdd_thread_info *info;
103
104         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105         LASSERT(info != NULL);
106         return info;
107 }
108
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
110 {
111         struct lu_buf *buf;
112
113         buf = &mdd_env_info(env)->mti_buf;
114         buf->lb_buf = area;
115         buf->lb_len = len;
116         return buf;
117 }
118
119 void mdd_buf_put(struct lu_buf *buf)
120 {
121         if (buf == NULL || buf->lb_buf == NULL)
122                 return;
123         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
124         buf->lb_buf = NULL;
125         buf->lb_len = 0;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 buf->lb_buf = NULL;
146         }
147         if (buf->lb_buf == NULL) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         buf->lb_len = 0;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183                                        struct mdd_device *mdd)
184 {
185         struct mdd_thread_info *mti = mdd_env_info(env);
186         int                     max_cookie_size;
187
188         max_cookie_size = mdd_lov_cookiesize(env, mdd);
189         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190                 if (mti->mti_max_cookie)
191                         OBD_FREE_LARGE(mti->mti_max_cookie,
192                                        mti->mti_max_cookie_size);
193                 mti->mti_max_cookie = NULL;
194                 mti->mti_max_cookie_size = 0;
195         }
196         if (unlikely(mti->mti_max_cookie == NULL)) {
197                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198                 if (likely(mti->mti_max_cookie != NULL))
199                         mti->mti_max_cookie_size = max_cookie_size;
200         }
201         if (likely(mti->mti_max_cookie != NULL))
202                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203         return mti->mti_max_cookie;
204 }
205
206 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
207                                    struct mdd_device *mdd)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210         int                     max_lmm_size;
211
212         max_lmm_size = mdd_lov_mdsize(env, mdd);
213         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
214                 if (mti->mti_max_lmm)
215                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
216                 mti->mti_max_lmm = NULL;
217                 mti->mti_max_lmm_size = 0;
218         }
219         if (unlikely(mti->mti_max_lmm == NULL)) {
220                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
221                 if (likely(mti->mti_max_lmm != NULL))
222                         mti->mti_max_lmm_size = max_lmm_size;
223         }
224         return mti->mti_max_lmm;
225 }
226
227 struct lu_object *mdd_object_alloc(const struct lu_env *env,
228                                    const struct lu_object_header *hdr,
229                                    struct lu_device *d)
230 {
231         struct mdd_object *mdd_obj;
232
233         OBD_ALLOC_PTR(mdd_obj);
234         if (mdd_obj != NULL) {
235                 struct lu_object *o;
236
237                 o = mdd2lu_obj(mdd_obj);
238                 lu_object_init(o, NULL, d);
239                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
240                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
241                 mdd_obj->mod_count = 0;
242                 o->lo_ops = &mdd_lu_obj_ops;
243                 return o;
244         } else {
245                 return NULL;
246         }
247 }
248
249 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
250                            const struct lu_object_conf *unused)
251 {
252         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
253         struct mdd_object *mdd_obj = lu2mdd_obj(o);
254         struct lu_object  *below;
255         struct lu_device  *under;
256         ENTRY;
257
258         mdd_obj->mod_cltime = 0;
259         under = &d->mdd_child->dd_lu_dev;
260         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
261         mdd_pdlock_init(mdd_obj);
262         if (below == NULL)
263                 RETURN(-ENOMEM);
264
265         lu_object_add(o, below);
266
267         RETURN(0);
268 }
269
270 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
271 {
272         if (lu_object_exists(o))
273                 return mdd_get_flags(env, lu2mdd_obj(o));
274         else
275                 return 0;
276 }
277
278 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
279 {
280         struct mdd_object *mdd = lu2mdd_obj(o);
281
282         lu_object_fini(o);
283         OBD_FREE_PTR(mdd);
284 }
285
286 static int mdd_object_print(const struct lu_env *env, void *cookie,
287                             lu_printer_t p, const struct lu_object *o)
288 {
289         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
290         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
291                     "valid=%x, cltime="LPU64", flags=%lx)",
292                     mdd, mdd->mod_count, mdd->mod_valid,
293                     mdd->mod_cltime, mdd->mod_flags);
294 }
295
296 static const struct lu_object_operations mdd_lu_obj_ops = {
297         .loo_object_init    = mdd_object_init,
298         .loo_object_start   = mdd_object_start,
299         .loo_object_free    = mdd_object_free,
300         .loo_object_print   = mdd_object_print,
301 };
302
303 struct mdd_object *mdd_object_find(const struct lu_env *env,
304                                    struct mdd_device *d,
305                                    const struct lu_fid *f)
306 {
307         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
308 }
309
310 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
311                         const char *path, struct lu_fid *fid)
312 {
313         struct lu_buf *buf;
314         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
315         struct mdd_object *obj;
316         struct lu_name *lname = &mdd_env_info(env)->mti_name;
317         char *name;
318         int rc = 0;
319         ENTRY;
320
321         /* temp buffer for path element */
322         buf = mdd_buf_alloc(env, PATH_MAX);
323         if (buf->lb_buf == NULL)
324                 RETURN(-ENOMEM);
325
326         lname->ln_name = name = buf->lb_buf;
327         lname->ln_namelen = 0;
328         *f = mdd->mdd_root_fid;
329
330         while(1) {
331                 while (*path == '/')
332                         path++;
333                 if (*path == '\0')
334                         break;
335                 while (*path != '/' && *path != '\0') {
336                         *name = *path;
337                         path++;
338                         name++;
339                         lname->ln_namelen++;
340                 }
341
342                 *name = '\0';
343                 /* find obj corresponding to fid */
344                 obj = mdd_object_find(env, mdd, f);
345                 if (obj == NULL)
346                         GOTO(out, rc = -EREMOTE);
347                 if (IS_ERR(obj))
348                         GOTO(out, rc = PTR_ERR(obj));
349                 /* get child fid from parent and name */
350                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
351                 mdd_object_put(env, obj);
352                 if (rc)
353                         break;
354
355                 name = buf->lb_buf;
356                 lname->ln_namelen = 0;
357         }
358
359         if (!rc)
360                 *fid = *f;
361 out:
362         RETURN(rc);
363 }
364
365 /** The maximum depth that fid2path() will search.
366  * This is limited only because we want to store the fids for
367  * historical path lookup purposes.
368  */
369 #define MAX_PATH_DEPTH 100
370
371 /** mdd_path() lookup structure. */
372 struct path_lookup_info {
373         __u64                pli_recno;        /**< history point */
374         __u64                pli_currec;       /**< current record */
375         struct lu_fid        pli_fid;
376         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377         struct mdd_object   *pli_mdd_obj;
378         char                *pli_path;         /**< full path */
379         int                  pli_pathlen;
380         int                  pli_linkno;       /**< which hardlink to follow */
381         int                  pli_fidcount;     /**< number of \a pli_fids */
382 };
383
384 static int mdd_path_current(const struct lu_env *env,
385                             struct path_lookup_info *pli)
386 {
387         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388         struct mdd_object *mdd_obj;
389         struct lu_buf     *buf = NULL;
390         struct link_ea_header *leh;
391         struct link_ea_entry  *lee;
392         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
394         char *ptr;
395         int reclen;
396         int rc;
397         ENTRY;
398
399         ptr = pli->pli_path + pli->pli_pathlen - 1;
400         *ptr = 0;
401         --ptr;
402         pli->pli_fidcount = 0;
403         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
404
405         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406                 mdd_obj = mdd_object_find(env, mdd,
407                                           &pli->pli_fids[pli->pli_fidcount]);
408                 if (mdd_obj == NULL)
409                         GOTO(out, rc = -EREMOTE);
410                 if (IS_ERR(mdd_obj))
411                         GOTO(out, rc = PTR_ERR(mdd_obj));
412                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
413                 if (rc <= 0) {
414                         mdd_object_put(env, mdd_obj);
415                         if (rc == -1)
416                                 rc = -EREMOTE;
417                         else if (rc == 0)
418                                 /* Do I need to error out here? */
419                                 rc = -ENOENT;
420                         GOTO(out, rc);
421                 }
422
423                 /* Get parent fid and object name */
424                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425                 buf = mdd_links_get(env, mdd_obj);
426                 mdd_read_unlock(env, mdd_obj);
427                 mdd_object_put(env, mdd_obj);
428                 if (IS_ERR(buf))
429                         GOTO(out, rc = PTR_ERR(buf));
430
431                 leh = buf->lb_buf;
432                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
433                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
434
435                 /* If set, use link #linkno for path lookup, otherwise use
436                    link #0.  Only do this for the final path element. */
437                 if ((pli->pli_fidcount == 0) &&
438                     (pli->pli_linkno < leh->leh_reccount)) {
439                         int count;
440                         for (count = 0; count < pli->pli_linkno; count++) {
441                                 lee = (struct link_ea_entry *)
442                                      ((char *)lee + reclen);
443                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444                         }
445                         if (pli->pli_linkno < leh->leh_reccount - 1)
446                                 /* indicate to user there are more links */
447                                 pli->pli_linkno++;
448                 }
449
450                 /* Pack the name in the end of the buffer */
451                 ptr -= tmpname->ln_namelen;
452                 if (ptr - 1 <= pli->pli_path)
453                         GOTO(out, rc = -EOVERFLOW);
454                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
455                 *(--ptr) = '/';
456
457                 /* Store the parent fid for historic lookup */
458                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
459                         GOTO(out, rc = -EOVERFLOW);
460                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
461         }
462
463         /* Verify that our path hasn't changed since we started the lookup.
464            Record the current index, and verify the path resolves to the
465            same fid. If it does, then the path is correct as of this index. */
466         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
467         pli->pli_currec = mdd->mdd_cl.mc_index;
468         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
469         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
470         if (rc) {
471                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
472                 GOTO (out, rc = -EAGAIN);
473         }
474         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
475                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
476                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
477                        PFID(&pli->pli_fid));
478                 GOTO(out, rc = -EAGAIN);
479         }
480         ptr++; /* skip leading / */
481         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
482
483         EXIT;
484 out:
485         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
486                 /* if we vmalloced a large buffer drop it */
487                 mdd_buf_put(buf);
488
489         return rc;
490 }
491
492 static int mdd_path_historic(const struct lu_env *env,
493                              struct path_lookup_info *pli)
494 {
495         return 0;
496 }
497
498 /* Returns the full path to this fid, as of changelog record recno. */
499 static int mdd_path(const struct lu_env *env, struct md_object *obj,
500                     char *path, int pathlen, __u64 *recno, int *linkno)
501 {
502         struct path_lookup_info *pli;
503         int tries = 3;
504         int rc = -EAGAIN;
505         ENTRY;
506
507         if (pathlen < 3)
508                 RETURN(-EOVERFLOW);
509
510         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
511                 path[0] = '\0';
512                 RETURN(0);
513         }
514
515         OBD_ALLOC_PTR(pli);
516         if (pli == NULL)
517                 RETURN(-ENOMEM);
518
519         pli->pli_mdd_obj = md2mdd_obj(obj);
520         pli->pli_recno = *recno;
521         pli->pli_path = path;
522         pli->pli_pathlen = pathlen;
523         pli->pli_linkno = *linkno;
524
525         /* Retry multiple times in case file is being moved */
526         while (tries-- && rc == -EAGAIN)
527                 rc = mdd_path_current(env, pli);
528
529         /* For historical path lookup, the current links may not have existed
530          * at "recno" time.  We must switch over to earlier links/parents
531          * by using the changelog records.  If the earlier parent doesn't
532          * exist, we must search back through the changelog to reconstruct
533          * its parents, then check if it exists, etc.
534          * We may ignore this problem for the initial implementation and
535          * state that an "original" hardlink must still exist for us to find
536          * historic path name. */
537         if (pli->pli_recno != -1) {
538                 rc = mdd_path_historic(env, pli);
539         } else {
540                 *recno = pli->pli_currec;
541                 /* Return next link index to caller */
542                 *linkno = pli->pli_linkno;
543         }
544
545         OBD_FREE_PTR(pli);
546
547         RETURN (rc);
548 }
549
550 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
551 {
552         struct lu_attr *la = &mdd_env_info(env)->mti_la;
553         int rc;
554
555         ENTRY;
556         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
557         if (rc == 0) {
558                 mdd_flags_xlate(obj, la->la_flags);
559                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
560                         obj->mod_flags |= MNLINK_OBJ;
561         }
562         RETURN(rc);
563 }
564
565 /* get only inode attributes */
566 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
567                   struct md_attr *ma)
568 {
569         int rc = 0;
570         ENTRY;
571
572         if (ma->ma_valid & MA_INODE)
573                 RETURN(0);
574
575         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
576                           mdd_object_capa(env, mdd_obj));
577         if (rc == 0)
578                 ma->ma_valid |= MA_INODE;
579         RETURN(rc);
580 }
581
582 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
583 {
584         struct lov_desc *ldesc;
585         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
586         struct lov_user_md *lum = (struct lov_user_md*)lmm;
587         ENTRY;
588
589         if (!lum)
590                 RETURN(0);
591
592         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
593         LASSERT(ldesc != NULL);
594
595         lum->lmm_magic = LOV_MAGIC_V1;
596         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
597         lum->lmm_pattern = ldesc->ld_pattern;
598         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
599         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
600         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
601
602         RETURN(sizeof(*lum));
603 }
604
605 static int is_rootdir(struct mdd_object *mdd_obj)
606 {
607         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
608         const struct lu_fid *fid = mdo2fid(mdd_obj);
609
610         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
611 }
612
613 /* get lov EA only */
614 static int __mdd_lmm_get(const struct lu_env *env,
615                          struct mdd_object *mdd_obj, struct md_attr *ma)
616 {
617         int rc;
618         ENTRY;
619
620         if (ma->ma_valid & MA_LOV)
621                 RETURN(0);
622
623         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
624                         XATTR_NAME_LOV);
625         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
626                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
627         if (rc > 0) {
628                 ma->ma_lmm_size = rc;
629                 ma->ma_valid |= MA_LOV;
630                 rc = 0;
631         }
632         RETURN(rc);
633 }
634
635 /* get the first parent fid from link EA */
636 static int mdd_pfid_get(const struct lu_env *env,
637                         struct mdd_object *mdd_obj, struct md_attr *ma)
638 {
639         struct lu_buf *buf;
640         struct link_ea_header *leh;
641         struct link_ea_entry *lee;
642         struct lu_fid *pfid = &ma->ma_pfid;
643         ENTRY;
644
645         if (ma->ma_valid & MA_PFID)
646                 RETURN(0);
647
648         buf = mdd_links_get(env, mdd_obj);
649         if (IS_ERR(buf))
650                 RETURN(PTR_ERR(buf));
651
652         leh = buf->lb_buf;
653         lee = (struct link_ea_entry *)(leh + 1);
654         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
655         fid_be_to_cpu(pfid, pfid);
656         ma->ma_valid |= MA_PFID;
657         if (buf->lb_len > OBD_ALLOC_BIG)
658                 /* if we vmalloced a large buffer drop it */
659                 mdd_buf_put(buf);
660         RETURN(0);
661 }
662
663 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
664                        struct md_attr *ma)
665 {
666         int rc;
667         ENTRY;
668
669         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
670         rc = __mdd_lmm_get(env, mdd_obj, ma);
671         mdd_read_unlock(env, mdd_obj);
672         RETURN(rc);
673 }
674
675 /* get lmv EA only*/
676 static int __mdd_lmv_get(const struct lu_env *env,
677                          struct mdd_object *mdd_obj, struct md_attr *ma)
678 {
679         int rc;
680         ENTRY;
681
682         if (ma->ma_valid & MA_LMV)
683                 RETURN(0);
684
685         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
686                         XATTR_NAME_LMV);
687         if (rc > 0) {
688                 ma->ma_valid |= MA_LMV;
689                 rc = 0;
690         }
691         RETURN(rc);
692 }
693
694 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
695                          struct md_attr *ma)
696 {
697         struct mdd_thread_info *info = mdd_env_info(env);
698         struct lustre_mdt_attrs *lma =
699                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
700         int lma_size;
701         int rc;
702         ENTRY;
703
704         /* If all needed data are already valid, nothing to do */
705         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
706             (ma->ma_need & (MA_HSM | MA_SOM)))
707                 RETURN(0);
708
709         /* Read LMA from disk EA */
710         lma_size = sizeof(info->mti_xattr_buf);
711         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
712         if (rc <= 0)
713                 RETURN(rc);
714
715         /* Useless to check LMA incompatibility because this is already done in
716          * osd_ea_fid_get(), and this will fail long before this code is
717          * called.
718          * So, if we are here, LMA is compatible.
719          */
720
721         lustre_lma_swab(lma);
722
723         /* Swab and copy LMA */
724         if (ma->ma_need & MA_HSM) {
725                 if (lma->lma_compat & LMAC_HSM)
726                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
727                 else
728                         ma->ma_hsm.mh_flags = 0;
729                 ma->ma_valid |= MA_HSM;
730         }
731
732         /* Copy SOM */
733         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
734                 LASSERT(ma->ma_som != NULL);
735                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
736                 ma->ma_som->msd_size    = lma->lma_som_size;
737                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
738                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
739                 ma->ma_valid |= MA_SOM;
740         }
741
742         RETURN(0);
743 }
744
745 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
746                                  struct md_attr *ma)
747 {
748         int rc = 0;
749         ENTRY;
750
751         if (ma->ma_need & MA_INODE)
752                 rc = mdd_iattr_get(env, mdd_obj, ma);
753
754         if (rc == 0 && ma->ma_need & MA_LOV) {
755                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
756                     S_ISDIR(mdd_object_type(mdd_obj)))
757                         rc = __mdd_lmm_get(env, mdd_obj, ma);
758         }
759         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
760                 if (S_ISREG(mdd_object_type(mdd_obj)))
761                         rc = mdd_pfid_get(env, mdd_obj, ma);
762         }
763         if (rc == 0 && ma->ma_need & MA_LMV) {
764                 if (S_ISDIR(mdd_object_type(mdd_obj)))
765                         rc = __mdd_lmv_get(env, mdd_obj, ma);
766         }
767         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
768                 if (S_ISREG(mdd_object_type(mdd_obj)))
769                         rc = __mdd_lma_get(env, mdd_obj, ma);
770         }
771 #ifdef CONFIG_FS_POSIX_ACL
772         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
773                 if (S_ISDIR(mdd_object_type(mdd_obj)))
774                         rc = mdd_def_acl_get(env, mdd_obj, ma);
775         }
776 #endif
777         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
778                rc, ma->ma_valid, ma->ma_lmm);
779         RETURN(rc);
780 }
781
782 int mdd_attr_get_internal_locked(const struct lu_env *env,
783                                  struct mdd_object *mdd_obj, struct md_attr *ma)
784 {
785         int rc;
786         int needlock = ma->ma_need &
787                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
788
789         if (needlock)
790                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
791         rc = mdd_attr_get_internal(env, mdd_obj, ma);
792         if (needlock)
793                 mdd_read_unlock(env, mdd_obj);
794         return rc;
795 }
796
797 /*
798  * No permission check is needed.
799  */
800 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
801                         struct md_attr *ma)
802 {
803         struct mdd_object *mdd_obj = md2mdd_obj(obj);
804         int                rc;
805
806         ENTRY;
807         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
808         RETURN(rc);
809 }
810
811 /*
812  * No permission check is needed.
813  */
814 static int mdd_xattr_get(const struct lu_env *env,
815                          struct md_object *obj, struct lu_buf *buf,
816                          const char *name)
817 {
818         struct mdd_object *mdd_obj = md2mdd_obj(obj);
819         int rc;
820
821         ENTRY;
822
823         LASSERT(mdd_object_exists(mdd_obj));
824
825         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
826         rc = mdo_xattr_get(env, mdd_obj, buf, name,
827                            mdd_object_capa(env, mdd_obj));
828         mdd_read_unlock(env, mdd_obj);
829
830         RETURN(rc);
831 }
832
833 /*
834  * Permission check is done when open,
835  * no need check again.
836  */
837 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
838                         struct lu_buf *buf)
839 {
840         struct mdd_object *mdd_obj = md2mdd_obj(obj);
841         struct dt_object  *next;
842         loff_t             pos = 0;
843         int                rc;
844         ENTRY;
845
846         LASSERT(mdd_object_exists(mdd_obj));
847
848         next = mdd_object_child(mdd_obj);
849         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
850         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
851                                          mdd_object_capa(env, mdd_obj));
852         mdd_read_unlock(env, mdd_obj);
853         RETURN(rc);
854 }
855
856 /*
857  * No permission check is needed.
858  */
859 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
860                           struct lu_buf *buf)
861 {
862         struct mdd_object *mdd_obj = md2mdd_obj(obj);
863         int rc;
864
865         ENTRY;
866
867         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
868         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
869         mdd_read_unlock(env, mdd_obj);
870
871         RETURN(rc);
872 }
873
874 int mdd_declare_object_create_internal(const struct lu_env *env,
875                                        struct mdd_object *p,
876                                        struct mdd_object *c,
877                                        struct md_attr *ma,
878                                        struct thandle *handle,
879                                        const struct md_op_spec *spec)
880 {
881         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
882         const struct dt_index_features *feat = spec->sp_feat;
883         int rc;
884         ENTRY;
885
886         if (feat != &dt_directory_features && feat != NULL)
887                 dof->dof_type = DFT_INDEX;
888         else
889                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
890
891         dof->u.dof_idx.di_feat = feat;
892
893         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
894
895         RETURN(rc);
896 }
897
898 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
899                                struct mdd_object *c, struct md_attr *ma,
900                                struct thandle *handle,
901                                const struct md_op_spec *spec)
902 {
903         struct lu_attr *attr = &ma->ma_attr;
904         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
905         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
906         const struct dt_index_features *feat = spec->sp_feat;
907         int rc;
908         ENTRY;
909
910         if (!mdd_object_exists(c)) {
911                 struct dt_object *next = mdd_object_child(c);
912                 LASSERT(next);
913
914                 if (feat != &dt_directory_features && feat != NULL)
915                         dof->dof_type = DFT_INDEX;
916                 else
917                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
918
919                 dof->u.dof_idx.di_feat = feat;
920
921                 /* @hint will be initialized by underlying device. */
922                 next->do_ops->do_ah_init(env, hint,
923                                          p ? mdd_object_child(p) : NULL,
924                                          attr->la_mode & S_IFMT);
925
926                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
927                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
928         } else
929                 rc = -EEXIST;
930
931         RETURN(rc);
932 }
933
934 /**
935  * Make sure the ctime is increased only.
936  */
937 static inline int mdd_attr_check(const struct lu_env *env,
938                                  struct mdd_object *obj,
939                                  struct lu_attr *attr)
940 {
941         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
942         int rc;
943         ENTRY;
944
945         if (attr->la_valid & LA_CTIME) {
946                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
947                 if (rc)
948                         RETURN(rc);
949
950                 if (attr->la_ctime < tmp_la->la_ctime)
951                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
952                 else if (attr->la_valid == LA_CTIME &&
953                          attr->la_ctime == tmp_la->la_ctime)
954                         attr->la_valid &= ~LA_CTIME;
955         }
956         RETURN(0);
957 }
958
959 int mdd_attr_set_internal(const struct lu_env *env,
960                           struct mdd_object *obj,
961                           struct lu_attr *attr,
962                           struct thandle *handle,
963                           int needacl)
964 {
965         int rc;
966         ENTRY;
967
968         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
969 #ifdef CONFIG_FS_POSIX_ACL
970         if (!rc && (attr->la_valid & LA_MODE) && needacl)
971                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
972 #endif
973         RETURN(rc);
974 }
975
976 int mdd_attr_check_set_internal(const struct lu_env *env,
977                                 struct mdd_object *obj,
978                                 struct lu_attr *attr,
979                                 struct thandle *handle,
980                                 int needacl)
981 {
982         int rc;
983         ENTRY;
984
985         rc = mdd_attr_check(env, obj, attr);
986         if (rc)
987                 RETURN(rc);
988
989         if (attr->la_valid)
990                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
991         RETURN(rc);
992 }
993
994 static int mdd_attr_set_internal_locked(const struct lu_env *env,
995                                         struct mdd_object *obj,
996                                         struct lu_attr *attr,
997                                         struct thandle *handle,
998                                         int needacl)
999 {
1000         int rc;
1001         ENTRY;
1002
1003         needacl = needacl && (attr->la_valid & LA_MODE);
1004         if (needacl)
1005                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1006         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1007         if (needacl)
1008                 mdd_write_unlock(env, obj);
1009         RETURN(rc);
1010 }
1011
1012 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1013                                        struct mdd_object *obj,
1014                                        struct lu_attr *attr,
1015                                        struct thandle *handle,
1016                                        int needacl)
1017 {
1018         int rc;
1019         ENTRY;
1020
1021         needacl = needacl && (attr->la_valid & LA_MODE);
1022         if (needacl)
1023                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1024         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1025         if (needacl)
1026                 mdd_write_unlock(env, obj);
1027         RETURN(rc);
1028 }
1029
1030 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1031                     const struct lu_buf *buf, const char *name,
1032                     int fl, struct thandle *handle)
1033 {
1034         struct lustre_capa *capa = mdd_object_capa(env, obj);
1035         int rc = -EINVAL;
1036         ENTRY;
1037
1038         if (buf->lb_buf && buf->lb_len > 0)
1039                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1040         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1041                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1042
1043         RETURN(rc);
1044 }
1045
1046 /*
1047  * This gives the same functionality as the code between
1048  * sys_chmod and inode_setattr
1049  * chown_common and inode_setattr
1050  * utimes and inode_setattr
1051  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1052  */
1053 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1054                         struct lu_attr *la, const struct md_attr *ma)
1055 {
1056         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1057         struct md_ucred  *uc;
1058         int               rc;
1059         ENTRY;
1060
1061         if (!la->la_valid)
1062                 RETURN(0);
1063
1064         /* Do not permit change file type */
1065         if (la->la_valid & LA_TYPE)
1066                 RETURN(-EPERM);
1067
1068         /* They should not be processed by setattr */
1069         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1070                 RETURN(-EPERM);
1071
1072         /* export destroy does not have ->le_ses, but we may want
1073          * to drop LUSTRE_SOM_FL. */
1074         if (!env->le_ses)
1075                 RETURN(0);
1076
1077         uc = md_ucred(env);
1078
1079         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1080         if (rc)
1081                 RETURN(rc);
1082
1083         if (la->la_valid == LA_CTIME) {
1084                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1085                         /* This is only for set ctime when rename's source is
1086                          * on remote MDS. */
1087                         rc = mdd_may_delete(env, NULL, obj,
1088                                             (struct md_attr *)ma, 1, 0);
1089                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1090                         la->la_valid &= ~LA_CTIME;
1091                 RETURN(rc);
1092         }
1093
1094         if (la->la_valid == LA_ATIME) {
1095                 /* This is atime only set for read atime update on close. */
1096                 if (la->la_atime >= tmp_la->la_atime &&
1097                     la->la_atime < (tmp_la->la_atime +
1098                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1099                         la->la_valid &= ~LA_ATIME;
1100                 RETURN(0);
1101         }
1102
1103         /* Check if flags change. */
1104         if (la->la_valid & LA_FLAGS) {
1105                 unsigned int oldflags = 0;
1106                 unsigned int newflags = la->la_flags &
1107                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1108
1109                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1110                     !mdd_capable(uc, CFS_CAP_FOWNER))
1111                         RETURN(-EPERM);
1112
1113                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1114                  * only be changed by the relevant capability. */
1115                 if (mdd_is_immutable(obj))
1116                         oldflags |= LUSTRE_IMMUTABLE_FL;
1117                 if (mdd_is_append(obj))
1118                         oldflags |= LUSTRE_APPEND_FL;
1119                 if ((oldflags ^ newflags) &&
1120                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1121                         RETURN(-EPERM);
1122
1123                 if (!S_ISDIR(tmp_la->la_mode))
1124                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1125         }
1126
1127         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1128             (la->la_valid & ~LA_FLAGS) &&
1129             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1130                 RETURN(-EPERM);
1131
1132         /* Check for setting the obj time. */
1133         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1134             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1135                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1136                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1137                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1138                                                             MAY_WRITE,
1139                                                             MOR_TGT_CHILD);
1140                         if (rc)
1141                                 RETURN(rc);
1142                 }
1143         }
1144
1145         if (la->la_valid & LA_KILL_SUID) {
1146                 la->la_valid &= ~LA_KILL_SUID;
1147                 if ((tmp_la->la_mode & S_ISUID) &&
1148                     !(la->la_valid & LA_MODE)) {
1149                         la->la_mode = tmp_la->la_mode;
1150                         la->la_valid |= LA_MODE;
1151                 }
1152                 la->la_mode &= ~S_ISUID;
1153         }
1154
1155         if (la->la_valid & LA_KILL_SGID) {
1156                 la->la_valid &= ~LA_KILL_SGID;
1157                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1158                                         (S_ISGID | S_IXGRP)) &&
1159                     !(la->la_valid & LA_MODE)) {
1160                         la->la_mode = tmp_la->la_mode;
1161                         la->la_valid |= LA_MODE;
1162                 }
1163                 la->la_mode &= ~S_ISGID;
1164         }
1165
1166         /* Make sure a caller can chmod. */
1167         if (la->la_valid & LA_MODE) {
1168                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1169                     (uc->mu_fsuid != tmp_la->la_uid) &&
1170                     !mdd_capable(uc, CFS_CAP_FOWNER))
1171                         RETURN(-EPERM);
1172
1173                 if (la->la_mode == (cfs_umode_t) -1)
1174                         la->la_mode = tmp_la->la_mode;
1175                 else
1176                         la->la_mode = (la->la_mode & S_IALLUGO) |
1177                                       (tmp_la->la_mode & ~S_IALLUGO);
1178
1179                 /* Also check the setgid bit! */
1180                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1181                                        la->la_gid : tmp_la->la_gid) &&
1182                     !mdd_capable(uc, CFS_CAP_FSETID))
1183                         la->la_mode &= ~S_ISGID;
1184         } else {
1185                la->la_mode = tmp_la->la_mode;
1186         }
1187
1188         /* Make sure a caller can chown. */
1189         if (la->la_valid & LA_UID) {
1190                 if (la->la_uid == (uid_t) -1)
1191                         la->la_uid = tmp_la->la_uid;
1192                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1193                     (la->la_uid != tmp_la->la_uid)) &&
1194                     !mdd_capable(uc, CFS_CAP_CHOWN))
1195                         RETURN(-EPERM);
1196
1197                 /* If the user or group of a non-directory has been
1198                  * changed by a non-root user, remove the setuid bit.
1199                  * 19981026 David C Niemi <niemi@tux.org>
1200                  *
1201                  * Changed this to apply to all users, including root,
1202                  * to avoid some races. This is the behavior we had in
1203                  * 2.0. The check for non-root was definitely wrong
1204                  * for 2.2 anyway, as it should have been using
1205                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1206                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1207                     !S_ISDIR(tmp_la->la_mode)) {
1208                         la->la_mode &= ~S_ISUID;
1209                         la->la_valid |= LA_MODE;
1210                 }
1211         }
1212
1213         /* Make sure caller can chgrp. */
1214         if (la->la_valid & LA_GID) {
1215                 if (la->la_gid == (gid_t) -1)
1216                         la->la_gid = tmp_la->la_gid;
1217                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1218                     ((la->la_gid != tmp_la->la_gid) &&
1219                     !lustre_in_group_p(uc, la->la_gid))) &&
1220                     !mdd_capable(uc, CFS_CAP_CHOWN))
1221                         RETURN(-EPERM);
1222
1223                 /* Likewise, if the user or group of a non-directory
1224                  * has been changed by a non-root user, remove the
1225                  * setgid bit UNLESS there is no group execute bit
1226                  * (this would be a file marked for mandatory
1227                  * locking).  19981026 David C Niemi <niemi@tux.org>
1228                  *
1229                  * Removed the fsuid check (see the comment above) --
1230                  * 19990830 SD. */
1231                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1232                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1233                         la->la_mode &= ~S_ISGID;
1234                         la->la_valid |= LA_MODE;
1235                 }
1236         }
1237
1238         /* For both Size-on-MDS case and truncate case,
1239          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1240          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1241          * For SOM case, it is true, the MAY_WRITE perm has been checked
1242          * when open, no need check again. For truncate case, it is false,
1243          * the MAY_WRITE perm should be checked here. */
1244         if (ma->ma_attr_flags & MDS_SOM) {
1245                 /* For the "Size-on-MDS" setattr update, merge coming
1246                  * attributes with the set in the inode. BUG 10641 */
1247                 if ((la->la_valid & LA_ATIME) &&
1248                     (la->la_atime <= tmp_la->la_atime))
1249                         la->la_valid &= ~LA_ATIME;
1250
1251                 /* OST attributes do not have a priority over MDS attributes,
1252                  * so drop times if ctime is equal. */
1253                 if ((la->la_valid & LA_CTIME) &&
1254                     (la->la_ctime <= tmp_la->la_ctime))
1255                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1256         } else {
1257                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1258                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1259                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1260                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1261                                 rc = mdd_permission_internal_locked(env, obj,
1262                                                             tmp_la, MAY_WRITE,
1263                                                             MOR_TGT_CHILD);
1264                                 if (rc)
1265                                         RETURN(rc);
1266                         }
1267                 }
1268                 if (la->la_valid & LA_CTIME) {
1269                         /* The pure setattr, it has the priority over what is
1270                          * already set, do not drop it if ctime is equal. */
1271                         if (la->la_ctime < tmp_la->la_ctime)
1272                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1273                                                   LA_CTIME);
1274                 }
1275         }
1276
1277         RETURN(0);
1278 }
1279
1280 /** Store a data change changelog record
1281  * If this fails, we must fail the whole transaction; we don't
1282  * want the change to commit without the log entry.
1283  * \param mdd_obj - mdd_object of change
1284  * \param handle - transacion handle
1285  */
1286 static int mdd_changelog_data_store(const struct lu_env     *env,
1287                                     struct mdd_device       *mdd,
1288                                     enum changelog_rec_type type,
1289                                     int                     flags,
1290                                     struct mdd_object       *mdd_obj,
1291                                     struct thandle          *handle)
1292 {
1293         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1294         struct llog_changelog_rec *rec;
1295         struct thandle *th = NULL;
1296         struct lu_buf *buf;
1297         int reclen;
1298         int rc;
1299
1300         /* Not recording */
1301         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1302                 RETURN(0);
1303         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1304                 RETURN(0);
1305
1306         LASSERT(mdd_obj != NULL);
1307         LASSERT(handle != NULL);
1308
1309         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1310             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1311                 /* Don't need multiple updates in this log */
1312                 /* Don't check under lock - no big deal if we get an extra
1313                    entry */
1314                 RETURN(0);
1315         }
1316
1317         reclen = llog_data_len(sizeof(*rec));
1318         buf = mdd_buf_alloc(env, reclen);
1319         if (buf->lb_buf == NULL)
1320                 RETURN(-ENOMEM);
1321         rec = (struct llog_changelog_rec *)buf->lb_buf;
1322
1323         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1324         rec->cr.cr_type = (__u32)type;
1325         rec->cr.cr_tfid = *tfid;
1326         rec->cr.cr_namelen = 0;
1327         mdd_obj->mod_cltime = cfs_time_current_64();
1328
1329         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1330
1331         if (th)
1332                 mdd_trans_stop(env, mdd, rc, th);
1333
1334         if (rc < 0) {
1335                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1336                        rc, type, PFID(tfid));
1337                 return -EFAULT;
1338         }
1339
1340         return 0;
1341 }
1342
1343 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1344                   int flags, struct md_object *obj)
1345 {
1346         struct thandle *handle;
1347         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1348         struct mdd_device *mdd = mdo2mdd(obj);
1349         int rc;
1350         ENTRY;
1351
1352         handle = mdd_trans_create(env, mdd);
1353         if (IS_ERR(handle))
1354                 return(PTR_ERR(handle));
1355
1356         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1357         if (rc)
1358                 GOTO(stop, rc);
1359
1360         rc = mdd_trans_start(env, mdd, handle);
1361         if (rc)
1362                 GOTO(stop, rc);
1363
1364         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1365                                       handle);
1366
1367 stop:
1368         mdd_trans_stop(env, mdd, rc, handle);
1369
1370         RETURN(rc);
1371 }
1372
1373 /**
1374  * Should be called with write lock held.
1375  *
1376  * \see mdd_lma_set_locked().
1377  */
1378 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1379                        const struct md_attr *ma, struct thandle *handle)
1380 {
1381         struct mdd_thread_info *info = mdd_env_info(env);
1382         struct lu_buf *buf;
1383         struct lustre_mdt_attrs *lma =
1384                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1385         int lmasize = sizeof(struct lustre_mdt_attrs);
1386         int rc = 0;
1387
1388         ENTRY;
1389
1390         /* Either HSM or SOM part is not valid, we need to read it before */
1391         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1392                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1393                 if (rc <= 0)
1394                         RETURN(rc);
1395
1396                 lustre_lma_swab(lma);
1397         } else {
1398                 memset(lma, 0, lmasize);
1399         }
1400
1401         /* Copy HSM data */
1402         if (ma->ma_valid & MA_HSM) {
1403                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1404                 lma->lma_compat |= LMAC_HSM;
1405         }
1406
1407         /* Copy SOM data */
1408         if (ma->ma_valid & MA_SOM) {
1409                 LASSERT(ma->ma_som != NULL);
1410                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1411                         lma->lma_compat     &= ~LMAC_SOM;
1412                 } else {
1413                         lma->lma_compat     |= LMAC_SOM;
1414                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1415                         lma->lma_som_size    = ma->ma_som->msd_size;
1416                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1417                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1418                 }
1419         }
1420
1421         /* Copy FID */
1422         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1423
1424         lustre_lma_swab(lma);
1425         buf = mdd_buf_get(env, lma, lmasize);
1426         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1427
1428         RETURN(rc);
1429 }
1430
1431 /**
1432  * Save LMA extended attributes with data from \a ma.
1433  *
1434  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1435  * not, LMA EA will be first read from disk, modified and write back.
1436  *
1437  */
1438 static int mdd_lma_set_locked(const struct lu_env *env,
1439                               struct mdd_object *mdd_obj,
1440                               const struct md_attr *ma, struct thandle *handle)
1441 {
1442         int rc;
1443
1444         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1445         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1446         mdd_write_unlock(env, mdd_obj);
1447         return rc;
1448 }
1449
1450 /* Precedence for choosing record type when multiple
1451  * attributes change: setattr > mtime > ctime > atime
1452  * (ctime changes when mtime does, plus chmod/chown.
1453  * atime and ctime are independent.) */
1454 static int mdd_attr_set_changelog(const struct lu_env *env,
1455                                   struct md_object *obj, struct thandle *handle,
1456                                   __u64 valid)
1457 {
1458         struct mdd_device *mdd = mdo2mdd(obj);
1459         int bits, type = 0;
1460
1461         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1462         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1463         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1464         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1465         bits = bits & mdd->mdd_cl.mc_mask;
1466         if (bits == 0)
1467                 return 0;
1468
1469         /* The record type is the lowest non-masked set bit */
1470         while (bits && ((bits & 1) == 0)) {
1471                 bits = bits >> 1;
1472                 type++;
1473         }
1474
1475         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1476         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1477                                         md2mdd_obj(obj), handle);
1478 }
1479
1480 static int mdd_declare_attr_set(const struct lu_env *env,
1481                                 struct mdd_device *mdd,
1482                                 struct mdd_object *obj,
1483                                 const struct md_attr *ma,
1484                                 struct lov_mds_md *lmm,
1485                                 struct thandle *handle)
1486 {
1487         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1488         int             rc, stripe, i;
1489
1490         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1491         if (rc)
1492                 return rc;
1493
1494         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1495         if (rc)
1496                 return rc;
1497
1498         if (ma->ma_valid & MA_LOV) {
1499                 buf->lb_buf = NULL;
1500                 buf->lb_len = ma->ma_lmm_size;
1501                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1502                                            0, handle);
1503                 if (rc)
1504                         return rc;
1505         }
1506
1507         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1508                 buf->lb_buf = NULL;
1509                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1510                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1511                                            0, handle);
1512                 if (rc)
1513                         return rc;
1514         }
1515
1516         /* basically the log is the same as in unlink case */
1517         if (lmm) {
1518                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1519                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1520                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1521                                mdd->mdd_obd_dev->obd_name,
1522                                le32_to_cpu(lmm->lmm_magic),
1523                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1524                         return -EINVAL;
1525                 }
1526
1527                 stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
1528                 if ((int)le32_to_cpu(lmm->lmm_stripe_count) >= 0)
1529                         stripe = le32_to_cpu(lmm->lmm_stripe_count);
1530
1531                 for (i = 0; i < stripe; i++) {
1532                         rc = mdd_declare_llog_record(env, mdd,
1533                                         sizeof(struct llog_unlink_rec),
1534                                         handle);
1535                         if (rc)
1536                                 return rc;
1537                 }
1538         }
1539
1540         return rc;
1541 }
1542
1543 /* set attr and LOV EA at once, return updated attr */
1544 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1545                         const struct md_attr *ma)
1546 {
1547         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1548         struct mdd_device *mdd = mdo2mdd(obj);
1549         struct thandle *handle;
1550         struct lov_mds_md *lmm = NULL;
1551         struct llog_cookie *logcookies = NULL;
1552         int  rc, lmm_size = 0, cookie_size = 0;
1553         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1554         struct obd_device *obd = mdd->mdd_obd_dev;
1555         struct mds_obd *mds = &obd->u.mds;
1556 #ifdef HAVE_QUOTA_SUPPORT
1557         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1558         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1559         int quota_opc = 0, block_count = 0;
1560         int inode_pending[MAXQUOTAS] = { 0, 0 };
1561         int block_pending[MAXQUOTAS] = { 0, 0 };
1562 #endif
1563         ENTRY;
1564
1565         *la_copy = ma->ma_attr;
1566         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1567         if (rc != 0)
1568                 RETURN(rc);
1569
1570         /* setattr on "close" only change atime, or do nothing */
1571         if (ma->ma_valid == MA_INODE &&
1572             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1573                 RETURN(0);
1574
1575         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1576             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1577                 lmm_size = mdd_lov_mdsize(env, mdd);
1578                 lmm = mdd_max_lmm_get(env, mdd);
1579                 if (lmm == NULL)
1580                         RETURN(-ENOMEM);
1581
1582                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1583                                 XATTR_NAME_LOV);
1584
1585                 if (rc < 0)
1586                         RETURN(rc);
1587         }
1588
1589         handle = mdd_trans_create(env, mdd);
1590         if (IS_ERR(handle))
1591                 RETURN(PTR_ERR(handle));
1592
1593         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1594                                   lmm_size > 0 ? lmm : NULL, handle);
1595         if (rc)
1596                 GOTO(stop, rc);
1597
1598         /* permission changes may require sync operation */
1599         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1600                 handle->th_sync = !!mdd->mdd_sync_permission;
1601
1602         rc = mdd_trans_start(env, mdd, handle);
1603         if (rc)
1604                 GOTO(stop, rc);
1605
1606         /* permission changes may require sync operation */
1607         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1608                 handle->th_sync |= mdd->mdd_sync_permission;
1609
1610         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1611                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1612                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1613
1614 #ifdef HAVE_QUOTA_SUPPORT
1615         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1616                 struct obd_export *exp = md_quota(env)->mq_exp;
1617                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1618
1619                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1620                 if (!rc) {
1621                         quota_opc = FSFILT_OP_SETATTR;
1622                         mdd_quota_wrapper(la_copy, qnids);
1623                         mdd_quota_wrapper(la_tmp, qoids);
1624                         /* get file quota for new owner */
1625                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1626                                         qnids, inode_pending, 1, NULL, 0,
1627                                         NULL, 0);
1628                         block_count = (la_tmp->la_blocks + 7) >> 3;
1629                         if (block_count) {
1630                                 void *data = NULL;
1631                                 mdd_data_get(env, mdd_obj, &data);
1632                                 /* get block quota for new owner */
1633                                 lquota_chkquota(mds_quota_interface_ref, obd,
1634                                                 exp, qnids, block_pending,
1635                                                 block_count, NULL,
1636                                                 LQUOTA_FLAGS_BLK, data, 1);
1637                         }
1638                 }
1639         }
1640 #endif
1641
1642         if (la_copy->la_valid & LA_FLAGS) {
1643                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1644                                                   handle, 1);
1645                 if (rc == 0)
1646                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1647         } else if (la_copy->la_valid) {            /* setattr */
1648                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1649                                                   handle, 1);
1650                 /* journal chown/chgrp in llog, just like unlink */
1651                 if (rc == 0 && lmm_size){
1652                         cookie_size = mdd_lov_cookiesize(env, mdd);
1653                         logcookies = mdd_max_cookie_get(env, mdd);
1654                         if (logcookies == NULL)
1655                                 GOTO(cleanup, rc = -ENOMEM);
1656
1657                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1658                                             logcookies, cookie_size) <= 0)
1659                                 logcookies = NULL;
1660                 }
1661         }
1662
1663         if (rc == 0 && ma->ma_valid & MA_LOV) {
1664                 cfs_umode_t mode;
1665
1666                 mode = mdd_object_type(mdd_obj);
1667                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1668                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1669                         if (rc)
1670                                 GOTO(cleanup, rc);
1671
1672                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1673                                             ma->ma_lmm_size, handle, 1);
1674                 }
1675
1676         }
1677         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1678                 cfs_umode_t mode;
1679
1680                 mode = mdd_object_type(mdd_obj);
1681                 if (S_ISREG(mode))
1682                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1683
1684         }
1685 cleanup:
1686         if (rc == 0)
1687                 rc = mdd_attr_set_changelog(env, obj, handle,
1688                                             ma->ma_attr.la_valid);
1689 stop:
1690         mdd_trans_stop(env, mdd, rc, handle);
1691         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1692                 /*set obd attr, if needed*/
1693                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1694                                            logcookies);
1695         }
1696 #ifdef HAVE_QUOTA_SUPPORT
1697         if (quota_opc) {
1698                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1699                                       inode_pending, 0);
1700                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1701                                       block_pending, 1);
1702                 /* Trigger dqrel/dqacq for original owner and new owner.
1703                  * If failed, the next call for lquota_chkquota will
1704                  * process it. */
1705                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1706                               quota_opc);
1707         }
1708 #endif
1709         RETURN(rc);
1710 }
1711
1712 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1713                       const struct lu_buf *buf, const char *name, int fl,
1714                       struct thandle *handle)
1715 {
1716         int  rc;
1717         ENTRY;
1718
1719         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1720         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1721         mdd_write_unlock(env, obj);
1722
1723         RETURN(rc);
1724 }
1725
1726 static int mdd_xattr_sanity_check(const struct lu_env *env,
1727                                   struct mdd_object *obj)
1728 {
1729         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1730         struct md_ucred *uc     = md_ucred(env);
1731         int rc;
1732         ENTRY;
1733
1734         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1735                 RETURN(-EPERM);
1736
1737         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1738         if (rc)
1739                 RETURN(rc);
1740
1741         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1742             !mdd_capable(uc, CFS_CAP_FOWNER))
1743                 RETURN(-EPERM);
1744
1745         RETURN(rc);
1746 }
1747
1748 static int mdd_declare_xattr_set(const struct lu_env *env,
1749                                  struct mdd_device *mdd,
1750                                  struct mdd_object *obj,
1751                                  const struct lu_buf *buf,
1752                                  const char *name,
1753                                  struct thandle *handle)
1754
1755 {
1756         int rc;
1757
1758         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1759         if (rc)
1760                 return rc;
1761
1762         /* Only record user xattr changes */
1763         if ((strncmp("user.", name, 5) == 0))
1764                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1765
1766         return rc;
1767 }
1768
1769 /**
1770  * The caller should guarantee to update the object ctime
1771  * after xattr_set if needed.
1772  */
1773 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1774                          const struct lu_buf *buf, const char *name,
1775                          int fl)
1776 {
1777         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1778         struct mdd_device *mdd = mdo2mdd(obj);
1779         struct thandle *handle;
1780         int  rc;
1781         ENTRY;
1782
1783         rc = mdd_xattr_sanity_check(env, mdd_obj);
1784         if (rc)
1785                 RETURN(rc);
1786
1787         handle = mdd_trans_create(env, mdd);
1788         if (IS_ERR(handle))
1789                 RETURN(PTR_ERR(handle));
1790
1791         /* security-replated changes may require sync */
1792         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1793             mdd->mdd_sync_permission == 1)
1794                 handle->th_sync = 1;
1795
1796         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1797         if (rc)
1798                 GOTO(stop, rc);
1799
1800         rc = mdd_trans_start(env, mdd, handle);
1801         if (rc)
1802                 GOTO(stop, rc);
1803
1804         /* security-replated changes may require sync */
1805         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1806                 handle->th_sync |= mdd->mdd_sync_permission;
1807
1808         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1809
1810         /* Only record system & user xattr changes */
1811         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1812                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1813                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1814                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1815                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1816                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1817                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1818                                               handle);
1819
1820 stop:
1821         mdd_trans_stop(env, mdd, rc, handle);
1822
1823         RETURN(rc);
1824 }
1825
1826 static int mdd_declare_xattr_del(const struct lu_env *env,
1827                                  struct mdd_device *mdd,
1828                                  struct mdd_object *obj,
1829                                  const char *name,
1830                                  struct thandle *handle)
1831 {
1832         int rc;
1833
1834         rc = mdo_declare_xattr_del(env, obj, name, handle);
1835         if (rc)
1836                 return rc;
1837
1838         /* Only record user xattr changes */
1839         if ((strncmp("user.", name, 5) == 0))
1840                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1841
1842         return rc;
1843 }
1844
1845 /**
1846  * The caller should guarantee to update the object ctime
1847  * after xattr_set if needed.
1848  */
1849 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1850                   const char *name)
1851 {
1852         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1853         struct mdd_device *mdd = mdo2mdd(obj);
1854         struct thandle *handle;
1855         int  rc;
1856         ENTRY;
1857
1858         rc = mdd_xattr_sanity_check(env, mdd_obj);
1859         if (rc)
1860                 RETURN(rc);
1861
1862         handle = mdd_trans_create(env, mdd);
1863         if (IS_ERR(handle))
1864                 RETURN(PTR_ERR(handle));
1865
1866         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1867         if (rc)
1868                 GOTO(stop, rc);
1869
1870         rc = mdd_trans_start(env, mdd, handle);
1871         if (rc)
1872                 GOTO(stop, rc);
1873
1874         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1875         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1876                            mdd_object_capa(env, mdd_obj));
1877         mdd_write_unlock(env, mdd_obj);
1878
1879         /* Only record system & user xattr changes */
1880         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1881                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1882                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1883                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1884                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1885                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1886                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1887                                               handle);
1888
1889 stop:
1890         mdd_trans_stop(env, mdd, rc, handle);
1891
1892         RETURN(rc);
1893 }
1894
1895 /* partial unlink */
1896 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1897                        struct md_attr *ma)
1898 {
1899         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1900         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1901         struct mdd_device *mdd = mdo2mdd(obj);
1902         struct thandle *handle;
1903 #ifdef HAVE_QUOTA_SUPPORT
1904         struct obd_device *obd = mdd->mdd_obd_dev;
1905         struct mds_obd *mds = &obd->u.mds;
1906         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1907         int quota_opc = 0;
1908 #endif
1909         int rc;
1910         ENTRY;
1911
1912         /* XXX: this code won't be used ever:
1913          * DNE uses slightly different approach */
1914         LBUG();
1915
1916         /*
1917          * Check -ENOENT early here because we need to get object type
1918          * to calculate credits before transaction start
1919          */
1920         if (!mdd_object_exists(mdd_obj))
1921                 RETURN(-ENOENT);
1922
1923         LASSERT(mdd_object_exists(mdd_obj) > 0);
1924
1925         handle = mdd_trans_create(env, mdd);
1926         if (IS_ERR(handle))
1927                 RETURN(-ENOMEM);
1928
1929         rc = mdd_trans_start(env, mdd, handle);
1930
1931         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1932
1933         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1934         if (rc)
1935                 GOTO(cleanup, rc);
1936
1937         __mdd_ref_del(env, mdd_obj, handle, 0);
1938
1939         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1940                 /* unlink dot */
1941                 __mdd_ref_del(env, mdd_obj, handle, 1);
1942         }
1943
1944         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1945         la_copy->la_ctime = ma->ma_attr.la_ctime;
1946
1947         la_copy->la_valid = LA_CTIME;
1948         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1949         if (rc)
1950                 GOTO(cleanup, rc);
1951
1952         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1953 #ifdef HAVE_QUOTA_SUPPORT
1954         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1955             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1956                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1957                 mdd_quota_wrapper(&ma->ma_attr, qids);
1958         }
1959 #endif
1960
1961
1962         EXIT;
1963 cleanup:
1964         mdd_write_unlock(env, mdd_obj);
1965         mdd_trans_stop(env, mdd, rc, handle);
1966 #ifdef HAVE_QUOTA_SUPPORT
1967         if (quota_opc)
1968                 /* Trigger dqrel on the owner of child. If failed,
1969                  * the next call for lquota_chkquota will process it */
1970                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1971                               quota_opc);
1972 #endif
1973         return rc;
1974 }
1975
1976 /* partial operation */
1977 static int mdd_oc_sanity_check(const struct lu_env *env,
1978                                struct mdd_object *obj,
1979                                struct md_attr *ma)
1980 {
1981         int rc;
1982         ENTRY;
1983
1984         switch (ma->ma_attr.la_mode & S_IFMT) {
1985         case S_IFREG:
1986         case S_IFDIR:
1987         case S_IFLNK:
1988         case S_IFCHR:
1989         case S_IFBLK:
1990         case S_IFIFO:
1991         case S_IFSOCK:
1992                 rc = 0;
1993                 break;
1994         default:
1995                 rc = -EINVAL;
1996                 break;
1997         }
1998         RETURN(rc);
1999 }
2000
2001 static int mdd_object_create(const struct lu_env *env,
2002                              struct md_object *obj,
2003                              const struct md_op_spec *spec,
2004                              struct md_attr *ma)
2005 {
2006
2007         struct mdd_device *mdd = mdo2mdd(obj);
2008         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2009         const struct lu_fid *pfid = spec->u.sp_pfid;
2010         struct thandle *handle;
2011 #ifdef HAVE_QUOTA_SUPPORT
2012         struct obd_device *obd = mdd->mdd_obd_dev;
2013         struct obd_export *exp = md_quota(env)->mq_exp;
2014         struct mds_obd *mds = &obd->u.mds;
2015         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2016         int quota_opc = 0, block_count = 0;
2017         int inode_pending[MAXQUOTAS] = { 0, 0 };
2018         int block_pending[MAXQUOTAS] = { 0, 0 };
2019 #endif
2020         int rc = 0;
2021         ENTRY;
2022
2023         /* XXX: this code won't be used ever:
2024          * DNE uses slightly different approach */
2025         LBUG();
2026
2027 #ifdef HAVE_QUOTA_SUPPORT
2028         if (mds->mds_quota) {
2029                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2030                 mdd_quota_wrapper(&ma->ma_attr, qids);
2031                 /* get file quota for child */
2032                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2033                                 qids, inode_pending, 1, NULL, 0,
2034                                 NULL, 0);
2035                 switch (ma->ma_attr.la_mode & S_IFMT) {
2036                 case S_IFLNK:
2037                 case S_IFDIR:
2038                         block_count = 2;
2039                         break;
2040                 case S_IFREG:
2041                         block_count = 1;
2042                         break;
2043                 }
2044                 /* get block quota for child */
2045                 if (block_count)
2046                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2047                                         qids, block_pending, block_count,
2048                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2049         }
2050 #endif
2051
2052         handle = mdd_trans_create(env, mdd);
2053         if (IS_ERR(handle))
2054                 GOTO(out_pending, rc = PTR_ERR(handle));
2055
2056         rc = mdd_trans_start(env, mdd, handle);
2057
2058         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2059         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2060         if (rc)
2061                 GOTO(unlock, rc);
2062
2063         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2064         if (rc)
2065                 GOTO(unlock, rc);
2066
2067         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2068                 /* If creating the slave object, set slave EA here. */
2069                 int lmv_size = spec->u.sp_ea.eadatalen;
2070                 struct lmv_stripe_md *lmv;
2071
2072                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2073                 LASSERT(lmv != NULL && lmv_size > 0);
2074
2075                 rc = __mdd_xattr_set(env, mdd_obj,
2076                                      mdd_buf_get_const(env, lmv, lmv_size),
2077                                      XATTR_NAME_LMV, 0, handle);
2078                 if (rc)
2079                         GOTO(unlock, rc);
2080
2081                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2082                                            handle, 0);
2083         } else {
2084 #ifdef CONFIG_FS_POSIX_ACL
2085                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2086                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2087
2088                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2089                         buf->lb_len = spec->u.sp_ea.eadatalen;
2090                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2091                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2092                                                     &ma->ma_attr.la_mode,
2093                                                     handle);
2094                                 if (rc)
2095                                         GOTO(unlock, rc);
2096                                 else
2097                                         ma->ma_attr.la_valid |= LA_MODE;
2098                         }
2099
2100                         pfid = spec->u.sp_ea.fid;
2101                 }
2102 #endif
2103                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2104                                            spec);
2105         }
2106         EXIT;
2107 unlock:
2108         if (rc == 0)
2109                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2110         mdd_write_unlock(env, mdd_obj);
2111
2112         mdd_trans_stop(env, mdd, rc, handle);
2113 out_pending:
2114 #ifdef HAVE_QUOTA_SUPPORT
2115         if (quota_opc) {
2116                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2117                                       inode_pending, 0);
2118                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2119                                       block_pending, 1);
2120                 /* Trigger dqacq on the owner of child. If failed,
2121                  * the next call for lquota_chkquota will process it. */
2122                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2123                               quota_opc);
2124         }
2125 #endif
2126         return rc;
2127 }
2128
2129 /* partial link */
2130 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2131                        const struct md_attr *ma)
2132 {
2133         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2134         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2135         struct mdd_device *mdd = mdo2mdd(obj);
2136         struct thandle *handle;
2137         int rc;
2138         ENTRY;
2139
2140         /* XXX: this code won't be used ever:
2141          * DNE uses slightly different approach */
2142         LBUG();
2143
2144         handle = mdd_trans_create(env, mdd);
2145         if (IS_ERR(handle))
2146                 RETURN(-ENOMEM);
2147
2148         rc = mdd_trans_start(env, mdd, handle);
2149
2150         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2151         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2152         if (rc == 0)
2153                 __mdd_ref_add(env, mdd_obj, handle);
2154         mdd_write_unlock(env, mdd_obj);
2155         if (rc == 0) {
2156                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2157                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2158
2159                 la_copy->la_valid = LA_CTIME;
2160                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2161                                                         handle, 0);
2162         }
2163         mdd_trans_stop(env, mdd, 0, handle);
2164
2165         RETURN(rc);
2166 }
2167
2168 /*
2169  * do NOT or the MAY_*'s, you'll get the weakest
2170  */
2171 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2172 {
2173         int res = 0;
2174
2175         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2176          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2177          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2178          * owner can write to a file even if it is marked readonly to hide
2179          * its brokenness. (bug 5781) */
2180         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2181                 struct md_ucred *uc = md_ucred(env);
2182
2183                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2184                     (la->la_uid == uc->mu_fsuid))
2185                         return 0;
2186         }
2187
2188         if (flags & FMODE_READ)
2189                 res |= MAY_READ;
2190         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2191                 res |= MAY_WRITE;
2192         if (flags & MDS_FMODE_EXEC)
2193                 res = MAY_EXEC;
2194         return res;
2195 }
2196
2197 static int mdd_open_sanity_check(const struct lu_env *env,
2198                                  struct mdd_object *obj, int flag)
2199 {
2200         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2201         int mode, rc;
2202         ENTRY;
2203
2204         /* EEXIST check */
2205         if (mdd_is_dead_obj(obj))
2206                 RETURN(-ENOENT);
2207
2208         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2209         if (rc)
2210                RETURN(rc);
2211
2212         if (S_ISLNK(tmp_la->la_mode))
2213                 RETURN(-ELOOP);
2214
2215         mode = accmode(env, tmp_la, flag);
2216
2217         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2218                 RETURN(-EISDIR);
2219
2220         if (!(flag & MDS_OPEN_CREATED)) {
2221                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2222                 if (rc)
2223                         RETURN(rc);
2224         }
2225
2226         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2227             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2228                 flag &= ~MDS_OPEN_TRUNC;
2229
2230         /* For writing append-only file must open it with append mode. */
2231         if (mdd_is_append(obj)) {
2232                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2233                         RETURN(-EPERM);
2234                 if (flag & MDS_OPEN_TRUNC)
2235                         RETURN(-EPERM);
2236         }
2237
2238 #if 0
2239         /*
2240          * Now, flag -- O_NOATIME does not be packed by client.
2241          */
2242         if (flag & O_NOATIME) {
2243                 struct md_ucred *uc = md_ucred(env);
2244
2245                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2246                     (uc->mu_valid == UCRED_NEW)) &&
2247                     (uc->mu_fsuid != tmp_la->la_uid) &&
2248                     !mdd_capable(uc, CFS_CAP_FOWNER))
2249                         RETURN(-EPERM);
2250         }
2251 #endif
2252
2253         RETURN(0);
2254 }
2255
2256 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2257                     int flags)
2258 {
2259         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2260         int rc = 0;
2261
2262         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2263
2264         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2265         if (rc == 0)
2266                 mdd_obj->mod_count++;
2267
2268         mdd_write_unlock(env, mdd_obj);
2269         return rc;
2270 }
2271
2272 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2273                             struct md_attr *ma, struct thandle *handle)
2274 {
2275         int rc;
2276
2277         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2278         if (rc)
2279                 return rc;
2280
2281         return mdo_declare_destroy(env, obj, handle);
2282 }
2283
2284 /* return md_attr back,
2285  * if it is last unlink then return lov ea + llog cookie*/
2286 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2287                     struct md_attr *ma, struct thandle *handle)
2288 {
2289         int rc = 0;
2290         ENTRY;
2291
2292         if (S_ISREG(mdd_object_type(obj))) {
2293                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2294                  * Caller must be ready for that. */
2295
2296                 rc = __mdd_lmm_get(env, obj, ma);
2297                 if ((ma->ma_valid & MA_LOV))
2298                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2299                                             obj, ma);
2300         }
2301
2302         if (rc == 0)
2303                 rc = mdo_destroy(env, obj, handle);
2304
2305         RETURN(rc);
2306 }
2307
2308 static int mdd_declare_close(const struct lu_env *env,
2309                              struct mdd_object *obj,
2310                              struct md_attr *ma,
2311                              struct thandle *handle)
2312 {
2313         int rc;
2314
2315         rc = orph_declare_index_delete(env, obj, handle);
2316         if (rc)
2317                 return rc;
2318
2319         return mdd_declare_object_kill(env, obj, ma, handle);
2320 }
2321
2322 /*
2323  * No permission check is needed.
2324  */
2325 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2326                      struct md_attr *ma, int mode)
2327 {
2328         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2329         struct mdd_device *mdd = mdo2mdd(obj);
2330         struct thandle    *handle = NULL;
2331         int rc;
2332         int reset = 1;
2333
2334 #ifdef HAVE_QUOTA_SUPPORT
2335         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2336         struct mds_obd *mds = &obd->u.mds;
2337         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2338         int quota_opc = 0;
2339 #endif
2340         ENTRY;
2341
2342         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2343                 mdd_obj->mod_count--;
2344
2345                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2346                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2347                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2348                 RETURN(0);
2349         }
2350
2351         /* check without any lock */
2352         if (mdd_obj->mod_count == 1 &&
2353             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2354  again:
2355                 handle = mdd_trans_create(env, mdo2mdd(obj));
2356                 if (IS_ERR(handle))
2357                         RETURN(PTR_ERR(handle));
2358
2359                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2360                 if (rc)
2361                         GOTO(stop, rc);
2362
2363                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2364                 if (rc)
2365                         GOTO(stop, rc);
2366
2367                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2368                 if (rc)
2369                         GOTO(stop, rc);
2370         }
2371
2372         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2373         if (handle == NULL && mdd_obj->mod_count == 1 &&
2374             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2375                 mdd_write_unlock(env, mdd_obj);
2376                 goto again;
2377         }
2378
2379         /* release open count */
2380         mdd_obj->mod_count --;
2381
2382         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2383                 /* remove link to object from orphan index */
2384                 LASSERT(handle != NULL);
2385                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2386                 if (rc == 0) {
2387                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2388                                "list, OSS objects to be destroyed.\n",
2389                                PFID(mdd_object_fid(mdd_obj)));
2390                 } else {
2391                         CERROR("Object "DFID" can not be deleted from orphan "
2392                                 "list, maybe cause OST objects can not be "
2393                                 "destroyed (err: %d).\n",
2394                                 PFID(mdd_object_fid(mdd_obj)), rc);
2395                         /* If object was not deleted from orphan list, do not
2396                          * destroy OSS objects, which will be done when next
2397                          * recovery. */
2398                         GOTO(out, rc);
2399                 }
2400         }
2401
2402         rc = mdd_iattr_get(env, mdd_obj, ma);
2403         /* Object maybe not in orphan list originally, it is rare case for
2404          * mdd_finish_unlink() failure. */
2405         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2406 #ifdef HAVE_QUOTA_SUPPORT
2407                 if (mds->mds_quota) {
2408                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2409                         mdd_quota_wrapper(&ma->ma_attr, qids);
2410                 }
2411 #endif
2412                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2413                 if (ma->ma_valid & MA_FLAGS &&
2414                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2415                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2416                 } else {
2417                         if (handle == NULL) {
2418                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2419                                 if (IS_ERR(handle))
2420                                         GOTO(out, rc = PTR_ERR(handle));
2421
2422                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2423                                                              handle);
2424                                 if (rc)
2425                                         GOTO(out, rc);
2426
2427                                 rc = mdd_declare_changelog_store(env, mdd,
2428                                                                  NULL, handle);
2429                                 if (rc)
2430                                         GOTO(stop, rc);
2431
2432                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2433                                 if (rc)
2434                                         GOTO(out, rc);
2435                         }
2436
2437                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2438                         if (rc == 0)
2439                                 reset = 0;
2440                 }
2441
2442                 if (rc != 0)
2443                         CERROR("Error when prepare to delete Object "DFID" , "
2444                                "which will cause OST objects can not be "
2445                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2446         }
2447         EXIT;
2448
2449 out:
2450         if (reset)
2451                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2452
2453         mdd_write_unlock(env, mdd_obj);
2454
2455         if (rc == 0 &&
2456             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2457             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2458                 if (handle == NULL) {
2459                         handle = mdd_trans_create(env, mdo2mdd(obj));
2460                         if (IS_ERR(handle))
2461                                 GOTO(stop, rc = IS_ERR(handle));
2462
2463                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2464                                                          handle);
2465                         if (rc)
2466                                 GOTO(stop, rc);
2467
2468                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2469                         if (rc)
2470                                 GOTO(stop, rc);
2471                 }
2472
2473                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2474                                          mdd_obj, handle);
2475         }
2476
2477 stop:
2478         if (handle != NULL)
2479                 mdd_trans_stop(env, mdd, rc, handle);
2480 #ifdef HAVE_QUOTA_SUPPORT
2481         if (quota_opc)
2482                 /* Trigger dqrel on the owner of child. If failed,
2483                  * the next call for lquota_chkquota will process it */
2484                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2485                               quota_opc);
2486 #endif
2487         return rc;
2488 }
2489
2490 /*
2491  * Permission check is done when open,
2492  * no need check again.
2493  */
2494 static int mdd_readpage_sanity_check(const struct lu_env *env,
2495                                      struct mdd_object *obj)
2496 {
2497         struct dt_object *next = mdd_object_child(obj);
2498         int rc;
2499         ENTRY;
2500
2501         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2502                 rc = 0;
2503         else
2504                 rc = -ENOTDIR;
2505
2506         RETURN(rc);
2507 }
2508
2509 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2510                               struct lu_dirpage *dp, int nob,
2511                               const struct dt_it_ops *iops, struct dt_it *it,
2512                               __u32 attr)
2513 {
2514         void                   *area = dp;
2515         int                     result;
2516         __u64                   hash = 0;
2517         struct lu_dirent       *ent;
2518         struct lu_dirent       *last = NULL;
2519         int                     first = 1;
2520
2521         memset(area, 0, sizeof (*dp));
2522         area += sizeof (*dp);
2523         nob  -= sizeof (*dp);
2524
2525         ent  = area;
2526         do {
2527                 int    len;
2528                 int    recsize;
2529
2530                 len = iops->key_size(env, it);
2531
2532                 /* IAM iterator can return record with zero len. */
2533                 if (len == 0)
2534                         goto next;
2535
2536                 hash = iops->store(env, it);
2537                 if (unlikely(first)) {
2538                         first = 0;
2539                         dp->ldp_hash_start = cpu_to_le64(hash);
2540                 }
2541
2542                 /* calculate max space required for lu_dirent */
2543                 recsize = lu_dirent_calc_size(len, attr);
2544
2545                 if (nob >= recsize) {
2546                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2547                         if (result == -ESTALE)
2548                                 goto next;
2549                         if (result != 0)
2550                                 goto out;
2551
2552                         /* osd might not able to pack all attributes,
2553                          * so recheck rec length */
2554                         recsize = le16_to_cpu(ent->lde_reclen);
2555                 } else {
2556                         result = (last != NULL) ? 0 :-EINVAL;
2557                         goto out;
2558                 }
2559                 last = ent;
2560                 ent = (void *)ent + recsize;
2561                 nob -= recsize;
2562
2563 next:
2564                 result = iops->next(env, it);
2565                 if (result == -ESTALE)
2566                         goto next;
2567         } while (result == 0);
2568
2569 out:
2570         dp->ldp_hash_end = cpu_to_le64(hash);
2571         if (last != NULL) {
2572                 if (last->lde_hash == dp->ldp_hash_end)
2573                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2574                 last->lde_reclen = 0; /* end mark */
2575         }
2576         return result;
2577 }
2578
2579 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2580                           const struct lu_rdpg *rdpg)
2581 {
2582         struct dt_it      *it;
2583         struct dt_object  *next = mdd_object_child(obj);
2584         const struct dt_it_ops  *iops;
2585         struct page       *pg;
2586         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2587         int i;
2588         int nlupgs = 0;
2589         int rc;
2590         int nob;
2591
2592         LASSERT(rdpg->rp_pages != NULL);
2593         LASSERT(next->do_index_ops != NULL);
2594
2595         if (rdpg->rp_count <= 0)
2596                 return -EFAULT;
2597
2598         /*
2599          * iterate through directory and fill pages from @rdpg
2600          */
2601         iops = &next->do_index_ops->dio_it;
2602         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2603         if (IS_ERR(it))
2604                 return PTR_ERR(it);
2605
2606         rc = iops->load(env, it, rdpg->rp_hash);
2607
2608         if (rc == 0) {
2609                 /*
2610                  * Iterator didn't find record with exactly the key requested.
2611                  *
2612                  * It is currently either
2613                  *
2614                  *     - positioned above record with key less than
2615                  *     requested---skip it.
2616                  *
2617                  *     - or not positioned at all (is in IAM_IT_SKEWED
2618                  *     state)---position it on the next item.
2619                  */
2620                 rc = iops->next(env, it);
2621         } else if (rc > 0)
2622                 rc = 0;
2623
2624         /*
2625          * At this point and across for-loop:
2626          *
2627          *  rc == 0 -> ok, proceed.
2628          *  rc >  0 -> end of directory.
2629          *  rc <  0 -> error.
2630          */
2631         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2632              i++, nob -= CFS_PAGE_SIZE) {
2633                 struct lu_dirpage *dp;
2634
2635                 LASSERT(i < rdpg->rp_npages);
2636                 pg = rdpg->rp_pages[i];
2637                 dp = cfs_kmap(pg);
2638 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2639 repeat:
2640 #endif
2641                 rc = mdd_dir_page_build(env, mdd, dp,
2642                                         min_t(int, nob, LU_PAGE_SIZE),
2643                                         iops, it, rdpg->rp_attrs);
2644                 if (rc > 0) {
2645                         /*
2646                          * end of directory.
2647                          */
2648                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2649                         nlupgs++;
2650                 } else if (rc < 0) {
2651                         CWARN("build page failed: %d!\n", rc);
2652                 } else {
2653                         nlupgs++;
2654 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2655                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2656                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2657                                 goto repeat;
2658 #endif
2659                 }
2660                 cfs_kunmap(pg);
2661         }
2662         if (rc >= 0) {
2663                 struct lu_dirpage *dp;
2664
2665                 dp = cfs_kmap(rdpg->rp_pages[0]);
2666                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2667                 if (nlupgs == 0) {
2668                         /*
2669                          * No pages were processed, mark this for first page
2670                          * and send back.
2671                          */
2672                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2673                         nlupgs = 1;
2674                 }
2675                 cfs_kunmap(rdpg->rp_pages[0]);
2676
2677                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2678         }
2679         iops->put(env, it);
2680         iops->fini(env, it);
2681
2682         return rc;
2683 }
2684
2685 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2686                  const struct lu_rdpg *rdpg)
2687 {
2688         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2689         int rc;
2690         ENTRY;
2691
2692         LASSERT(mdd_object_exists(mdd_obj));
2693
2694         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2695         rc = mdd_readpage_sanity_check(env, mdd_obj);
2696         if (rc)
2697                 GOTO(out_unlock, rc);
2698
2699         if (mdd_is_dead_obj(mdd_obj)) {
2700                 struct page *pg;
2701                 struct lu_dirpage *dp;
2702
2703                 /*
2704                  * According to POSIX, please do not return any entry to client:
2705                  * even dot and dotdot should not be returned.
2706                  */
2707                 CWARN("readdir from dead object: "DFID"\n",
2708                         PFID(mdd_object_fid(mdd_obj)));
2709
2710                 if (rdpg->rp_count <= 0)
2711                         GOTO(out_unlock, rc = -EFAULT);
2712                 LASSERT(rdpg->rp_pages != NULL);
2713
2714                 pg = rdpg->rp_pages[0];
2715                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2716                 memset(dp, 0 , sizeof(struct lu_dirpage));
2717                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2718                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2719                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2720                 cfs_kunmap(pg);
2721                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2722         }
2723
2724         rc = __mdd_readpage(env, mdd_obj, rdpg);
2725
2726         EXIT;
2727 out_unlock:
2728         mdd_read_unlock(env, mdd_obj);
2729         return rc;
2730 }
2731
2732 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2733 {
2734         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2735         struct dt_object *next;
2736
2737         LASSERT(mdd_object_exists(mdd_obj));
2738         next = mdd_object_child(mdd_obj);
2739         return next->do_ops->do_object_sync(env, next);
2740 }
2741
2742 const struct md_object_operations mdd_obj_ops = {
2743         .moo_permission    = mdd_permission,
2744         .moo_attr_get      = mdd_attr_get,
2745         .moo_attr_set      = mdd_attr_set,
2746         .moo_xattr_get     = mdd_xattr_get,
2747         .moo_xattr_set     = mdd_xattr_set,
2748         .moo_xattr_list    = mdd_xattr_list,
2749         .moo_xattr_del     = mdd_xattr_del,
2750         .moo_object_create = mdd_object_create,
2751         .moo_ref_add       = mdd_ref_add,
2752         .moo_ref_del       = mdd_ref_del,
2753         .moo_open          = mdd_open,
2754         .moo_close         = mdd_close,
2755         .moo_readpage      = mdd_readpage,
2756         .moo_readlink      = mdd_readlink,
2757         .moo_changelog     = mdd_changelog,
2758         .moo_capa_get      = mdd_capa_get,
2759         .moo_object_sync   = mdd_object_sync,
2760         .moo_path          = mdd_path,
2761         .moo_file_lock     = mdd_file_lock,
2762         .moo_file_unlock   = mdd_file_unlock,
2763 };