Whamcloud - gitweb
277f896d4e974a52751269654f3af2af966f3714
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/mdd/mdd_object.c
39  *
40  * Lustre Metadata Server (mdd) routines
41  *
42  * Author: Wang Di <wangdi@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_MDS
49
50 #include <linux/module.h>
51 #include <obd.h>
52 #include <obd_class.h>
53 #include <obd_support.h>
54 #include <lprocfs_status.h>
55 /* fid_be_cpu(), fid_cpu_to_be(). */
56 #include <lustre_fid.h>
57 #include <obd_lov.h>
58
59 #include <lustre_param.h>
60 #include <lustre_mds.h>
61 #include <lustre/lustre_idl.h>
62
63 #include "mdd_internal.h"
64
65 static const struct lu_object_operations mdd_lu_obj_ops;
66
67 static int mdd_xattr_get(const struct lu_env *env,
68                          struct md_object *obj, struct lu_buf *buf,
69                          const char *name);
70
71 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
72                  void **data)
73 {
74         if (mdd_object_exists(obj) == 0) {
75                 CERROR("%s: object "DFID" not found: rc = -2\n",
76                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
77                 return -ENOENT;
78         }
79         mdo_data_get(env, obj, data);
80         return 0;
81 }
82
83 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
84                struct lu_attr *la, struct lustre_capa *capa)
85 {
86         if (mdd_object_exists(obj) == 0) {
87                 CERROR("%s: object "DFID" not found: rc = -2\n",
88                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
89                 return -ENOENT;
90         }
91         return mdo_attr_get(env, obj, la, capa);
92 }
93
94 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
95 {
96         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
97
98         if (flags & LUSTRE_APPEND_FL)
99                 obj->mod_flags |= APPEND_OBJ;
100
101         if (flags & LUSTRE_IMMUTABLE_FL)
102                 obj->mod_flags |= IMMUTE_OBJ;
103 }
104
105 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
106 {
107         struct mdd_thread_info *info;
108
109         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
110         LASSERT(info != NULL);
111         return info;
112 }
113
114 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
115 {
116         struct lu_buf *buf;
117
118         buf = &mdd_env_info(env)->mti_buf;
119         buf->lb_buf = area;
120         buf->lb_len = len;
121         return buf;
122 }
123
124 void mdd_buf_put(struct lu_buf *buf)
125 {
126         if (buf == NULL || buf->lb_buf == NULL)
127                 return;
128         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
129         buf->lb_buf = NULL;
130         buf->lb_len = 0;
131 }
132
133 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
134                                        const void *area, ssize_t len)
135 {
136         struct lu_buf *buf;
137
138         buf = &mdd_env_info(env)->mti_buf;
139         buf->lb_buf = (void *)area;
140         buf->lb_len = len;
141         return buf;
142 }
143
144 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
145 {
146         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
147
148         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
149                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
150                 buf->lb_buf = NULL;
151         }
152         if (buf->lb_buf == NULL) {
153                 buf->lb_len = len;
154                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
155                 if (buf->lb_buf == NULL)
156                         buf->lb_len = 0;
157         }
158         return buf;
159 }
160
161 /** Increase the size of the \a mti_big_buf.
162  * preserves old data in buffer
163  * old buffer remains unchanged on error
164  * \retval 0 or -ENOMEM
165  */
166 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
167 {
168         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
169         struct lu_buf buf;
170
171         LASSERT(len >= oldbuf->lb_len);
172         OBD_ALLOC_LARGE(buf.lb_buf, len);
173
174         if (buf.lb_buf == NULL)
175                 return -ENOMEM;
176
177         buf.lb_len = len;
178         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
179
180         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
181
182         memcpy(oldbuf, &buf, sizeof(buf));
183
184         return 0;
185 }
186
187 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
188                                        struct mdd_device *mdd)
189 {
190         struct mdd_thread_info *mti = mdd_env_info(env);
191         int                     max_cookie_size;
192
193         max_cookie_size = mdd_lov_cookiesize(env, mdd);
194         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
195                 if (mti->mti_max_cookie)
196                         OBD_FREE_LARGE(mti->mti_max_cookie,
197                                        mti->mti_max_cookie_size);
198                 mti->mti_max_cookie = NULL;
199                 mti->mti_max_cookie_size = 0;
200         }
201         if (unlikely(mti->mti_max_cookie == NULL)) {
202                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
203                 if (likely(mti->mti_max_cookie != NULL))
204                         mti->mti_max_cookie_size = max_cookie_size;
205         }
206         if (likely(mti->mti_max_cookie != NULL))
207                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
208         return mti->mti_max_cookie;
209 }
210
211 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
212 {
213         struct mdd_thread_info *mti = mdd_env_info(env);
214
215         if (unlikely(mti->mti_max_lmm_size < size)) {
216                 int rsize = size_roundup_power2(size);
217
218                 if (mti->mti_max_lmm_size > 0) {
219                         LASSERT(mti->mti_max_lmm);
220                         OBD_FREE_LARGE(mti->mti_max_lmm,
221                                        mti->mti_max_lmm_size);
222                         mti->mti_max_lmm = NULL;
223                         mti->mti_max_lmm_size = 0;
224                 }
225
226                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
227                 if (likely(mti->mti_max_lmm != NULL))
228                         mti->mti_max_lmm_size = rsize;
229         }
230         return mti->mti_max_lmm;
231 }
232
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234                                    struct mdd_device *mdd)
235 {
236         int max_lmm_size;
237
238         max_lmm_size = mdd_lov_mdsize(env, mdd);
239         return mdd_max_lmm_buffer(env, max_lmm_size);
240 }
241
242 struct lu_object *mdd_object_alloc(const struct lu_env *env,
243                                    const struct lu_object_header *hdr,
244                                    struct lu_device *d)
245 {
246         struct mdd_object *mdd_obj;
247
248         OBD_ALLOC_PTR(mdd_obj);
249         if (mdd_obj != NULL) {
250                 struct lu_object *o;
251
252                 o = mdd2lu_obj(mdd_obj);
253                 lu_object_init(o, NULL, d);
254                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
255                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
256                 mdd_obj->mod_count = 0;
257                 o->lo_ops = &mdd_lu_obj_ops;
258                 return o;
259         } else {
260                 return NULL;
261         }
262 }
263
264 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
265                            const struct lu_object_conf *unused)
266 {
267         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
268         struct mdd_object *mdd_obj = lu2mdd_obj(o);
269         struct lu_object  *below;
270         struct lu_device  *under;
271         ENTRY;
272
273         mdd_obj->mod_cltime = 0;
274         under = &d->mdd_child->dd_lu_dev;
275         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
276         mdd_pdlock_init(mdd_obj);
277         if (below == NULL)
278                 RETURN(-ENOMEM);
279
280         lu_object_add(o, below);
281
282         RETURN(0);
283 }
284
285 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
286 {
287         if (lu_object_exists(o))
288                 return mdd_get_flags(env, lu2mdd_obj(o));
289         else
290                 return 0;
291 }
292
293 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
294 {
295         struct mdd_object *mdd = lu2mdd_obj(o);
296
297         lu_object_fini(o);
298         OBD_FREE_PTR(mdd);
299 }
300
301 static int mdd_object_print(const struct lu_env *env, void *cookie,
302                             lu_printer_t p, const struct lu_object *o)
303 {
304         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
305         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
306                     "valid=%x, cltime="LPU64", flags=%lx)",
307                     mdd, mdd->mod_count, mdd->mod_valid,
308                     mdd->mod_cltime, mdd->mod_flags);
309 }
310
311 static const struct lu_object_operations mdd_lu_obj_ops = {
312         .loo_object_init    = mdd_object_init,
313         .loo_object_start   = mdd_object_start,
314         .loo_object_free    = mdd_object_free,
315         .loo_object_print   = mdd_object_print,
316 };
317
318 struct mdd_object *mdd_object_find(const struct lu_env *env,
319                                    struct mdd_device *d,
320                                    const struct lu_fid *f)
321 {
322         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
323 }
324
325 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
326                         const char *path, struct lu_fid *fid)
327 {
328         struct lu_buf *buf;
329         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
330         struct mdd_object *obj;
331         struct lu_name *lname = &mdd_env_info(env)->mti_name;
332         char *name;
333         int rc = 0;
334         ENTRY;
335
336         /* temp buffer for path element */
337         buf = mdd_buf_alloc(env, PATH_MAX);
338         if (buf->lb_buf == NULL)
339                 RETURN(-ENOMEM);
340
341         lname->ln_name = name = buf->lb_buf;
342         lname->ln_namelen = 0;
343         *f = mdd->mdd_root_fid;
344
345         while(1) {
346                 while (*path == '/')
347                         path++;
348                 if (*path == '\0')
349                         break;
350                 while (*path != '/' && *path != '\0') {
351                         *name = *path;
352                         path++;
353                         name++;
354                         lname->ln_namelen++;
355                 }
356
357                 *name = '\0';
358                 /* find obj corresponding to fid */
359                 obj = mdd_object_find(env, mdd, f);
360                 if (obj == NULL)
361                         GOTO(out, rc = -EREMOTE);
362                 if (IS_ERR(obj))
363                         GOTO(out, rc = PTR_ERR(obj));
364                 /* get child fid from parent and name */
365                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
366                 mdd_object_put(env, obj);
367                 if (rc)
368                         break;
369
370                 name = buf->lb_buf;
371                 lname->ln_namelen = 0;
372         }
373
374         if (!rc)
375                 *fid = *f;
376 out:
377         RETURN(rc);
378 }
379
380 /** The maximum depth that fid2path() will search.
381  * This is limited only because we want to store the fids for
382  * historical path lookup purposes.
383  */
384 #define MAX_PATH_DEPTH 100
385
386 /** mdd_path() lookup structure. */
387 struct path_lookup_info {
388         __u64                pli_recno;        /**< history point */
389         __u64                pli_currec;       /**< current record */
390         struct lu_fid        pli_fid;
391         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
392         struct mdd_object   *pli_mdd_obj;
393         char                *pli_path;         /**< full path */
394         int                  pli_pathlen;
395         int                  pli_linkno;       /**< which hardlink to follow */
396         int                  pli_fidcount;     /**< number of \a pli_fids */
397 };
398
399 static int mdd_path_current(const struct lu_env *env,
400                             struct path_lookup_info *pli)
401 {
402         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
403         struct mdd_object *mdd_obj;
404         struct lu_buf     *buf = NULL;
405         struct link_ea_header *leh;
406         struct link_ea_entry  *lee;
407         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
408         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
409         char *ptr;
410         int reclen;
411         int rc;
412         ENTRY;
413
414         ptr = pli->pli_path + pli->pli_pathlen - 1;
415         *ptr = 0;
416         --ptr;
417         pli->pli_fidcount = 0;
418         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
419
420         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
421                 mdd_obj = mdd_object_find(env, mdd,
422                                           &pli->pli_fids[pli->pli_fidcount]);
423                 if (mdd_obj == NULL)
424                         GOTO(out, rc = -EREMOTE);
425                 if (IS_ERR(mdd_obj))
426                         GOTO(out, rc = PTR_ERR(mdd_obj));
427                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
428                 if (rc <= 0) {
429                         mdd_object_put(env, mdd_obj);
430                         if (rc == -1)
431                                 rc = -EREMOTE;
432                         else if (rc == 0)
433                                 /* Do I need to error out here? */
434                                 rc = -ENOENT;
435                         GOTO(out, rc);
436                 }
437
438                 /* Get parent fid and object name */
439                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
440                 buf = mdd_links_get(env, mdd_obj);
441                 mdd_read_unlock(env, mdd_obj);
442                 mdd_object_put(env, mdd_obj);
443                 if (IS_ERR(buf))
444                         GOTO(out, rc = PTR_ERR(buf));
445
446                 leh = buf->lb_buf;
447                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
448                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
449
450                 /* If set, use link #linkno for path lookup, otherwise use
451                    link #0.  Only do this for the final path element. */
452                 if ((pli->pli_fidcount == 0) &&
453                     (pli->pli_linkno < leh->leh_reccount)) {
454                         int count;
455                         for (count = 0; count < pli->pli_linkno; count++) {
456                                 lee = (struct link_ea_entry *)
457                                      ((char *)lee + reclen);
458                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
459                         }
460                         if (pli->pli_linkno < leh->leh_reccount - 1)
461                                 /* indicate to user there are more links */
462                                 pli->pli_linkno++;
463                 }
464
465                 /* Pack the name in the end of the buffer */
466                 ptr -= tmpname->ln_namelen;
467                 if (ptr - 1 <= pli->pli_path)
468                         GOTO(out, rc = -EOVERFLOW);
469                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
470                 *(--ptr) = '/';
471
472                 /* Store the parent fid for historic lookup */
473                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
474                         GOTO(out, rc = -EOVERFLOW);
475                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
476         }
477
478         /* Verify that our path hasn't changed since we started the lookup.
479            Record the current index, and verify the path resolves to the
480            same fid. If it does, then the path is correct as of this index. */
481         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
482         pli->pli_currec = mdd->mdd_cl.mc_index;
483         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
484         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
485         if (rc) {
486                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
487                 GOTO (out, rc = -EAGAIN);
488         }
489         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
490                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
491                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
492                        PFID(&pli->pli_fid));
493                 GOTO(out, rc = -EAGAIN);
494         }
495         ptr++; /* skip leading / */
496         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
497
498         EXIT;
499 out:
500         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
501                 /* if we vmalloced a large buffer drop it */
502                 mdd_buf_put(buf);
503
504         return rc;
505 }
506
507 static int mdd_path_historic(const struct lu_env *env,
508                              struct path_lookup_info *pli)
509 {
510         return 0;
511 }
512
513 /* Returns the full path to this fid, as of changelog record recno. */
514 static int mdd_path(const struct lu_env *env, struct md_object *obj,
515                     char *path, int pathlen, __u64 *recno, int *linkno)
516 {
517         struct path_lookup_info *pli;
518         int tries = 3;
519         int rc = -EAGAIN;
520         ENTRY;
521
522         if (pathlen < 3)
523                 RETURN(-EOVERFLOW);
524
525         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
526                 path[0] = '\0';
527                 RETURN(0);
528         }
529
530         OBD_ALLOC_PTR(pli);
531         if (pli == NULL)
532                 RETURN(-ENOMEM);
533
534         pli->pli_mdd_obj = md2mdd_obj(obj);
535         pli->pli_recno = *recno;
536         pli->pli_path = path;
537         pli->pli_pathlen = pathlen;
538         pli->pli_linkno = *linkno;
539
540         /* Retry multiple times in case file is being moved */
541         while (tries-- && rc == -EAGAIN)
542                 rc = mdd_path_current(env, pli);
543
544         /* For historical path lookup, the current links may not have existed
545          * at "recno" time.  We must switch over to earlier links/parents
546          * by using the changelog records.  If the earlier parent doesn't
547          * exist, we must search back through the changelog to reconstruct
548          * its parents, then check if it exists, etc.
549          * We may ignore this problem for the initial implementation and
550          * state that an "original" hardlink must still exist for us to find
551          * historic path name. */
552         if (pli->pli_recno != -1) {
553                 rc = mdd_path_historic(env, pli);
554         } else {
555                 *recno = pli->pli_currec;
556                 /* Return next link index to caller */
557                 *linkno = pli->pli_linkno;
558         }
559
560         OBD_FREE_PTR(pli);
561
562         RETURN (rc);
563 }
564
565 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
566 {
567         struct lu_attr *la = &mdd_env_info(env)->mti_la;
568         int rc;
569
570         ENTRY;
571         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
572         if (rc == 0) {
573                 mdd_flags_xlate(obj, la->la_flags);
574         }
575         RETURN(rc);
576 }
577
578 /* get only inode attributes */
579 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
580                   struct md_attr *ma)
581 {
582         int rc = 0;
583         ENTRY;
584
585         if (ma->ma_valid & MA_INODE)
586                 RETURN(0);
587
588         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
589                           mdd_object_capa(env, mdd_obj));
590         if (rc == 0)
591                 ma->ma_valid |= MA_INODE;
592         RETURN(rc);
593 }
594
595 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
596 {
597         struct lov_desc *ldesc;
598         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
599         struct lov_user_md *lum = (struct lov_user_md*)lmm;
600         ENTRY;
601
602         if (!lum)
603                 RETURN(0);
604
605         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
606         LASSERT(ldesc != NULL);
607
608         lum->lmm_magic = LOV_MAGIC_V1;
609         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
610         lum->lmm_pattern = ldesc->ld_pattern;
611         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
612         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
613         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
614
615         RETURN(sizeof(*lum));
616 }
617
618 static int is_rootdir(struct mdd_object *mdd_obj)
619 {
620         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
621         const struct lu_fid *fid = mdo2fid(mdd_obj);
622
623         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
624 }
625
626 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
627                     struct md_attr *ma)
628 {
629         struct mdd_thread_info *info = mdd_env_info(env);
630         int size;
631         int rc;
632         ENTRY;
633
634         LASSERT(info != NULL);
635         LASSERT(ma->ma_lmm_size > 0);
636         LASSERT(ma->ma_big_lmm_used == 0);
637
638         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
639                            mdd_object_capa(env, obj));
640         if (rc < 0)
641                 RETURN(rc);
642
643         /* big_lmm may need to grow */
644         size = rc;
645         mdd_max_lmm_buffer(env, size);
646         if (info->mti_max_lmm == NULL)
647                 RETURN(-ENOMEM);
648
649         LASSERT(info->mti_max_lmm_size >= size);
650         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
651                         XATTR_NAME_LOV);
652         if (rc < 0)
653                 RETURN(rc);
654
655         ma->ma_big_lmm_used = 1;
656         ma->ma_valid |= MA_LOV;
657         ma->ma_lmm = info->mti_max_lmm;
658         ma->ma_lmm_size = size;
659         LASSERT(size == rc);
660         RETURN(rc);
661 }
662
663 /* get lov EA only */
664 static int __mdd_lmm_get(const struct lu_env *env,
665                          struct mdd_object *mdd_obj, struct md_attr *ma)
666 {
667         int rc;
668         ENTRY;
669
670         if (ma->ma_valid & MA_LOV)
671                 RETURN(0);
672
673         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
674                         XATTR_NAME_LOV);
675         if (rc == -ERANGE)
676                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
677         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
678                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
679
680         if (rc > 0) {
681                 ma->ma_lmm_size = rc;
682                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
683                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
684                 rc = 0;
685         }
686         RETURN(rc);
687 }
688
689 /* get the first parent fid from link EA */
690 static int mdd_pfid_get(const struct lu_env *env,
691                         struct mdd_object *mdd_obj, struct md_attr *ma)
692 {
693         struct lu_buf *buf;
694         struct link_ea_header *leh;
695         struct link_ea_entry *lee;
696         struct lu_fid *pfid = &ma->ma_pfid;
697         ENTRY;
698
699         if (ma->ma_valid & MA_PFID)
700                 RETURN(0);
701
702         buf = mdd_links_get(env, mdd_obj);
703         if (IS_ERR(buf))
704                 RETURN(PTR_ERR(buf));
705
706         leh = buf->lb_buf;
707         lee = (struct link_ea_entry *)(leh + 1);
708         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
709         fid_be_to_cpu(pfid, pfid);
710         ma->ma_valid |= MA_PFID;
711         if (buf->lb_len > OBD_ALLOC_BIG)
712                 /* if we vmalloced a large buffer drop it */
713                 mdd_buf_put(buf);
714         RETURN(0);
715 }
716
717 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
718                        struct md_attr *ma)
719 {
720         int rc;
721         ENTRY;
722
723         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
724         rc = __mdd_lmm_get(env, mdd_obj, ma);
725         mdd_read_unlock(env, mdd_obj);
726         RETURN(rc);
727 }
728
729 /* get lmv EA only*/
730 static int __mdd_lmv_get(const struct lu_env *env,
731                          struct mdd_object *mdd_obj, struct md_attr *ma)
732 {
733         int rc;
734         ENTRY;
735
736         if (ma->ma_valid & MA_LMV)
737                 RETURN(0);
738
739         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
740                         XATTR_NAME_LMV);
741         if (rc > 0) {
742                 ma->ma_valid |= MA_LMV;
743                 rc = 0;
744         }
745         RETURN(rc);
746 }
747
748 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
749                          struct md_attr *ma)
750 {
751         struct mdd_thread_info *info = mdd_env_info(env);
752         struct lustre_mdt_attrs *lma =
753                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
754         int lma_size;
755         int rc;
756         ENTRY;
757
758         /* If all needed data are already valid, nothing to do */
759         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
760             (ma->ma_need & (MA_HSM | MA_SOM)))
761                 RETURN(0);
762
763         /* Read LMA from disk EA */
764         lma_size = sizeof(info->mti_xattr_buf);
765         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
766         if (rc <= 0)
767                 RETURN(rc);
768
769         /* Useless to check LMA incompatibility because this is already done in
770          * osd_ea_fid_get(), and this will fail long before this code is
771          * called.
772          * So, if we are here, LMA is compatible.
773          */
774
775         lustre_lma_swab(lma);
776
777         /* Swab and copy LMA */
778         if (ma->ma_need & MA_HSM) {
779                 if (lma->lma_compat & LMAC_HSM)
780                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
781                 else
782                         ma->ma_hsm.mh_flags = 0;
783                 ma->ma_valid |= MA_HSM;
784         }
785
786         /* Copy SOM */
787         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
788                 LASSERT(ma->ma_som != NULL);
789                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
790                 ma->ma_som->msd_size    = lma->lma_som_size;
791                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
792                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
793                 ma->ma_valid |= MA_SOM;
794         }
795
796         RETURN(0);
797 }
798
799 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
800                                  struct md_attr *ma)
801 {
802         int rc = 0;
803         ENTRY;
804
805         if (ma->ma_need & MA_INODE)
806                 rc = mdd_iattr_get(env, mdd_obj, ma);
807
808         if (rc == 0 && ma->ma_need & MA_LOV) {
809                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
810                     S_ISDIR(mdd_object_type(mdd_obj)))
811                         rc = __mdd_lmm_get(env, mdd_obj, ma);
812         }
813         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
814                 if (S_ISREG(mdd_object_type(mdd_obj)))
815                         rc = mdd_pfid_get(env, mdd_obj, ma);
816         }
817         if (rc == 0 && ma->ma_need & MA_LMV) {
818                 if (S_ISDIR(mdd_object_type(mdd_obj)))
819                         rc = __mdd_lmv_get(env, mdd_obj, ma);
820         }
821         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
822                 if (S_ISREG(mdd_object_type(mdd_obj)))
823                         rc = __mdd_lma_get(env, mdd_obj, ma);
824         }
825 #ifdef CONFIG_FS_POSIX_ACL
826         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
827                 if (S_ISDIR(mdd_object_type(mdd_obj)))
828                         rc = mdd_def_acl_get(env, mdd_obj, ma);
829         }
830 #endif
831         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
832                rc, ma->ma_valid, ma->ma_lmm);
833         RETURN(rc);
834 }
835
836 int mdd_attr_get_internal_locked(const struct lu_env *env,
837                                  struct mdd_object *mdd_obj, struct md_attr *ma)
838 {
839         int rc;
840         int needlock = ma->ma_need &
841                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
842
843         if (needlock)
844                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
845         rc = mdd_attr_get_internal(env, mdd_obj, ma);
846         if (needlock)
847                 mdd_read_unlock(env, mdd_obj);
848         return rc;
849 }
850
851 /*
852  * No permission check is needed.
853  */
854 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
855                         struct md_attr *ma)
856 {
857         struct mdd_object *mdd_obj = md2mdd_obj(obj);
858         int                rc;
859
860         ENTRY;
861         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
862         RETURN(rc);
863 }
864
865 /*
866  * No permission check is needed.
867  */
868 static int mdd_xattr_get(const struct lu_env *env,
869                          struct md_object *obj, struct lu_buf *buf,
870                          const char *name)
871 {
872         struct mdd_object *mdd_obj = md2mdd_obj(obj);
873         int rc;
874
875         ENTRY;
876
877         if (mdd_object_exists(mdd_obj) == 0) {
878                 CERROR("%s: object "DFID" not found: rc = -2\n",
879                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
880                 return -ENOENT;
881         }
882
883         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
884         rc = mdo_xattr_get(env, mdd_obj, buf, name,
885                            mdd_object_capa(env, mdd_obj));
886         mdd_read_unlock(env, mdd_obj);
887
888         RETURN(rc);
889 }
890
891 /*
892  * Permission check is done when open,
893  * no need check again.
894  */
895 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
896                         struct lu_buf *buf)
897 {
898         struct mdd_object *mdd_obj = md2mdd_obj(obj);
899         struct dt_object  *next;
900         loff_t             pos = 0;
901         int                rc;
902         ENTRY;
903
904         if (mdd_object_exists(mdd_obj) == 0) {
905                 CERROR("%s: object "DFID" not found: rc = -2\n",
906                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
907                 return -ENOENT;
908         }
909
910         next = mdd_object_child(mdd_obj);
911         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
912         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
913                                          mdd_object_capa(env, mdd_obj));
914         mdd_read_unlock(env, mdd_obj);
915         RETURN(rc);
916 }
917
918 /*
919  * No permission check is needed.
920  */
921 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
922                           struct lu_buf *buf)
923 {
924         struct mdd_object *mdd_obj = md2mdd_obj(obj);
925         int rc;
926
927         ENTRY;
928
929         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
930         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
931         mdd_read_unlock(env, mdd_obj);
932
933         RETURN(rc);
934 }
935
936 int mdd_declare_object_create_internal(const struct lu_env *env,
937                                        struct mdd_object *p,
938                                        struct mdd_object *c,
939                                        struct md_attr *ma,
940                                        struct thandle *handle,
941                                        const struct md_op_spec *spec)
942 {
943         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
944         const struct dt_index_features *feat = spec->sp_feat;
945         int rc;
946         ENTRY;
947
948         if (feat != &dt_directory_features && feat != NULL)
949                 dof->dof_type = DFT_INDEX;
950         else
951                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
952
953         dof->u.dof_idx.di_feat = feat;
954
955         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
956
957         RETURN(rc);
958 }
959
960 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
961                                struct mdd_object *c, struct md_attr *ma,
962                                struct thandle *handle,
963                                const struct md_op_spec *spec)
964 {
965         struct lu_attr *attr = &ma->ma_attr;
966         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
967         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
968         const struct dt_index_features *feat = spec->sp_feat;
969         int rc;
970         ENTRY;
971
972         if (!mdd_object_exists(c)) {
973                 struct dt_object *next = mdd_object_child(c);
974                 LASSERT(next);
975
976                 if (feat != &dt_directory_features && feat != NULL)
977                         dof->dof_type = DFT_INDEX;
978                 else
979                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
980
981                 dof->u.dof_idx.di_feat = feat;
982
983                 /* @hint will be initialized by underlying device. */
984                 next->do_ops->do_ah_init(env, hint,
985                                          p ? mdd_object_child(p) : NULL,
986                                          attr->la_mode & S_IFMT);
987
988                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
989                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
990         } else
991                 rc = -EEXIST;
992
993         RETURN(rc);
994 }
995
996 /**
997  * Make sure the ctime is increased only.
998  */
999 static inline int mdd_attr_check(const struct lu_env *env,
1000                                  struct mdd_object *obj,
1001                                  struct lu_attr *attr)
1002 {
1003         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1004         int rc;
1005         ENTRY;
1006
1007         if (attr->la_valid & LA_CTIME) {
1008                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1009                 if (rc)
1010                         RETURN(rc);
1011
1012                 if (attr->la_ctime < tmp_la->la_ctime)
1013                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1014                 else if (attr->la_valid == LA_CTIME &&
1015                          attr->la_ctime == tmp_la->la_ctime)
1016                         attr->la_valid &= ~LA_CTIME;
1017         }
1018         RETURN(0);
1019 }
1020
1021 int mdd_attr_set_internal(const struct lu_env *env,
1022                           struct mdd_object *obj,
1023                           struct lu_attr *attr,
1024                           struct thandle *handle,
1025                           int needacl)
1026 {
1027         int rc;
1028         ENTRY;
1029
1030         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1031 #ifdef CONFIG_FS_POSIX_ACL
1032         if (!rc && (attr->la_valid & LA_MODE) && needacl)
1033                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
1034 #endif
1035         RETURN(rc);
1036 }
1037
1038 int mdd_attr_check_set_internal(const struct lu_env *env,
1039                                 struct mdd_object *obj,
1040                                 struct lu_attr *attr,
1041                                 struct thandle *handle,
1042                                 int needacl)
1043 {
1044         int rc;
1045         ENTRY;
1046
1047         rc = mdd_attr_check(env, obj, attr);
1048         if (rc)
1049                 RETURN(rc);
1050
1051         if (attr->la_valid)
1052                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1053         RETURN(rc);
1054 }
1055
1056 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1057                                         struct mdd_object *obj,
1058                                         struct lu_attr *attr,
1059                                         struct thandle *handle,
1060                                         int needacl)
1061 {
1062         int rc;
1063         ENTRY;
1064
1065         needacl = needacl && (attr->la_valid & LA_MODE);
1066         if (needacl)
1067                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1068         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1069         if (needacl)
1070                 mdd_write_unlock(env, obj);
1071         RETURN(rc);
1072 }
1073
1074 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1075                                        struct mdd_object *obj,
1076                                        struct lu_attr *attr,
1077                                        struct thandle *handle,
1078                                        int needacl)
1079 {
1080         int rc;
1081         ENTRY;
1082
1083         needacl = needacl && (attr->la_valid & LA_MODE);
1084         if (needacl)
1085                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1086         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1087         if (needacl)
1088                 mdd_write_unlock(env, obj);
1089         RETURN(rc);
1090 }
1091
1092 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1093                     const struct lu_buf *buf, const char *name,
1094                     int fl, struct thandle *handle)
1095 {
1096         struct lustre_capa *capa = mdd_object_capa(env, obj);
1097         int rc = -EINVAL;
1098         ENTRY;
1099
1100         if (buf->lb_buf && buf->lb_len > 0)
1101                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1102         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1103                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1104
1105         RETURN(rc);
1106 }
1107
1108 /*
1109  * This gives the same functionality as the code between
1110  * sys_chmod and inode_setattr
1111  * chown_common and inode_setattr
1112  * utimes and inode_setattr
1113  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1114  */
1115 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1116                         struct lu_attr *la, const struct md_attr *ma)
1117 {
1118         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1119         struct md_ucred  *uc;
1120         int               rc;
1121         ENTRY;
1122
1123         if (!la->la_valid)
1124                 RETURN(0);
1125
1126         /* Do not permit change file type */
1127         if (la->la_valid & LA_TYPE)
1128                 RETURN(-EPERM);
1129
1130         /* They should not be processed by setattr */
1131         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1132                 RETURN(-EPERM);
1133
1134         /* export destroy does not have ->le_ses, but we may want
1135          * to drop LUSTRE_SOM_FL. */
1136         if (!env->le_ses)
1137                 RETURN(0);
1138
1139         uc = md_ucred(env);
1140
1141         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1142         if (rc)
1143                 RETURN(rc);
1144
1145         if (la->la_valid == LA_CTIME) {
1146                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1147                         /* This is only for set ctime when rename's source is
1148                          * on remote MDS. */
1149                         rc = mdd_may_delete(env, NULL, obj,
1150                                             (struct md_attr *)ma, 1, 0);
1151                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1152                         la->la_valid &= ~LA_CTIME;
1153                 RETURN(rc);
1154         }
1155
1156         if (la->la_valid == LA_ATIME) {
1157                 /* This is atime only set for read atime update on close. */
1158                 if (la->la_atime >= tmp_la->la_atime &&
1159                     la->la_atime < (tmp_la->la_atime +
1160                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1161                         la->la_valid &= ~LA_ATIME;
1162                 RETURN(0);
1163         }
1164
1165         /* Check if flags change. */
1166         if (la->la_valid & LA_FLAGS) {
1167                 unsigned int oldflags = 0;
1168                 unsigned int newflags = la->la_flags &
1169                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1170
1171                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1172                     !mdd_capable(uc, CFS_CAP_FOWNER))
1173                         RETURN(-EPERM);
1174
1175                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1176                  * only be changed by the relevant capability. */
1177                 if (mdd_is_immutable(obj))
1178                         oldflags |= LUSTRE_IMMUTABLE_FL;
1179                 if (mdd_is_append(obj))
1180                         oldflags |= LUSTRE_APPEND_FL;
1181                 if ((oldflags ^ newflags) &&
1182                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1183                         RETURN(-EPERM);
1184
1185                 if (!S_ISDIR(tmp_la->la_mode))
1186                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1187         }
1188
1189         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1190             (la->la_valid & ~LA_FLAGS) &&
1191             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1192                 RETURN(-EPERM);
1193
1194         /* Check for setting the obj time. */
1195         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1196             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1197                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1198                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1199                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1200                                                             MAY_WRITE,
1201                                                             MOR_TGT_CHILD);
1202                         if (rc)
1203                                 RETURN(rc);
1204                 }
1205         }
1206
1207         if (la->la_valid & LA_KILL_SUID) {
1208                 la->la_valid &= ~LA_KILL_SUID;
1209                 if ((tmp_la->la_mode & S_ISUID) &&
1210                     !(la->la_valid & LA_MODE)) {
1211                         la->la_mode = tmp_la->la_mode;
1212                         la->la_valid |= LA_MODE;
1213                 }
1214                 la->la_mode &= ~S_ISUID;
1215         }
1216
1217         if (la->la_valid & LA_KILL_SGID) {
1218                 la->la_valid &= ~LA_KILL_SGID;
1219                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1220                                         (S_ISGID | S_IXGRP)) &&
1221                     !(la->la_valid & LA_MODE)) {
1222                         la->la_mode = tmp_la->la_mode;
1223                         la->la_valid |= LA_MODE;
1224                 }
1225                 la->la_mode &= ~S_ISGID;
1226         }
1227
1228         /* Make sure a caller can chmod. */
1229         if (la->la_valid & LA_MODE) {
1230                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1231                     (uc->mu_fsuid != tmp_la->la_uid) &&
1232                     !mdd_capable(uc, CFS_CAP_FOWNER))
1233                         RETURN(-EPERM);
1234
1235                 if (la->la_mode == (cfs_umode_t) -1)
1236                         la->la_mode = tmp_la->la_mode;
1237                 else
1238                         la->la_mode = (la->la_mode & S_IALLUGO) |
1239                                       (tmp_la->la_mode & ~S_IALLUGO);
1240
1241                 /* Also check the setgid bit! */
1242                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1243                                        la->la_gid : tmp_la->la_gid) &&
1244                     !mdd_capable(uc, CFS_CAP_FSETID))
1245                         la->la_mode &= ~S_ISGID;
1246         } else {
1247                la->la_mode = tmp_la->la_mode;
1248         }
1249
1250         /* Make sure a caller can chown. */
1251         if (la->la_valid & LA_UID) {
1252                 if (la->la_uid == (uid_t) -1)
1253                         la->la_uid = tmp_la->la_uid;
1254                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1255                     (la->la_uid != tmp_la->la_uid)) &&
1256                     !mdd_capable(uc, CFS_CAP_CHOWN))
1257                         RETURN(-EPERM);
1258
1259                 /* If the user or group of a non-directory has been
1260                  * changed by a non-root user, remove the setuid bit.
1261                  * 19981026 David C Niemi <niemi@tux.org>
1262                  *
1263                  * Changed this to apply to all users, including root,
1264                  * to avoid some races. This is the behavior we had in
1265                  * 2.0. The check for non-root was definitely wrong
1266                  * for 2.2 anyway, as it should have been using
1267                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1268                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1269                     !S_ISDIR(tmp_la->la_mode)) {
1270                         la->la_mode &= ~S_ISUID;
1271                         la->la_valid |= LA_MODE;
1272                 }
1273         }
1274
1275         /* Make sure caller can chgrp. */
1276         if (la->la_valid & LA_GID) {
1277                 if (la->la_gid == (gid_t) -1)
1278                         la->la_gid = tmp_la->la_gid;
1279                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1280                     ((la->la_gid != tmp_la->la_gid) &&
1281                     !lustre_in_group_p(uc, la->la_gid))) &&
1282                     !mdd_capable(uc, CFS_CAP_CHOWN))
1283                         RETURN(-EPERM);
1284
1285                 /* Likewise, if the user or group of a non-directory
1286                  * has been changed by a non-root user, remove the
1287                  * setgid bit UNLESS there is no group execute bit
1288                  * (this would be a file marked for mandatory
1289                  * locking).  19981026 David C Niemi <niemi@tux.org>
1290                  *
1291                  * Removed the fsuid check (see the comment above) --
1292                  * 19990830 SD. */
1293                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1294                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1295                         la->la_mode &= ~S_ISGID;
1296                         la->la_valid |= LA_MODE;
1297                 }
1298         }
1299
1300         /* For both Size-on-MDS case and truncate case,
1301          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1302          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1303          * For SOM case, it is true, the MAY_WRITE perm has been checked
1304          * when open, no need check again. For truncate case, it is false,
1305          * the MAY_WRITE perm should be checked here. */
1306         if (ma->ma_attr_flags & MDS_SOM) {
1307                 /* For the "Size-on-MDS" setattr update, merge coming
1308                  * attributes with the set in the inode. BUG 10641 */
1309                 if ((la->la_valid & LA_ATIME) &&
1310                     (la->la_atime <= tmp_la->la_atime))
1311                         la->la_valid &= ~LA_ATIME;
1312
1313                 /* OST attributes do not have a priority over MDS attributes,
1314                  * so drop times if ctime is equal. */
1315                 if ((la->la_valid & LA_CTIME) &&
1316                     (la->la_ctime <= tmp_la->la_ctime))
1317                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1318         } else {
1319                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1320                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1321                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1322                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1323                                 rc = mdd_permission_internal_locked(env, obj,
1324                                                             tmp_la, MAY_WRITE,
1325                                                             MOR_TGT_CHILD);
1326                                 if (rc)
1327                                         RETURN(rc);
1328                         }
1329                 }
1330                 if (la->la_valid & LA_CTIME) {
1331                         /* The pure setattr, it has the priority over what is
1332                          * already set, do not drop it if ctime is equal. */
1333                         if (la->la_ctime < tmp_la->la_ctime)
1334                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1335                                                   LA_CTIME);
1336                 }
1337         }
1338
1339         RETURN(0);
1340 }
1341
1342 /** Store a data change changelog record
1343  * If this fails, we must fail the whole transaction; we don't
1344  * want the change to commit without the log entry.
1345  * \param mdd_obj - mdd_object of change
1346  * \param handle - transacion handle
1347  */
1348 static int mdd_changelog_data_store(const struct lu_env     *env,
1349                                     struct mdd_device       *mdd,
1350                                     enum changelog_rec_type type,
1351                                     int                     flags,
1352                                     struct mdd_object       *mdd_obj,
1353                                     struct thandle          *handle)
1354 {
1355         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1356         struct llog_changelog_rec *rec;
1357         struct thandle *th = NULL;
1358         struct lu_buf *buf;
1359         int reclen;
1360         int rc;
1361
1362         /* Not recording */
1363         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1364                 RETURN(0);
1365         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1366                 RETURN(0);
1367
1368         LASSERT(mdd_obj != NULL);
1369         LASSERT(handle != NULL);
1370
1371         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1372             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1373                 /* Don't need multiple updates in this log */
1374                 /* Don't check under lock - no big deal if we get an extra
1375                    entry */
1376                 RETURN(0);
1377         }
1378
1379         reclen = llog_data_len(sizeof(*rec));
1380         buf = mdd_buf_alloc(env, reclen);
1381         if (buf->lb_buf == NULL)
1382                 RETURN(-ENOMEM);
1383         rec = (struct llog_changelog_rec *)buf->lb_buf;
1384
1385         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1386         rec->cr.cr_type = (__u32)type;
1387         rec->cr.cr_tfid = *tfid;
1388         rec->cr.cr_namelen = 0;
1389         mdd_obj->mod_cltime = cfs_time_current_64();
1390
1391         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1392
1393         if (th)
1394                 mdd_trans_stop(env, mdd, rc, th);
1395
1396         if (rc < 0) {
1397                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1398                        rc, type, PFID(tfid));
1399                 return -EFAULT;
1400         }
1401
1402         return 0;
1403 }
1404
1405 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1406                   int flags, struct md_object *obj)
1407 {
1408         struct thandle *handle;
1409         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1410         struct mdd_device *mdd = mdo2mdd(obj);
1411         int rc;
1412         ENTRY;
1413
1414         handle = mdd_trans_create(env, mdd);
1415         if (IS_ERR(handle))
1416                 return(PTR_ERR(handle));
1417
1418         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1419         if (rc)
1420                 GOTO(stop, rc);
1421
1422         rc = mdd_trans_start(env, mdd, handle);
1423         if (rc)
1424                 GOTO(stop, rc);
1425
1426         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1427                                       handle);
1428
1429 stop:
1430         mdd_trans_stop(env, mdd, rc, handle);
1431
1432         RETURN(rc);
1433 }
1434
1435 /**
1436  * Should be called with write lock held.
1437  *
1438  * \see mdd_lma_set_locked().
1439  */
1440 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1441                        const struct md_attr *ma, struct thandle *handle)
1442 {
1443         struct mdd_thread_info *info = mdd_env_info(env);
1444         struct lu_buf *buf;
1445         struct lustre_mdt_attrs *lma =
1446                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1447         int lmasize = sizeof(struct lustre_mdt_attrs);
1448         int rc = 0;
1449
1450         ENTRY;
1451
1452         /* Either HSM or SOM part is not valid, we need to read it before */
1453         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1454                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1455                 if (rc <= 0)
1456                         RETURN(rc);
1457
1458                 lustre_lma_swab(lma);
1459         } else {
1460                 memset(lma, 0, lmasize);
1461         }
1462
1463         /* Copy HSM data */
1464         if (ma->ma_valid & MA_HSM) {
1465                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1466                 lma->lma_compat |= LMAC_HSM;
1467         }
1468
1469         /* Copy SOM data */
1470         if (ma->ma_valid & MA_SOM) {
1471                 LASSERT(ma->ma_som != NULL);
1472                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1473                         lma->lma_compat     &= ~LMAC_SOM;
1474                 } else {
1475                         lma->lma_compat     |= LMAC_SOM;
1476                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1477                         lma->lma_som_size    = ma->ma_som->msd_size;
1478                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1479                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1480                 }
1481         }
1482
1483         /* Copy FID */
1484         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1485
1486         lustre_lma_swab(lma);
1487         buf = mdd_buf_get(env, lma, lmasize);
1488         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1489
1490         RETURN(rc);
1491 }
1492
1493 /**
1494  * Save LMA extended attributes with data from \a ma.
1495  *
1496  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1497  * not, LMA EA will be first read from disk, modified and write back.
1498  *
1499  */
1500 static int mdd_lma_set_locked(const struct lu_env *env,
1501                               struct mdd_object *mdd_obj,
1502                               const struct md_attr *ma, struct thandle *handle)
1503 {
1504         int rc;
1505
1506         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1507         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1508         mdd_write_unlock(env, mdd_obj);
1509         return rc;
1510 }
1511
1512 /* Precedence for choosing record type when multiple
1513  * attributes change: setattr > mtime > ctime > atime
1514  * (ctime changes when mtime does, plus chmod/chown.
1515  * atime and ctime are independent.) */
1516 static int mdd_attr_set_changelog(const struct lu_env *env,
1517                                   struct md_object *obj, struct thandle *handle,
1518                                   __u64 valid)
1519 {
1520         struct mdd_device *mdd = mdo2mdd(obj);
1521         int bits, type = 0;
1522
1523         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1524         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1525         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1526         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1527         bits = bits & mdd->mdd_cl.mc_mask;
1528         if (bits == 0)
1529                 return 0;
1530
1531         /* The record type is the lowest non-masked set bit */
1532         while (bits && ((bits & 1) == 0)) {
1533                 bits = bits >> 1;
1534                 type++;
1535         }
1536
1537         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1538         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1539                                         md2mdd_obj(obj), handle);
1540 }
1541
1542 static int mdd_declare_attr_set(const struct lu_env *env,
1543                                 struct mdd_device *mdd,
1544                                 struct mdd_object *obj,
1545                                 const struct md_attr *ma,
1546                                 struct lov_mds_md *lmm,
1547                                 struct thandle *handle)
1548 {
1549         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1550         int             rc, i;
1551
1552         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1553         if (rc)
1554                 return rc;
1555
1556         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1557         if (rc)
1558                 return rc;
1559
1560         if (ma->ma_valid & MA_LOV) {
1561                 buf->lb_buf = NULL;
1562                 buf->lb_len = ma->ma_lmm_size;
1563                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1564                                            0, handle);
1565                 if (rc)
1566                         return rc;
1567         }
1568
1569         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1570                 buf->lb_buf = NULL;
1571                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1572                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1573                                            0, handle);
1574                 if (rc)
1575                         return rc;
1576         }
1577
1578 #ifdef CONFIG_FS_POSIX_ACL
1579         if (ma->ma_attr.la_valid & LA_MODE) {
1580                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1581                 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1582                                    BYPASS_CAPA);
1583                 mdd_read_unlock(env, obj);
1584                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1585                         rc = 0;
1586                 else if (rc < 0)
1587                         return rc;
1588
1589                 if (rc != 0) {
1590                         buf->lb_buf = NULL;
1591                         buf->lb_len = rc;
1592                         rc = mdo_declare_xattr_set(env, obj, buf,
1593                                                    XATTR_NAME_ACL_ACCESS, 0,
1594                                                    handle);
1595                         if (rc)
1596                                 return rc;
1597                 }
1598         }
1599 #endif
1600
1601         /* basically the log is the same as in unlink case */
1602         if (lmm) {
1603                 __u16 stripe;
1604
1605                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1606                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1607                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1608                                mdd->mdd_obd_dev->obd_name,
1609                                le32_to_cpu(lmm->lmm_magic),
1610                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1611                         return -EINVAL;
1612                 }
1613
1614                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1615                 if (stripe == LOV_ALL_STRIPES) {
1616                         struct lov_desc *ldesc;
1617
1618                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1619                         LASSERT(ldesc != NULL);
1620                         stripe = ldesc->ld_tgt_count;
1621                 }
1622
1623                 for (i = 0; i < stripe; i++) {
1624                         rc = mdd_declare_llog_record(env, mdd,
1625                                         sizeof(struct llog_unlink_rec),
1626                                         handle);
1627                         if (rc)
1628                                 return rc;
1629                 }
1630         }
1631
1632         return rc;
1633 }
1634
1635 /* set attr and LOV EA at once, return updated attr */
1636 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1637                         const struct md_attr *ma)
1638 {
1639         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1640         struct mdd_device *mdd = mdo2mdd(obj);
1641         struct thandle *handle;
1642         struct lov_mds_md *lmm = NULL;
1643         struct llog_cookie *logcookies = NULL;
1644         int  rc, lmm_size = 0, cookie_size = 0;
1645         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1646         struct obd_device *obd = mdd->mdd_obd_dev;
1647         struct mds_obd *mds = &obd->u.mds;
1648 #ifdef HAVE_QUOTA_SUPPORT
1649         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1650         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1651         int quota_opc = 0, block_count = 0;
1652         int inode_pending[MAXQUOTAS] = { 0, 0 };
1653         int block_pending[MAXQUOTAS] = { 0, 0 };
1654 #endif
1655         ENTRY;
1656
1657         *la_copy = ma->ma_attr;
1658         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1659         if (rc != 0)
1660                 RETURN(rc);
1661
1662         /* setattr on "close" only change atime, or do nothing */
1663         if (ma->ma_valid == MA_INODE &&
1664             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1665                 RETURN(0);
1666
1667         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1668             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1669                 lmm_size = mdd_lov_mdsize(env, mdd);
1670                 lmm = mdd_max_lmm_get(env, mdd);
1671                 if (lmm == NULL)
1672                         RETURN(-ENOMEM);
1673
1674                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1675                                 XATTR_NAME_LOV);
1676
1677                 if (rc < 0)
1678                         RETURN(rc);
1679         }
1680
1681         handle = mdd_trans_create(env, mdd);
1682         if (IS_ERR(handle))
1683                 RETURN(PTR_ERR(handle));
1684
1685         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1686                                   lmm_size > 0 ? lmm : NULL, handle);
1687         if (rc)
1688                 GOTO(stop, rc);
1689
1690         /* permission changes may require sync operation */
1691         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1692                 handle->th_sync = !!mdd->mdd_sync_permission;
1693
1694         rc = mdd_trans_start(env, mdd, handle);
1695         if (rc)
1696                 GOTO(stop, rc);
1697
1698         /* permission changes may require sync operation */
1699         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1700                 handle->th_sync |= mdd->mdd_sync_permission;
1701
1702         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1703                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1704                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1705
1706 #ifdef HAVE_QUOTA_SUPPORT
1707         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1708                 struct obd_export *exp = md_quota(env)->mq_exp;
1709                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1710
1711                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1712                 if (!rc) {
1713                         quota_opc = FSFILT_OP_SETATTR;
1714                         mdd_quota_wrapper(la_copy, qnids);
1715                         mdd_quota_wrapper(la_tmp, qoids);
1716                         /* get file quota for new owner */
1717                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1718                                         qnids, inode_pending, 1, NULL, 0,
1719                                         NULL, 0);
1720                         block_count = (la_tmp->la_blocks + 7) >> 3;
1721                         if (block_count) {
1722                                 void *data = NULL;
1723                                 mdd_data_get(env, mdd_obj, &data);
1724                                 /* get block quota for new owner */
1725                                 lquota_chkquota(mds_quota_interface_ref, obd,
1726                                                 exp, qnids, block_pending,
1727                                                 block_count, NULL,
1728                                                 LQUOTA_FLAGS_BLK, data, 1);
1729                         }
1730                 }
1731         }
1732 #endif
1733
1734         if (la_copy->la_valid & LA_FLAGS) {
1735                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1736                                                   handle, 1);
1737                 if (rc == 0)
1738                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1739         } else if (la_copy->la_valid) {            /* setattr */
1740                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1741                                                   handle, 1);
1742                 /* journal chown/chgrp in llog, just like unlink */
1743                 if (rc == 0 && lmm_size){
1744                         cookie_size = mdd_lov_cookiesize(env, mdd);
1745                         logcookies = mdd_max_cookie_get(env, mdd);
1746                         if (logcookies == NULL)
1747                                 GOTO(cleanup, rc = -ENOMEM);
1748
1749                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1750                                             logcookies, cookie_size) <= 0)
1751                                 logcookies = NULL;
1752                 }
1753         }
1754
1755         if (rc == 0 && ma->ma_valid & MA_LOV) {
1756                 cfs_umode_t mode;
1757
1758                 mode = mdd_object_type(mdd_obj);
1759                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1760                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1761                         if (rc)
1762                                 GOTO(cleanup, rc);
1763
1764                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1765                                             ma->ma_lmm_size, handle, 1);
1766                 }
1767
1768         }
1769         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1770                 cfs_umode_t mode;
1771
1772                 mode = mdd_object_type(mdd_obj);
1773                 if (S_ISREG(mode))
1774                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1775
1776         }
1777 cleanup:
1778         if (rc == 0)
1779                 rc = mdd_attr_set_changelog(env, obj, handle,
1780                                             ma->ma_attr.la_valid);
1781 stop:
1782         mdd_trans_stop(env, mdd, rc, handle);
1783         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1784                 /*set obd attr, if needed*/
1785                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1786                                            logcookies);
1787         }
1788 #ifdef HAVE_QUOTA_SUPPORT
1789         if (quota_opc) {
1790                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1791                                       inode_pending, 0);
1792                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1793                                       block_pending, 1);
1794                 /* Trigger dqrel/dqacq for original owner and new owner.
1795                  * If failed, the next call for lquota_chkquota will
1796                  * process it. */
1797                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1798                               quota_opc);
1799         }
1800 #endif
1801         RETURN(rc);
1802 }
1803
1804 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1805                       const struct lu_buf *buf, const char *name, int fl,
1806                       struct thandle *handle)
1807 {
1808         int  rc;
1809         ENTRY;
1810
1811         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1812         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1813         mdd_write_unlock(env, obj);
1814
1815         RETURN(rc);
1816 }
1817
1818 static int mdd_xattr_sanity_check(const struct lu_env *env,
1819                                   struct mdd_object *obj)
1820 {
1821         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1822         struct md_ucred *uc     = md_ucred(env);
1823         int rc;
1824         ENTRY;
1825
1826         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1827                 RETURN(-EPERM);
1828
1829         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1830         if (rc)
1831                 RETURN(rc);
1832
1833         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1834             !mdd_capable(uc, CFS_CAP_FOWNER))
1835                 RETURN(-EPERM);
1836
1837         RETURN(rc);
1838 }
1839
1840 static int mdd_declare_xattr_set(const struct lu_env *env,
1841                                  struct mdd_device *mdd,
1842                                  struct mdd_object *obj,
1843                                  const struct lu_buf *buf,
1844                                  const char *name,
1845                                  struct thandle *handle)
1846
1847 {
1848         int rc;
1849
1850         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1851         if (rc)
1852                 return rc;
1853
1854         /* Only record user xattr changes */
1855         if ((strncmp("user.", name, 5) == 0))
1856                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1857
1858         return rc;
1859 }
1860
1861 /**
1862  * The caller should guarantee to update the object ctime
1863  * after xattr_set if needed.
1864  */
1865 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1866                          const struct lu_buf *buf, const char *name,
1867                          int fl)
1868 {
1869         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1870         struct mdd_device *mdd = mdo2mdd(obj);
1871         struct thandle *handle;
1872         int  rc;
1873         ENTRY;
1874
1875         rc = mdd_xattr_sanity_check(env, mdd_obj);
1876         if (rc)
1877                 RETURN(rc);
1878
1879         handle = mdd_trans_create(env, mdd);
1880         if (IS_ERR(handle))
1881                 RETURN(PTR_ERR(handle));
1882
1883         /* security-replated changes may require sync */
1884         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1885             mdd->mdd_sync_permission == 1)
1886                 handle->th_sync = 1;
1887
1888         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1889         if (rc)
1890                 GOTO(stop, rc);
1891
1892         rc = mdd_trans_start(env, mdd, handle);
1893         if (rc)
1894                 GOTO(stop, rc);
1895
1896         /* security-replated changes may require sync */
1897         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1898                 handle->th_sync |= mdd->mdd_sync_permission;
1899
1900         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1901
1902         /* Only record system & user xattr changes */
1903         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1904                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1905                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1906                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1907                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1908                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1909                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1910                                               handle);
1911
1912 stop:
1913         mdd_trans_stop(env, mdd, rc, handle);
1914
1915         RETURN(rc);
1916 }
1917
1918 static int mdd_declare_xattr_del(const struct lu_env *env,
1919                                  struct mdd_device *mdd,
1920                                  struct mdd_object *obj,
1921                                  const char *name,
1922                                  struct thandle *handle)
1923 {
1924         int rc;
1925
1926         rc = mdo_declare_xattr_del(env, obj, name, handle);
1927         if (rc)
1928                 return rc;
1929
1930         /* Only record user xattr changes */
1931         if ((strncmp("user.", name, 5) == 0))
1932                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1933
1934         return rc;
1935 }
1936
1937 /**
1938  * The caller should guarantee to update the object ctime
1939  * after xattr_set if needed.
1940  */
1941 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1942                   const char *name)
1943 {
1944         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1945         struct mdd_device *mdd = mdo2mdd(obj);
1946         struct thandle *handle;
1947         int  rc;
1948         ENTRY;
1949
1950         rc = mdd_xattr_sanity_check(env, mdd_obj);
1951         if (rc)
1952                 RETURN(rc);
1953
1954         handle = mdd_trans_create(env, mdd);
1955         if (IS_ERR(handle))
1956                 RETURN(PTR_ERR(handle));
1957
1958         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1959         if (rc)
1960                 GOTO(stop, rc);
1961
1962         rc = mdd_trans_start(env, mdd, handle);
1963         if (rc)
1964                 GOTO(stop, rc);
1965
1966         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1967         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1968                            mdd_object_capa(env, mdd_obj));
1969         mdd_write_unlock(env, mdd_obj);
1970
1971         /* Only record system & user xattr changes */
1972         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1973                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1974                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1975                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1976                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1977                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1978                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1979                                               handle);
1980
1981 stop:
1982         mdd_trans_stop(env, mdd, rc, handle);
1983
1984         RETURN(rc);
1985 }
1986
1987 /* partial unlink */
1988 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1989                        struct md_attr *ma)
1990 {
1991         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1992         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1993         struct mdd_device *mdd = mdo2mdd(obj);
1994         struct thandle *handle;
1995 #ifdef HAVE_QUOTA_SUPPORT
1996         struct obd_device *obd = mdd->mdd_obd_dev;
1997         struct mds_obd *mds = &obd->u.mds;
1998         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1999         int quota_opc = 0;
2000 #endif
2001         int rc;
2002         ENTRY;
2003
2004         /* XXX: this code won't be used ever:
2005          * DNE uses slightly different approach */
2006         LBUG();
2007
2008         /*
2009          * Check -ENOENT early here because we need to get object type
2010          * to calculate credits before transaction start
2011          */
2012         if (mdd_object_exists(mdd_obj) == 0) {
2013                 CERROR("%s: object "DFID" not found: rc = -2\n",
2014                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2015                 RETURN(-ENOENT);
2016         }
2017
2018         LASSERT(mdd_object_exists(mdd_obj) > 0);
2019
2020         handle = mdd_trans_create(env, mdd);
2021         if (IS_ERR(handle))
2022                 RETURN(-ENOMEM);
2023
2024         rc = mdd_trans_start(env, mdd, handle);
2025
2026         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2027
2028         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2029         if (rc)
2030                 GOTO(cleanup, rc);
2031
2032         mdo_ref_del(env, mdd_obj, handle);
2033
2034         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2035                 /* unlink dot */
2036                 mdo_ref_del(env, mdd_obj, handle);
2037         }
2038
2039         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2040         la_copy->la_ctime = ma->ma_attr.la_ctime;
2041
2042         la_copy->la_valid = LA_CTIME;
2043         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2044         if (rc)
2045                 GOTO(cleanup, rc);
2046
2047         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2048 #ifdef HAVE_QUOTA_SUPPORT
2049         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2050             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2051                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2052                 mdd_quota_wrapper(&ma->ma_attr, qids);
2053         }
2054 #endif
2055
2056
2057         EXIT;
2058 cleanup:
2059         mdd_write_unlock(env, mdd_obj);
2060         mdd_trans_stop(env, mdd, rc, handle);
2061 #ifdef HAVE_QUOTA_SUPPORT
2062         if (quota_opc)
2063                 /* Trigger dqrel on the owner of child. If failed,
2064                  * the next call for lquota_chkquota will process it */
2065                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2066                               quota_opc);
2067 #endif
2068         return rc;
2069 }
2070
2071 /* partial operation */
2072 static int mdd_oc_sanity_check(const struct lu_env *env,
2073                                struct mdd_object *obj,
2074                                struct md_attr *ma)
2075 {
2076         int rc;
2077         ENTRY;
2078
2079         switch (ma->ma_attr.la_mode & S_IFMT) {
2080         case S_IFREG:
2081         case S_IFDIR:
2082         case S_IFLNK:
2083         case S_IFCHR:
2084         case S_IFBLK:
2085         case S_IFIFO:
2086         case S_IFSOCK:
2087                 rc = 0;
2088                 break;
2089         default:
2090                 rc = -EINVAL;
2091                 break;
2092         }
2093         RETURN(rc);
2094 }
2095
2096 static int mdd_object_create(const struct lu_env *env,
2097                              struct md_object *obj,
2098                              const struct md_op_spec *spec,
2099                              struct md_attr *ma)
2100 {
2101
2102         struct mdd_device *mdd = mdo2mdd(obj);
2103         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2104         const struct lu_fid *pfid = spec->u.sp_pfid;
2105         struct thandle *handle;
2106 #ifdef HAVE_QUOTA_SUPPORT
2107         struct obd_device *obd = mdd->mdd_obd_dev;
2108         struct obd_export *exp = md_quota(env)->mq_exp;
2109         struct mds_obd *mds = &obd->u.mds;
2110         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2111         int quota_opc = 0, block_count = 0;
2112         int inode_pending[MAXQUOTAS] = { 0, 0 };
2113         int block_pending[MAXQUOTAS] = { 0, 0 };
2114 #endif
2115         int rc = 0;
2116         ENTRY;
2117
2118         /* XXX: this code won't be used ever:
2119          * DNE uses slightly different approach */
2120         LBUG();
2121
2122 #ifdef HAVE_QUOTA_SUPPORT
2123         if (mds->mds_quota) {
2124                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2125                 mdd_quota_wrapper(&ma->ma_attr, qids);
2126                 /* get file quota for child */
2127                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2128                                 qids, inode_pending, 1, NULL, 0,
2129                                 NULL, 0);
2130                 switch (ma->ma_attr.la_mode & S_IFMT) {
2131                 case S_IFLNK:
2132                 case S_IFDIR:
2133                         block_count = 2;
2134                         break;
2135                 case S_IFREG:
2136                         block_count = 1;
2137                         break;
2138                 }
2139                 /* get block quota for child */
2140                 if (block_count)
2141                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2142                                         qids, block_pending, block_count,
2143                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2144         }
2145 #endif
2146
2147         handle = mdd_trans_create(env, mdd);
2148         if (IS_ERR(handle))
2149                 GOTO(out_pending, rc = PTR_ERR(handle));
2150
2151         rc = mdd_trans_start(env, mdd, handle);
2152
2153         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2154         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2155         if (rc)
2156                 GOTO(unlock, rc);
2157
2158         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2159         if (rc)
2160                 GOTO(unlock, rc);
2161
2162         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2163                 /* If creating the slave object, set slave EA here. */
2164                 int lmv_size = spec->u.sp_ea.eadatalen;
2165                 struct lmv_stripe_md *lmv;
2166
2167                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2168                 LASSERT(lmv != NULL && lmv_size > 0);
2169
2170                 rc = __mdd_xattr_set(env, mdd_obj,
2171                                      mdd_buf_get_const(env, lmv, lmv_size),
2172                                      XATTR_NAME_LMV, 0, handle);
2173                 if (rc)
2174                         GOTO(unlock, rc);
2175
2176                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2177                                            handle, 0);
2178         } else {
2179 #ifdef CONFIG_FS_POSIX_ACL
2180                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2181                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2182
2183                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2184                         buf->lb_len = spec->u.sp_ea.eadatalen;
2185                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2186                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2187                                                     &ma->ma_attr.la_mode,
2188                                                     handle);
2189                                 if (rc)
2190                                         GOTO(unlock, rc);
2191                                 else
2192                                         ma->ma_attr.la_valid |= LA_MODE;
2193                         }
2194
2195                         pfid = spec->u.sp_ea.fid;
2196                 }
2197 #endif
2198                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2199                                            spec);
2200         }
2201         EXIT;
2202 unlock:
2203         if (rc == 0)
2204                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2205         mdd_write_unlock(env, mdd_obj);
2206
2207         mdd_trans_stop(env, mdd, rc, handle);
2208 out_pending:
2209 #ifdef HAVE_QUOTA_SUPPORT
2210         if (quota_opc) {
2211                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2212                                       inode_pending, 0);
2213                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2214                                       block_pending, 1);
2215                 /* Trigger dqacq on the owner of child. If failed,
2216                  * the next call for lquota_chkquota will process it. */
2217                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2218                               quota_opc);
2219         }
2220 #endif
2221         return rc;
2222 }
2223
2224 /* partial link */
2225 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2226                        const struct md_attr *ma)
2227 {
2228         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2229         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2230         struct mdd_device *mdd = mdo2mdd(obj);
2231         struct thandle *handle;
2232         int rc;
2233         ENTRY;
2234
2235         /* XXX: this code won't be used ever:
2236          * DNE uses slightly different approach */
2237         LBUG();
2238
2239         handle = mdd_trans_create(env, mdd);
2240         if (IS_ERR(handle))
2241                 RETURN(-ENOMEM);
2242
2243         rc = mdd_trans_start(env, mdd, handle);
2244
2245         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2246         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2247         if (rc == 0)
2248                 mdo_ref_add(env, mdd_obj, handle);
2249         mdd_write_unlock(env, mdd_obj);
2250         if (rc == 0) {
2251                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2252                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2253
2254                 la_copy->la_valid = LA_CTIME;
2255                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2256                                                         handle, 0);
2257         }
2258         mdd_trans_stop(env, mdd, 0, handle);
2259
2260         RETURN(rc);
2261 }
2262
2263 /*
2264  * do NOT or the MAY_*'s, you'll get the weakest
2265  */
2266 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2267 {
2268         int res = 0;
2269
2270         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2271          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2272          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2273          * owner can write to a file even if it is marked readonly to hide
2274          * its brokenness. (bug 5781) */
2275         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2276                 struct md_ucred *uc = md_ucred(env);
2277
2278                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2279                     (la->la_uid == uc->mu_fsuid))
2280                         return 0;
2281         }
2282
2283         if (flags & FMODE_READ)
2284                 res |= MAY_READ;
2285         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2286                 res |= MAY_WRITE;
2287         if (flags & MDS_FMODE_EXEC)
2288                 res = MAY_EXEC;
2289         return res;
2290 }
2291
2292 static int mdd_open_sanity_check(const struct lu_env *env,
2293                                  struct mdd_object *obj, int flag)
2294 {
2295         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2296         int mode, rc;
2297         ENTRY;
2298
2299         /* EEXIST check */
2300         if (mdd_is_dead_obj(obj))
2301                 RETURN(-ENOENT);
2302
2303         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2304         if (rc)
2305                RETURN(rc);
2306
2307         if (S_ISLNK(tmp_la->la_mode))
2308                 RETURN(-ELOOP);
2309
2310         mode = accmode(env, tmp_la, flag);
2311
2312         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2313                 RETURN(-EISDIR);
2314
2315         if (!(flag & MDS_OPEN_CREATED)) {
2316                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2317                 if (rc)
2318                         RETURN(rc);
2319         }
2320
2321         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2322             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2323                 flag &= ~MDS_OPEN_TRUNC;
2324
2325         /* For writing append-only file must open it with append mode. */
2326         if (mdd_is_append(obj)) {
2327                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2328                         RETURN(-EPERM);
2329                 if (flag & MDS_OPEN_TRUNC)
2330                         RETURN(-EPERM);
2331         }
2332
2333 #if 0
2334         /*
2335          * Now, flag -- O_NOATIME does not be packed by client.
2336          */
2337         if (flag & O_NOATIME) {
2338                 struct md_ucred *uc = md_ucred(env);
2339
2340                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2341                     (uc->mu_valid == UCRED_NEW)) &&
2342                     (uc->mu_fsuid != tmp_la->la_uid) &&
2343                     !mdd_capable(uc, CFS_CAP_FOWNER))
2344                         RETURN(-EPERM);
2345         }
2346 #endif
2347
2348         RETURN(0);
2349 }
2350
2351 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2352                     int flags)
2353 {
2354         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2355         int rc = 0;
2356
2357         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2358
2359         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2360         if (rc == 0)
2361                 mdd_obj->mod_count++;
2362
2363         mdd_write_unlock(env, mdd_obj);
2364         return rc;
2365 }
2366
2367 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2368                             struct md_attr *ma, struct thandle *handle)
2369 {
2370         int rc;
2371
2372         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2373         if (rc)
2374                 return rc;
2375
2376         return mdo_declare_destroy(env, obj, handle);
2377 }
2378
2379 /* return md_attr back,
2380  * if it is last unlink then return lov ea + llog cookie*/
2381 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2382                     struct md_attr *ma, struct thandle *handle)
2383 {
2384         int rc = 0;
2385         ENTRY;
2386
2387         if (S_ISREG(mdd_object_type(obj))) {
2388                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2389                  * Caller must be ready for that. */
2390                 rc = __mdd_lmm_get(env, obj, ma);
2391                 if ((ma->ma_valid & MA_LOV))
2392                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2393                                             obj, ma);
2394         }
2395
2396         if (rc == 0)
2397                 rc = mdo_destroy(env, obj, handle);
2398
2399         RETURN(rc);
2400 }
2401
2402 static int mdd_declare_close(const struct lu_env *env,
2403                              struct mdd_object *obj,
2404                              struct md_attr *ma,
2405                              struct thandle *handle)
2406 {
2407         int rc;
2408
2409         rc = orph_declare_index_delete(env, obj, handle);
2410         if (rc)
2411                 return rc;
2412
2413         return mdd_declare_object_kill(env, obj, ma, handle);
2414 }
2415
2416 /*
2417  * No permission check is needed.
2418  */
2419 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2420                      struct md_attr *ma, int mode)
2421 {
2422         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2423         struct mdd_device *mdd = mdo2mdd(obj);
2424         struct thandle    *handle = NULL;
2425         int rc;
2426         int is_orphan = 0, reset = 1;
2427
2428 #ifdef HAVE_QUOTA_SUPPORT
2429         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2430         struct mds_obd *mds = &obd->u.mds;
2431         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2432         int quota_opc = 0;
2433 #endif
2434         ENTRY;
2435
2436         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2437                 mdd_obj->mod_count--;
2438
2439                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2440                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2441                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2442                 RETURN(0);
2443         }
2444
2445         /* check without any lock */
2446         if (mdd_obj->mod_count == 1 &&
2447             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2448  again:
2449                 handle = mdd_trans_create(env, mdo2mdd(obj));
2450                 if (IS_ERR(handle))
2451                         RETURN(PTR_ERR(handle));
2452
2453                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2454                 if (rc)
2455                         GOTO(stop, rc);
2456
2457                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2458                 if (rc)
2459                         GOTO(stop, rc);
2460
2461                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2462                 if (rc)
2463                         GOTO(stop, rc);
2464         }
2465
2466         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2467         if (handle == NULL && mdd_obj->mod_count == 1 &&
2468             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2469                 mdd_write_unlock(env, mdd_obj);
2470                 goto again;
2471         }
2472
2473         /* release open count */
2474         mdd_obj->mod_count --;
2475
2476         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2477                 /* remove link to object from orphan index */
2478                 LASSERT(handle != NULL);
2479                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2480                 if (rc == 0) {
2481                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2482                                "list, OSS objects to be destroyed.\n",
2483                                PFID(mdd_object_fid(mdd_obj)));
2484                         is_orphan = 1;
2485                 } else {
2486                         CERROR("Object "DFID" can not be deleted from orphan "
2487                                 "list, maybe cause OST objects can not be "
2488                                 "destroyed (err: %d).\n",
2489                                 PFID(mdd_object_fid(mdd_obj)), rc);
2490                         /* If object was not deleted from orphan list, do not
2491                          * destroy OSS objects, which will be done when next
2492                          * recovery. */
2493                         GOTO(out, rc);
2494                 }
2495         }
2496
2497         rc = mdd_iattr_get(env, mdd_obj, ma);
2498         /* Object maybe not in orphan list originally, it is rare case for
2499          * mdd_finish_unlink() failure. */
2500         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2501 #ifdef HAVE_QUOTA_SUPPORT
2502                 if (mds->mds_quota) {
2503                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2504                         mdd_quota_wrapper(&ma->ma_attr, qids);
2505                 }
2506 #endif
2507                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2508                 if (ma->ma_valid & MA_FLAGS &&
2509                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2510                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2511                 } else {
2512                         if (handle == NULL) {
2513                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2514                                 if (IS_ERR(handle))
2515                                         GOTO(out, rc = PTR_ERR(handle));
2516
2517                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2518                                                              handle);
2519                                 if (rc)
2520                                         GOTO(out, rc);
2521
2522                                 rc = mdd_declare_changelog_store(env, mdd,
2523                                                                  NULL, handle);
2524                                 if (rc)
2525                                         GOTO(stop, rc);
2526
2527                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2528                                 if (rc)
2529                                         GOTO(out, rc);
2530                         }
2531
2532                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2533                         if (rc == 0)
2534                                 reset = 0;
2535                 }
2536
2537                 if (rc != 0)
2538                         CERROR("Error when prepare to delete Object "DFID" , "
2539                                "which will cause OST objects can not be "
2540                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2541         }
2542         EXIT;
2543
2544 out:
2545         if (reset)
2546                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2547
2548         mdd_write_unlock(env, mdd_obj);
2549
2550         if (rc == 0 &&
2551             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2552             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2553                 if (handle == NULL) {
2554                         handle = mdd_trans_create(env, mdo2mdd(obj));
2555                         if (IS_ERR(handle))
2556                                 GOTO(stop, rc = IS_ERR(handle));
2557
2558                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2559                                                          handle);
2560                         if (rc)
2561                                 GOTO(stop, rc);
2562
2563                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2564                         if (rc)
2565                                 GOTO(stop, rc);
2566                 }
2567
2568                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2569                                          mdd_obj, handle);
2570         }
2571
2572 stop:
2573         if (handle != NULL)
2574                 mdd_trans_stop(env, mdd, rc, handle);
2575 #ifdef HAVE_QUOTA_SUPPORT
2576         if (quota_opc)
2577                 /* Trigger dqrel on the owner of child. If failed,
2578                  * the next call for lquota_chkquota will process it */
2579                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2580                               quota_opc);
2581 #endif
2582         return rc;
2583 }
2584
2585 /*
2586  * Permission check is done when open,
2587  * no need check again.
2588  */
2589 static int mdd_readpage_sanity_check(const struct lu_env *env,
2590                                      struct mdd_object *obj)
2591 {
2592         struct dt_object *next = mdd_object_child(obj);
2593         int rc;
2594         ENTRY;
2595
2596         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2597                 rc = 0;
2598         else
2599                 rc = -ENOTDIR;
2600
2601         RETURN(rc);
2602 }
2603
2604 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2605                               struct lu_dirpage *dp, int nob,
2606                               const struct dt_it_ops *iops, struct dt_it *it,
2607                               __u32 attr)
2608 {
2609         void                   *area = dp;
2610         int                     result;
2611         __u64                   hash = 0;
2612         struct lu_dirent       *ent;
2613         struct lu_dirent       *last = NULL;
2614         int                     first = 1;
2615
2616         memset(area, 0, sizeof (*dp));
2617         area += sizeof (*dp);
2618         nob  -= sizeof (*dp);
2619
2620         ent  = area;
2621         do {
2622                 int    len;
2623                 int    recsize;
2624
2625                 len = iops->key_size(env, it);
2626
2627                 /* IAM iterator can return record with zero len. */
2628                 if (len == 0)
2629                         goto next;
2630
2631                 hash = iops->store(env, it);
2632                 if (unlikely(first)) {
2633                         first = 0;
2634                         dp->ldp_hash_start = cpu_to_le64(hash);
2635                 }
2636
2637                 /* calculate max space required for lu_dirent */
2638                 recsize = lu_dirent_calc_size(len, attr);
2639
2640                 if (nob >= recsize) {
2641                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2642                         if (result == -ESTALE)
2643                                 goto next;
2644                         if (result != 0)
2645                                 goto out;
2646
2647                         /* osd might not able to pack all attributes,
2648                          * so recheck rec length */
2649                         recsize = le16_to_cpu(ent->lde_reclen);
2650                 } else {
2651                         result = (last != NULL) ? 0 :-EINVAL;
2652                         goto out;
2653                 }
2654                 last = ent;
2655                 ent = (void *)ent + recsize;
2656                 nob -= recsize;
2657
2658 next:
2659                 result = iops->next(env, it);
2660                 if (result == -ESTALE)
2661                         goto next;
2662         } while (result == 0);
2663
2664 out:
2665         dp->ldp_hash_end = cpu_to_le64(hash);
2666         if (last != NULL) {
2667                 if (last->lde_hash == dp->ldp_hash_end)
2668                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2669                 last->lde_reclen = 0; /* end mark */
2670         }
2671         return result;
2672 }
2673
2674 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2675                           const struct lu_rdpg *rdpg)
2676 {
2677         struct dt_it      *it;
2678         struct dt_object  *next = mdd_object_child(obj);
2679         const struct dt_it_ops  *iops;
2680         struct page       *pg;
2681         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2682         int i;
2683         int nlupgs = 0;
2684         int rc;
2685         int nob;
2686
2687         LASSERT(rdpg->rp_pages != NULL);
2688         LASSERT(next->do_index_ops != NULL);
2689
2690         if (rdpg->rp_count <= 0)
2691                 return -EFAULT;
2692
2693         /*
2694          * iterate through directory and fill pages from @rdpg
2695          */
2696         iops = &next->do_index_ops->dio_it;
2697         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2698         if (IS_ERR(it))
2699                 return PTR_ERR(it);
2700
2701         rc = iops->load(env, it, rdpg->rp_hash);
2702
2703         if (rc == 0) {
2704                 /*
2705                  * Iterator didn't find record with exactly the key requested.
2706                  *
2707                  * It is currently either
2708                  *
2709                  *     - positioned above record with key less than
2710                  *     requested---skip it.
2711                  *
2712                  *     - or not positioned at all (is in IAM_IT_SKEWED
2713                  *     state)---position it on the next item.
2714                  */
2715                 rc = iops->next(env, it);
2716         } else if (rc > 0)
2717                 rc = 0;
2718
2719         /*
2720          * At this point and across for-loop:
2721          *
2722          *  rc == 0 -> ok, proceed.
2723          *  rc >  0 -> end of directory.
2724          *  rc <  0 -> error.
2725          */
2726         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2727              i++, nob -= CFS_PAGE_SIZE) {
2728                 struct lu_dirpage *dp;
2729
2730                 LASSERT(i < rdpg->rp_npages);
2731                 pg = rdpg->rp_pages[i];
2732                 dp = cfs_kmap(pg);
2733 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2734 repeat:
2735 #endif
2736                 rc = mdd_dir_page_build(env, mdd, dp,
2737                                         min_t(int, nob, LU_PAGE_SIZE),
2738                                         iops, it, rdpg->rp_attrs);
2739                 if (rc > 0) {
2740                         /*
2741                          * end of directory.
2742                          */
2743                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2744                         nlupgs++;
2745                 } else if (rc < 0) {
2746                         CWARN("build page failed: %d!\n", rc);
2747                 } else {
2748                         nlupgs++;
2749 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2750                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2751                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2752                                 goto repeat;
2753 #endif
2754                 }
2755                 cfs_kunmap(pg);
2756         }
2757         if (rc >= 0) {
2758                 struct lu_dirpage *dp;
2759
2760                 dp = cfs_kmap(rdpg->rp_pages[0]);
2761                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2762                 if (nlupgs == 0) {
2763                         /*
2764                          * No pages were processed, mark this for first page
2765                          * and send back.
2766                          */
2767                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2768                         nlupgs = 1;
2769                 }
2770                 cfs_kunmap(rdpg->rp_pages[0]);
2771
2772                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2773         }
2774         iops->put(env, it);
2775         iops->fini(env, it);
2776
2777         return rc;
2778 }
2779
2780 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2781                  const struct lu_rdpg *rdpg)
2782 {
2783         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2784         int rc;
2785         ENTRY;
2786
2787         if (mdd_object_exists(mdd_obj) == 0) {
2788                 CERROR("%s: object "DFID" not found: rc = -2\n",
2789                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2790                 return -ENOENT;
2791         }
2792
2793         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2794         rc = mdd_readpage_sanity_check(env, mdd_obj);
2795         if (rc)
2796                 GOTO(out_unlock, rc);
2797
2798         if (mdd_is_dead_obj(mdd_obj)) {
2799                 struct page *pg;
2800                 struct lu_dirpage *dp;
2801
2802                 /*
2803                  * According to POSIX, please do not return any entry to client:
2804                  * even dot and dotdot should not be returned.
2805                  */
2806                 CWARN("readdir from dead object: "DFID"\n",
2807                         PFID(mdd_object_fid(mdd_obj)));
2808
2809                 if (rdpg->rp_count <= 0)
2810                         GOTO(out_unlock, rc = -EFAULT);
2811                 LASSERT(rdpg->rp_pages != NULL);
2812
2813                 pg = rdpg->rp_pages[0];
2814                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2815                 memset(dp, 0 , sizeof(struct lu_dirpage));
2816                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2817                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2818                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2819                 cfs_kunmap(pg);
2820                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2821         }
2822
2823         rc = __mdd_readpage(env, mdd_obj, rdpg);
2824
2825         EXIT;
2826 out_unlock:
2827         mdd_read_unlock(env, mdd_obj);
2828         return rc;
2829 }
2830
2831 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2832 {
2833         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2834         struct dt_object *next;
2835
2836         if (mdd_object_exists(mdd_obj) == 0) {
2837                 CERROR("%s: object "DFID" not found: rc = -2\n",
2838                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2839                 return -ENOENT;
2840         }
2841         next = mdd_object_child(mdd_obj);
2842         return next->do_ops->do_object_sync(env, next);
2843 }
2844
2845 const struct md_object_operations mdd_obj_ops = {
2846         .moo_permission    = mdd_permission,
2847         .moo_attr_get      = mdd_attr_get,
2848         .moo_attr_set      = mdd_attr_set,
2849         .moo_xattr_get     = mdd_xattr_get,
2850         .moo_xattr_set     = mdd_xattr_set,
2851         .moo_xattr_list    = mdd_xattr_list,
2852         .moo_xattr_del     = mdd_xattr_del,
2853         .moo_object_create = mdd_object_create,
2854         .moo_ref_add       = mdd_ref_add,
2855         .moo_ref_del       = mdd_ref_del,
2856         .moo_open          = mdd_open,
2857         .moo_close         = mdd_close,
2858         .moo_readpage      = mdd_readpage,
2859         .moo_readlink      = mdd_readlink,
2860         .moo_changelog     = mdd_changelog,
2861         .moo_capa_get      = mdd_capa_get,
2862         .moo_object_sync   = mdd_object_sync,
2863         .moo_path          = mdd_path,
2864         .moo_file_lock     = mdd_file_lock,
2865         .moo_file_unlock   = mdd_file_unlock,
2866 };