Whamcloud - gitweb
dee2a93bddf382f61a5b7f39573517d2b07b74be
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <obd.h>
50 #include <obd_class.h>
51 #include <obd_support.h>
52 #include <lprocfs_status.h>
53 /* fid_be_cpu(), fid_cpu_to_be(). */
54 #include <lustre_fid.h>
55 #include <obd_lov.h>
56
57 #include <lustre_param.h>
58 #include <lustre_mds.h>
59 #include <lustre/lustre_idl.h>
60
61 #include "mdd_internal.h"
62
63 static const struct lu_object_operations mdd_lu_obj_ops;
64
65 static int mdd_xattr_get(const struct lu_env *env,
66                          struct md_object *obj, struct lu_buf *buf,
67                          const char *name);
68
69 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
70                  void **data)
71 {
72         if (mdd_object_exists(obj) == 0) {
73                 CERROR("%s: object "DFID" not found: rc = -2\n",
74                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
75                 return -ENOENT;
76         }
77         mdo_data_get(env, obj, data);
78         return 0;
79 }
80
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82                struct lu_attr *la, struct lustre_capa *capa)
83 {
84         if (mdd_object_exists(obj) == 0) {
85                 CERROR("%s: object "DFID" not found: rc = -2\n",
86                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
87                 return -ENOENT;
88         }
89         return mdo_attr_get(env, obj, la, capa);
90 }
91
92 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
93 {
94         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
95
96         if (flags & LUSTRE_APPEND_FL)
97                 obj->mod_flags |= APPEND_OBJ;
98
99         if (flags & LUSTRE_IMMUTABLE_FL)
100                 obj->mod_flags |= IMMUTE_OBJ;
101 }
102
103 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
104 {
105         struct mdd_thread_info *info;
106
107         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
108         LASSERT(info != NULL);
109         return info;
110 }
111
112 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
113 {
114         struct lu_buf *buf;
115
116         buf = &mdd_env_info(env)->mti_buf;
117         buf->lb_buf = area;
118         buf->lb_len = len;
119         return buf;
120 }
121
122 void mdd_buf_put(struct lu_buf *buf)
123 {
124         if (buf == NULL || buf->lb_buf == NULL)
125                 return;
126         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
127         buf->lb_buf = NULL;
128         buf->lb_len = 0;
129 }
130
131 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
132                                        const void *area, ssize_t len)
133 {
134         struct lu_buf *buf;
135
136         buf = &mdd_env_info(env)->mti_buf;
137         buf->lb_buf = (void *)area;
138         buf->lb_len = len;
139         return buf;
140 }
141
142 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
143 {
144         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
145
146         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
147                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
148                 buf->lb_buf = NULL;
149         }
150         if (buf->lb_buf == NULL) {
151                 buf->lb_len = len;
152                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
153                 if (buf->lb_buf == NULL)
154                         buf->lb_len = 0;
155         }
156         return buf;
157 }
158
159 /** Increase the size of the \a mti_big_buf.
160  * preserves old data in buffer
161  * old buffer remains unchanged on error
162  * \retval 0 or -ENOMEM
163  */
164 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
165 {
166         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
167         struct lu_buf buf;
168
169         LASSERT(len >= oldbuf->lb_len);
170         OBD_ALLOC_LARGE(buf.lb_buf, len);
171
172         if (buf.lb_buf == NULL)
173                 return -ENOMEM;
174
175         buf.lb_len = len;
176         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
177
178         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
179
180         memcpy(oldbuf, &buf, sizeof(buf));
181
182         return 0;
183 }
184
185 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
186                                        struct mdd_device *mdd)
187 {
188         struct mdd_thread_info *mti = mdd_env_info(env);
189         int                     max_cookie_size;
190
191         max_cookie_size = mdd_lov_cookiesize(env, mdd);
192         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
193                 if (mti->mti_max_cookie)
194                         OBD_FREE_LARGE(mti->mti_max_cookie,
195                                        mti->mti_max_cookie_size);
196                 mti->mti_max_cookie = NULL;
197                 mti->mti_max_cookie_size = 0;
198         }
199         if (unlikely(mti->mti_max_cookie == NULL)) {
200                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
201                 if (likely(mti->mti_max_cookie != NULL))
202                         mti->mti_max_cookie_size = max_cookie_size;
203         }
204         if (likely(mti->mti_max_cookie != NULL))
205                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
206         return mti->mti_max_cookie;
207 }
208
209 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
210 {
211         struct mdd_thread_info *mti = mdd_env_info(env);
212
213         if (unlikely(mti->mti_max_lmm_size < size)) {
214                 int rsize = size_roundup_power2(size);
215
216                 if (mti->mti_max_lmm_size > 0) {
217                         LASSERT(mti->mti_max_lmm);
218                         OBD_FREE_LARGE(mti->mti_max_lmm,
219                                        mti->mti_max_lmm_size);
220                         mti->mti_max_lmm = NULL;
221                         mti->mti_max_lmm_size = 0;
222                 }
223
224                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
225                 if (likely(mti->mti_max_lmm != NULL))
226                         mti->mti_max_lmm_size = rsize;
227         }
228         return mti->mti_max_lmm;
229 }
230
231 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
232                                    struct mdd_device *mdd)
233 {
234         int max_lmm_size;
235
236         max_lmm_size = mdd_lov_mdsize(env, mdd);
237         return mdd_max_lmm_buffer(env, max_lmm_size);
238 }
239
240 struct lu_object *mdd_object_alloc(const struct lu_env *env,
241                                    const struct lu_object_header *hdr,
242                                    struct lu_device *d)
243 {
244         struct mdd_object *mdd_obj;
245
246         OBD_ALLOC_PTR(mdd_obj);
247         if (mdd_obj != NULL) {
248                 struct lu_object *o;
249
250                 o = mdd2lu_obj(mdd_obj);
251                 lu_object_init(o, NULL, d);
252                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
253                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
254                 mdd_obj->mod_count = 0;
255                 o->lo_ops = &mdd_lu_obj_ops;
256                 return o;
257         } else {
258                 return NULL;
259         }
260 }
261
262 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
263                            const struct lu_object_conf *unused)
264 {
265         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
266         struct mdd_object *mdd_obj = lu2mdd_obj(o);
267         struct lu_object  *below;
268         struct lu_device  *under;
269         ENTRY;
270
271         mdd_obj->mod_cltime = 0;
272         under = &d->mdd_child->dd_lu_dev;
273         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
274         mdd_pdlock_init(mdd_obj);
275         if (below == NULL)
276                 RETURN(-ENOMEM);
277
278         lu_object_add(o, below);
279
280         RETURN(0);
281 }
282
283 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
284 {
285         if (lu_object_exists(o))
286                 return mdd_get_flags(env, lu2mdd_obj(o));
287         else
288                 return 0;
289 }
290
291 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
292 {
293         struct mdd_object *mdd = lu2mdd_obj(o);
294
295         lu_object_fini(o);
296         OBD_FREE_PTR(mdd);
297 }
298
299 static int mdd_object_print(const struct lu_env *env, void *cookie,
300                             lu_printer_t p, const struct lu_object *o)
301 {
302         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
303         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
304                     "valid=%x, cltime="LPU64", flags=%lx)",
305                     mdd, mdd->mod_count, mdd->mod_valid,
306                     mdd->mod_cltime, mdd->mod_flags);
307 }
308
309 static const struct lu_object_operations mdd_lu_obj_ops = {
310         .loo_object_init    = mdd_object_init,
311         .loo_object_start   = mdd_object_start,
312         .loo_object_free    = mdd_object_free,
313         .loo_object_print   = mdd_object_print,
314 };
315
316 struct mdd_object *mdd_object_find(const struct lu_env *env,
317                                    struct mdd_device *d,
318                                    const struct lu_fid *f)
319 {
320         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
321 }
322
323 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
324                         const char *path, struct lu_fid *fid)
325 {
326         struct lu_buf *buf;
327         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
328         struct mdd_object *obj;
329         struct lu_name *lname = &mdd_env_info(env)->mti_name;
330         char *name;
331         int rc = 0;
332         ENTRY;
333
334         /* temp buffer for path element */
335         buf = mdd_buf_alloc(env, PATH_MAX);
336         if (buf->lb_buf == NULL)
337                 RETURN(-ENOMEM);
338
339         lname->ln_name = name = buf->lb_buf;
340         lname->ln_namelen = 0;
341         *f = mdd->mdd_root_fid;
342
343         while(1) {
344                 while (*path == '/')
345                         path++;
346                 if (*path == '\0')
347                         break;
348                 while (*path != '/' && *path != '\0') {
349                         *name = *path;
350                         path++;
351                         name++;
352                         lname->ln_namelen++;
353                 }
354
355                 *name = '\0';
356                 /* find obj corresponding to fid */
357                 obj = mdd_object_find(env, mdd, f);
358                 if (obj == NULL)
359                         GOTO(out, rc = -EREMOTE);
360                 if (IS_ERR(obj))
361                         GOTO(out, rc = PTR_ERR(obj));
362                 /* get child fid from parent and name */
363                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
364                 mdd_object_put(env, obj);
365                 if (rc)
366                         break;
367
368                 name = buf->lb_buf;
369                 lname->ln_namelen = 0;
370         }
371
372         if (!rc)
373                 *fid = *f;
374 out:
375         RETURN(rc);
376 }
377
378 /** The maximum depth that fid2path() will search.
379  * This is limited only because we want to store the fids for
380  * historical path lookup purposes.
381  */
382 #define MAX_PATH_DEPTH 100
383
384 /** mdd_path() lookup structure. */
385 struct path_lookup_info {
386         __u64                pli_recno;        /**< history point */
387         __u64                pli_currec;       /**< current record */
388         struct lu_fid        pli_fid;
389         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
390         struct mdd_object   *pli_mdd_obj;
391         char                *pli_path;         /**< full path */
392         int                  pli_pathlen;
393         int                  pli_linkno;       /**< which hardlink to follow */
394         int                  pli_fidcount;     /**< number of \a pli_fids */
395 };
396
397 static int mdd_path_current(const struct lu_env *env,
398                             struct path_lookup_info *pli)
399 {
400         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
401         struct mdd_object *mdd_obj;
402         struct lu_buf     *buf = NULL;
403         struct link_ea_header *leh;
404         struct link_ea_entry  *lee;
405         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
406         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
407         char *ptr;
408         int reclen;
409         int rc;
410         ENTRY;
411
412         ptr = pli->pli_path + pli->pli_pathlen - 1;
413         *ptr = 0;
414         --ptr;
415         pli->pli_fidcount = 0;
416         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
417
418         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
419                 mdd_obj = mdd_object_find(env, mdd,
420                                           &pli->pli_fids[pli->pli_fidcount]);
421                 if (mdd_obj == NULL)
422                         GOTO(out, rc = -EREMOTE);
423                 if (IS_ERR(mdd_obj))
424                         GOTO(out, rc = PTR_ERR(mdd_obj));
425                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
426                 if (rc <= 0) {
427                         mdd_object_put(env, mdd_obj);
428                         if (rc == -1)
429                                 rc = -EREMOTE;
430                         else if (rc == 0)
431                                 /* Do I need to error out here? */
432                                 rc = -ENOENT;
433                         GOTO(out, rc);
434                 }
435
436                 /* Get parent fid and object name */
437                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
438                 buf = mdd_links_get(env, mdd_obj);
439                 mdd_read_unlock(env, mdd_obj);
440                 mdd_object_put(env, mdd_obj);
441                 if (IS_ERR(buf))
442                         GOTO(out, rc = PTR_ERR(buf));
443
444                 leh = buf->lb_buf;
445                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
446                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
447
448                 /* If set, use link #linkno for path lookup, otherwise use
449                    link #0.  Only do this for the final path element. */
450                 if ((pli->pli_fidcount == 0) &&
451                     (pli->pli_linkno < leh->leh_reccount)) {
452                         int count;
453                         for (count = 0; count < pli->pli_linkno; count++) {
454                                 lee = (struct link_ea_entry *)
455                                      ((char *)lee + reclen);
456                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
457                         }
458                         if (pli->pli_linkno < leh->leh_reccount - 1)
459                                 /* indicate to user there are more links */
460                                 pli->pli_linkno++;
461                 }
462
463                 /* Pack the name in the end of the buffer */
464                 ptr -= tmpname->ln_namelen;
465                 if (ptr - 1 <= pli->pli_path)
466                         GOTO(out, rc = -EOVERFLOW);
467                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
468                 *(--ptr) = '/';
469
470                 /* Store the parent fid for historic lookup */
471                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
472                         GOTO(out, rc = -EOVERFLOW);
473                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
474         }
475
476         /* Verify that our path hasn't changed since we started the lookup.
477            Record the current index, and verify the path resolves to the
478            same fid. If it does, then the path is correct as of this index. */
479         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
480         pli->pli_currec = mdd->mdd_cl.mc_index;
481         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
482         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
483         if (rc) {
484                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
485                 GOTO (out, rc = -EAGAIN);
486         }
487         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
488                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
489                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
490                        PFID(&pli->pli_fid));
491                 GOTO(out, rc = -EAGAIN);
492         }
493         ptr++; /* skip leading / */
494         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
495
496         EXIT;
497 out:
498         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
499                 /* if we vmalloced a large buffer drop it */
500                 mdd_buf_put(buf);
501
502         return rc;
503 }
504
505 static int mdd_path_historic(const struct lu_env *env,
506                              struct path_lookup_info *pli)
507 {
508         return 0;
509 }
510
511 /* Returns the full path to this fid, as of changelog record recno. */
512 static int mdd_path(const struct lu_env *env, struct md_object *obj,
513                     char *path, int pathlen, __u64 *recno, int *linkno)
514 {
515         struct path_lookup_info *pli;
516         int tries = 3;
517         int rc = -EAGAIN;
518         ENTRY;
519
520         if (pathlen < 3)
521                 RETURN(-EOVERFLOW);
522
523         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
524                 path[0] = '\0';
525                 RETURN(0);
526         }
527
528         OBD_ALLOC_PTR(pli);
529         if (pli == NULL)
530                 RETURN(-ENOMEM);
531
532         pli->pli_mdd_obj = md2mdd_obj(obj);
533         pli->pli_recno = *recno;
534         pli->pli_path = path;
535         pli->pli_pathlen = pathlen;
536         pli->pli_linkno = *linkno;
537
538         /* Retry multiple times in case file is being moved */
539         while (tries-- && rc == -EAGAIN)
540                 rc = mdd_path_current(env, pli);
541
542         /* For historical path lookup, the current links may not have existed
543          * at "recno" time.  We must switch over to earlier links/parents
544          * by using the changelog records.  If the earlier parent doesn't
545          * exist, we must search back through the changelog to reconstruct
546          * its parents, then check if it exists, etc.
547          * We may ignore this problem for the initial implementation and
548          * state that an "original" hardlink must still exist for us to find
549          * historic path name. */
550         if (pli->pli_recno != -1) {
551                 rc = mdd_path_historic(env, pli);
552         } else {
553                 *recno = pli->pli_currec;
554                 /* Return next link index to caller */
555                 *linkno = pli->pli_linkno;
556         }
557
558         OBD_FREE_PTR(pli);
559
560         RETURN (rc);
561 }
562
563 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
564 {
565         struct lu_attr *la = &mdd_env_info(env)->mti_la;
566         int rc;
567
568         ENTRY;
569         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
570         if (rc == 0) {
571                 mdd_flags_xlate(obj, la->la_flags);
572         }
573         RETURN(rc);
574 }
575
576 /* get only inode attributes */
577 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
578                   struct md_attr *ma)
579 {
580         int rc = 0;
581         ENTRY;
582
583         if (ma->ma_valid & MA_INODE)
584                 RETURN(0);
585
586         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
587                           mdd_object_capa(env, mdd_obj));
588         if (rc == 0)
589                 ma->ma_valid |= MA_INODE;
590         RETURN(rc);
591 }
592
593 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
594 {
595         struct lov_desc *ldesc;
596         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
597         struct lov_user_md *lum = (struct lov_user_md*)lmm;
598         ENTRY;
599
600         if (!lum)
601                 RETURN(0);
602
603         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
604         LASSERT(ldesc != NULL);
605
606         lum->lmm_magic = LOV_MAGIC_V1;
607         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
608         lum->lmm_pattern = ldesc->ld_pattern;
609         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
610         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
611         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
612
613         RETURN(sizeof(*lum));
614 }
615
616 static int is_rootdir(struct mdd_object *mdd_obj)
617 {
618         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
619         const struct lu_fid *fid = mdo2fid(mdd_obj);
620
621         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
622 }
623
624 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
625                     struct md_attr *ma)
626 {
627         struct mdd_thread_info *info = mdd_env_info(env);
628         int size;
629         int rc;
630         ENTRY;
631
632         LASSERT(info != NULL);
633         LASSERT(ma->ma_lmm_size > 0);
634         LASSERT(ma->ma_big_lmm_used == 0);
635
636         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
637                            mdd_object_capa(env, obj));
638         if (rc < 0)
639                 RETURN(rc);
640
641         /* big_lmm may need to grow */
642         size = rc;
643         mdd_max_lmm_buffer(env, size);
644         if (info->mti_max_lmm == NULL)
645                 RETURN(-ENOMEM);
646
647         LASSERT(info->mti_max_lmm_size >= size);
648         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
649                         XATTR_NAME_LOV);
650         if (rc < 0)
651                 RETURN(rc);
652
653         ma->ma_big_lmm_used = 1;
654         ma->ma_valid |= MA_LOV;
655         ma->ma_lmm = info->mti_max_lmm;
656         ma->ma_lmm_size = size;
657         LASSERT(size == rc);
658         RETURN(rc);
659 }
660
661 /* get lov EA only */
662 static int __mdd_lmm_get(const struct lu_env *env,
663                          struct mdd_object *mdd_obj, struct md_attr *ma)
664 {
665         int rc;
666         ENTRY;
667
668         if (ma->ma_valid & MA_LOV)
669                 RETURN(0);
670
671         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
672                         XATTR_NAME_LOV);
673         if (rc == -ERANGE)
674                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
675         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
676                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
677
678         if (rc > 0) {
679                 ma->ma_lmm_size = rc;
680                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
681                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
682                 rc = 0;
683         }
684         RETURN(rc);
685 }
686
687 /* get the first parent fid from link EA */
688 static int mdd_pfid_get(const struct lu_env *env,
689                         struct mdd_object *mdd_obj, struct md_attr *ma)
690 {
691         struct lu_buf *buf;
692         struct link_ea_header *leh;
693         struct link_ea_entry *lee;
694         struct lu_fid *pfid = &ma->ma_pfid;
695         ENTRY;
696
697         if (ma->ma_valid & MA_PFID)
698                 RETURN(0);
699
700         buf = mdd_links_get(env, mdd_obj);
701         if (IS_ERR(buf))
702                 RETURN(PTR_ERR(buf));
703
704         leh = buf->lb_buf;
705         lee = (struct link_ea_entry *)(leh + 1);
706         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
707         fid_be_to_cpu(pfid, pfid);
708         ma->ma_valid |= MA_PFID;
709         if (buf->lb_len > OBD_ALLOC_BIG)
710                 /* if we vmalloced a large buffer drop it */
711                 mdd_buf_put(buf);
712         RETURN(0);
713 }
714
715 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
716                        struct md_attr *ma)
717 {
718         int rc;
719         ENTRY;
720
721         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
722         rc = __mdd_lmm_get(env, mdd_obj, ma);
723         mdd_read_unlock(env, mdd_obj);
724         RETURN(rc);
725 }
726
727 /* get lmv EA only*/
728 static int __mdd_lmv_get(const struct lu_env *env,
729                          struct mdd_object *mdd_obj, struct md_attr *ma)
730 {
731         int rc;
732         ENTRY;
733
734         if (ma->ma_valid & MA_LMV)
735                 RETURN(0);
736
737         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
738                         XATTR_NAME_LMV);
739         if (rc > 0) {
740                 ma->ma_valid |= MA_LMV;
741                 rc = 0;
742         }
743         RETURN(rc);
744 }
745
746 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
747                          struct md_attr *ma)
748 {
749         struct mdd_thread_info *info = mdd_env_info(env);
750         struct lustre_mdt_attrs *lma =
751                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
752         int lma_size;
753         int rc;
754         ENTRY;
755
756         /* If all needed data are already valid, nothing to do */
757         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
758             (ma->ma_need & (MA_HSM | MA_SOM)))
759                 RETURN(0);
760
761         /* Read LMA from disk EA */
762         lma_size = sizeof(info->mti_xattr_buf);
763         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
764         if (rc <= 0)
765                 RETURN(rc);
766
767         /* Useless to check LMA incompatibility because this is already done in
768          * osd_ea_fid_get(), and this will fail long before this code is
769          * called.
770          * So, if we are here, LMA is compatible.
771          */
772
773         lustre_lma_swab(lma);
774
775         /* Swab and copy LMA */
776         if (ma->ma_need & MA_HSM) {
777                 if (lma->lma_compat & LMAC_HSM)
778                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
779                 else
780                         ma->ma_hsm.mh_flags = 0;
781                 ma->ma_valid |= MA_HSM;
782         }
783
784         /* Copy SOM */
785         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
786                 LASSERT(ma->ma_som != NULL);
787                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
788                 ma->ma_som->msd_size    = lma->lma_som_size;
789                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
790                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
791                 ma->ma_valid |= MA_SOM;
792         }
793
794         RETURN(0);
795 }
796
797 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
798                                  struct md_attr *ma)
799 {
800         int rc = 0;
801         ENTRY;
802
803         if (ma->ma_need & MA_INODE)
804                 rc = mdd_iattr_get(env, mdd_obj, ma);
805
806         if (rc == 0 && ma->ma_need & MA_LOV) {
807                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
808                     S_ISDIR(mdd_object_type(mdd_obj)))
809                         rc = __mdd_lmm_get(env, mdd_obj, ma);
810         }
811         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
812                 if (S_ISREG(mdd_object_type(mdd_obj)))
813                         rc = mdd_pfid_get(env, mdd_obj, ma);
814         }
815         if (rc == 0 && ma->ma_need & MA_LMV) {
816                 if (S_ISDIR(mdd_object_type(mdd_obj)))
817                         rc = __mdd_lmv_get(env, mdd_obj, ma);
818         }
819         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
820                 if (S_ISREG(mdd_object_type(mdd_obj)))
821                         rc = __mdd_lma_get(env, mdd_obj, ma);
822         }
823 #ifdef CONFIG_FS_POSIX_ACL
824         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
825                 if (S_ISDIR(mdd_object_type(mdd_obj)))
826                         rc = mdd_def_acl_get(env, mdd_obj, ma);
827         }
828 #endif
829         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
830                rc, ma->ma_valid, ma->ma_lmm);
831         RETURN(rc);
832 }
833
834 int mdd_attr_get_internal_locked(const struct lu_env *env,
835                                  struct mdd_object *mdd_obj, struct md_attr *ma)
836 {
837         int rc;
838         int needlock = ma->ma_need &
839                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
840
841         if (needlock)
842                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
843         rc = mdd_attr_get_internal(env, mdd_obj, ma);
844         if (needlock)
845                 mdd_read_unlock(env, mdd_obj);
846         return rc;
847 }
848
849 /*
850  * No permission check is needed.
851  */
852 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
853                         struct md_attr *ma)
854 {
855         struct mdd_object *mdd_obj = md2mdd_obj(obj);
856         int                rc;
857
858         ENTRY;
859         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
860         RETURN(rc);
861 }
862
863 /*
864  * No permission check is needed.
865  */
866 static int mdd_xattr_get(const struct lu_env *env,
867                          struct md_object *obj, struct lu_buf *buf,
868                          const char *name)
869 {
870         struct mdd_object *mdd_obj = md2mdd_obj(obj);
871         int rc;
872
873         ENTRY;
874
875         if (mdd_object_exists(mdd_obj) == 0) {
876                 CERROR("%s: object "DFID" not found: rc = -2\n",
877                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
878                 return -ENOENT;
879         }
880
881         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
882         rc = mdo_xattr_get(env, mdd_obj, buf, name,
883                            mdd_object_capa(env, mdd_obj));
884         mdd_read_unlock(env, mdd_obj);
885
886         RETURN(rc);
887 }
888
889 /*
890  * Permission check is done when open,
891  * no need check again.
892  */
893 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
894                         struct lu_buf *buf)
895 {
896         struct mdd_object *mdd_obj = md2mdd_obj(obj);
897         struct dt_object  *next;
898         loff_t             pos = 0;
899         int                rc;
900         ENTRY;
901
902         if (mdd_object_exists(mdd_obj) == 0) {
903                 CERROR("%s: object "DFID" not found: rc = -2\n",
904                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
905                 return -ENOENT;
906         }
907
908         next = mdd_object_child(mdd_obj);
909         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
910         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
911                                          mdd_object_capa(env, mdd_obj));
912         mdd_read_unlock(env, mdd_obj);
913         RETURN(rc);
914 }
915
916 /*
917  * No permission check is needed.
918  */
919 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
920                           struct lu_buf *buf)
921 {
922         struct mdd_object *mdd_obj = md2mdd_obj(obj);
923         int rc;
924
925         ENTRY;
926
927         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
928         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
929         mdd_read_unlock(env, mdd_obj);
930
931         RETURN(rc);
932 }
933
934 int mdd_declare_object_create_internal(const struct lu_env *env,
935                                        struct mdd_object *p,
936                                        struct mdd_object *c,
937                                        struct md_attr *ma,
938                                        struct thandle *handle,
939                                        const struct md_op_spec *spec)
940 {
941         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
942         const struct dt_index_features *feat = spec->sp_feat;
943         int rc;
944         ENTRY;
945
946         if (feat != &dt_directory_features && feat != NULL)
947                 dof->dof_type = DFT_INDEX;
948         else
949                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
950
951         dof->u.dof_idx.di_feat = feat;
952
953         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
954
955         RETURN(rc);
956 }
957
958 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
959                                struct mdd_object *c, struct md_attr *ma,
960                                struct thandle *handle,
961                                const struct md_op_spec *spec)
962 {
963         struct lu_attr *attr = &ma->ma_attr;
964         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
965         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
966         const struct dt_index_features *feat = spec->sp_feat;
967         int rc;
968         ENTRY;
969
970         if (!mdd_object_exists(c)) {
971                 struct dt_object *next = mdd_object_child(c);
972                 LASSERT(next);
973
974                 if (feat != &dt_directory_features && feat != NULL)
975                         dof->dof_type = DFT_INDEX;
976                 else
977                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
978
979                 dof->u.dof_idx.di_feat = feat;
980
981                 /* @hint will be initialized by underlying device. */
982                 next->do_ops->do_ah_init(env, hint,
983                                          p ? mdd_object_child(p) : NULL,
984                                          attr->la_mode & S_IFMT);
985
986                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
987                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
988         } else
989                 rc = -EEXIST;
990
991         RETURN(rc);
992 }
993
994 /**
995  * Make sure the ctime is increased only.
996  */
997 static inline int mdd_attr_check(const struct lu_env *env,
998                                  struct mdd_object *obj,
999                                  struct lu_attr *attr)
1000 {
1001         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1002         int rc;
1003         ENTRY;
1004
1005         if (attr->la_valid & LA_CTIME) {
1006                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1007                 if (rc)
1008                         RETURN(rc);
1009
1010                 if (attr->la_ctime < tmp_la->la_ctime)
1011                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1012                 else if (attr->la_valid == LA_CTIME &&
1013                          attr->la_ctime == tmp_la->la_ctime)
1014                         attr->la_valid &= ~LA_CTIME;
1015         }
1016         RETURN(0);
1017 }
1018
1019 int mdd_attr_set_internal(const struct lu_env *env,
1020                           struct mdd_object *obj,
1021                           struct lu_attr *attr,
1022                           struct thandle *handle,
1023                           int needacl)
1024 {
1025         int rc;
1026         ENTRY;
1027
1028         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1029 #ifdef CONFIG_FS_POSIX_ACL
1030         if (!rc && (attr->la_valid & LA_MODE) && needacl)
1031                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
1032 #endif
1033         RETURN(rc);
1034 }
1035
1036 int mdd_attr_check_set_internal(const struct lu_env *env,
1037                                 struct mdd_object *obj,
1038                                 struct lu_attr *attr,
1039                                 struct thandle *handle,
1040                                 int needacl)
1041 {
1042         int rc;
1043         ENTRY;
1044
1045         rc = mdd_attr_check(env, obj, attr);
1046         if (rc)
1047                 RETURN(rc);
1048
1049         if (attr->la_valid)
1050                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1051         RETURN(rc);
1052 }
1053
1054 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1055                                         struct mdd_object *obj,
1056                                         struct lu_attr *attr,
1057                                         struct thandle *handle,
1058                                         int needacl)
1059 {
1060         int rc;
1061         ENTRY;
1062
1063         needacl = needacl && (attr->la_valid & LA_MODE);
1064         if (needacl)
1065                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1066         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1067         if (needacl)
1068                 mdd_write_unlock(env, obj);
1069         RETURN(rc);
1070 }
1071
1072 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1073                                        struct mdd_object *obj,
1074                                        struct lu_attr *attr,
1075                                        struct thandle *handle,
1076                                        int needacl)
1077 {
1078         int rc;
1079         ENTRY;
1080
1081         needacl = needacl && (attr->la_valid & LA_MODE);
1082         if (needacl)
1083                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1084         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1085         if (needacl)
1086                 mdd_write_unlock(env, obj);
1087         RETURN(rc);
1088 }
1089
1090 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1091                     const struct lu_buf *buf, const char *name,
1092                     int fl, struct thandle *handle)
1093 {
1094         struct lustre_capa *capa = mdd_object_capa(env, obj);
1095         int rc = -EINVAL;
1096         ENTRY;
1097
1098         if (buf->lb_buf && buf->lb_len > 0)
1099                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1100         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1101                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1102
1103         RETURN(rc);
1104 }
1105
1106 /*
1107  * This gives the same functionality as the code between
1108  * sys_chmod and inode_setattr
1109  * chown_common and inode_setattr
1110  * utimes and inode_setattr
1111  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1112  */
1113 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1114                         struct lu_attr *la, const struct md_attr *ma)
1115 {
1116         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1117         struct md_ucred  *uc;
1118         int               rc;
1119         ENTRY;
1120
1121         if (!la->la_valid)
1122                 RETURN(0);
1123
1124         /* Do not permit change file type */
1125         if (la->la_valid & LA_TYPE)
1126                 RETURN(-EPERM);
1127
1128         /* They should not be processed by setattr */
1129         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1130                 RETURN(-EPERM);
1131
1132         /* export destroy does not have ->le_ses, but we may want
1133          * to drop LUSTRE_SOM_FL. */
1134         if (!env->le_ses)
1135                 RETURN(0);
1136
1137         uc = md_ucred(env);
1138
1139         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1140         if (rc)
1141                 RETURN(rc);
1142
1143         if (la->la_valid == LA_CTIME) {
1144                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1145                         /* This is only for set ctime when rename's source is
1146                          * on remote MDS. */
1147                         rc = mdd_may_delete(env, NULL, obj,
1148                                             (struct md_attr *)ma, 1, 0);
1149                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1150                         la->la_valid &= ~LA_CTIME;
1151                 RETURN(rc);
1152         }
1153
1154         if (la->la_valid == LA_ATIME) {
1155                 /* This is atime only set for read atime update on close. */
1156                 if (la->la_atime >= tmp_la->la_atime &&
1157                     la->la_atime < (tmp_la->la_atime +
1158                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1159                         la->la_valid &= ~LA_ATIME;
1160                 RETURN(0);
1161         }
1162
1163         /* Check if flags change. */
1164         if (la->la_valid & LA_FLAGS) {
1165                 unsigned int oldflags = 0;
1166                 unsigned int newflags = la->la_flags &
1167                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1168
1169                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1170                     !mdd_capable(uc, CFS_CAP_FOWNER))
1171                         RETURN(-EPERM);
1172
1173                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1174                  * only be changed by the relevant capability. */
1175                 if (mdd_is_immutable(obj))
1176                         oldflags |= LUSTRE_IMMUTABLE_FL;
1177                 if (mdd_is_append(obj))
1178                         oldflags |= LUSTRE_APPEND_FL;
1179                 if ((oldflags ^ newflags) &&
1180                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1181                         RETURN(-EPERM);
1182
1183                 if (!S_ISDIR(tmp_la->la_mode))
1184                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1185         }
1186
1187         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1188             (la->la_valid & ~LA_FLAGS) &&
1189             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1190                 RETURN(-EPERM);
1191
1192         /* Check for setting the obj time. */
1193         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1194             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1195                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1196                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1197                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1198                                                             MAY_WRITE,
1199                                                             MOR_TGT_CHILD);
1200                         if (rc)
1201                                 RETURN(rc);
1202                 }
1203         }
1204
1205         if (la->la_valid & LA_KILL_SUID) {
1206                 la->la_valid &= ~LA_KILL_SUID;
1207                 if ((tmp_la->la_mode & S_ISUID) &&
1208                     !(la->la_valid & LA_MODE)) {
1209                         la->la_mode = tmp_la->la_mode;
1210                         la->la_valid |= LA_MODE;
1211                 }
1212                 la->la_mode &= ~S_ISUID;
1213         }
1214
1215         if (la->la_valid & LA_KILL_SGID) {
1216                 la->la_valid &= ~LA_KILL_SGID;
1217                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1218                                         (S_ISGID | S_IXGRP)) &&
1219                     !(la->la_valid & LA_MODE)) {
1220                         la->la_mode = tmp_la->la_mode;
1221                         la->la_valid |= LA_MODE;
1222                 }
1223                 la->la_mode &= ~S_ISGID;
1224         }
1225
1226         /* Make sure a caller can chmod. */
1227         if (la->la_valid & LA_MODE) {
1228                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1229                     (uc->mu_fsuid != tmp_la->la_uid) &&
1230                     !mdd_capable(uc, CFS_CAP_FOWNER))
1231                         RETURN(-EPERM);
1232
1233                 if (la->la_mode == (cfs_umode_t) -1)
1234                         la->la_mode = tmp_la->la_mode;
1235                 else
1236                         la->la_mode = (la->la_mode & S_IALLUGO) |
1237                                       (tmp_la->la_mode & ~S_IALLUGO);
1238
1239                 /* Also check the setgid bit! */
1240                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1241                                        la->la_gid : tmp_la->la_gid) &&
1242                     !mdd_capable(uc, CFS_CAP_FSETID))
1243                         la->la_mode &= ~S_ISGID;
1244         } else {
1245                la->la_mode = tmp_la->la_mode;
1246         }
1247
1248         /* Make sure a caller can chown. */
1249         if (la->la_valid & LA_UID) {
1250                 if (la->la_uid == (uid_t) -1)
1251                         la->la_uid = tmp_la->la_uid;
1252                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1253                     (la->la_uid != tmp_la->la_uid)) &&
1254                     !mdd_capable(uc, CFS_CAP_CHOWN))
1255                         RETURN(-EPERM);
1256
1257                 /* If the user or group of a non-directory has been
1258                  * changed by a non-root user, remove the setuid bit.
1259                  * 19981026 David C Niemi <niemi@tux.org>
1260                  *
1261                  * Changed this to apply to all users, including root,
1262                  * to avoid some races. This is the behavior we had in
1263                  * 2.0. The check for non-root was definitely wrong
1264                  * for 2.2 anyway, as it should have been using
1265                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1266                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1267                     !S_ISDIR(tmp_la->la_mode)) {
1268                         la->la_mode &= ~S_ISUID;
1269                         la->la_valid |= LA_MODE;
1270                 }
1271         }
1272
1273         /* Make sure caller can chgrp. */
1274         if (la->la_valid & LA_GID) {
1275                 if (la->la_gid == (gid_t) -1)
1276                         la->la_gid = tmp_la->la_gid;
1277                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1278                     ((la->la_gid != tmp_la->la_gid) &&
1279                     !lustre_in_group_p(uc, la->la_gid))) &&
1280                     !mdd_capable(uc, CFS_CAP_CHOWN))
1281                         RETURN(-EPERM);
1282
1283                 /* Likewise, if the user or group of a non-directory
1284                  * has been changed by a non-root user, remove the
1285                  * setgid bit UNLESS there is no group execute bit
1286                  * (this would be a file marked for mandatory
1287                  * locking).  19981026 David C Niemi <niemi@tux.org>
1288                  *
1289                  * Removed the fsuid check (see the comment above) --
1290                  * 19990830 SD. */
1291                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1292                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1293                         la->la_mode &= ~S_ISGID;
1294                         la->la_valid |= LA_MODE;
1295                 }
1296         }
1297
1298         /* For both Size-on-MDS case and truncate case,
1299          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1300          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1301          * For SOM case, it is true, the MAY_WRITE perm has been checked
1302          * when open, no need check again. For truncate case, it is false,
1303          * the MAY_WRITE perm should be checked here. */
1304         if (ma->ma_attr_flags & MDS_SOM) {
1305                 /* For the "Size-on-MDS" setattr update, merge coming
1306                  * attributes with the set in the inode. BUG 10641 */
1307                 if ((la->la_valid & LA_ATIME) &&
1308                     (la->la_atime <= tmp_la->la_atime))
1309                         la->la_valid &= ~LA_ATIME;
1310
1311                 /* OST attributes do not have a priority over MDS attributes,
1312                  * so drop times if ctime is equal. */
1313                 if ((la->la_valid & LA_CTIME) &&
1314                     (la->la_ctime <= tmp_la->la_ctime))
1315                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1316         } else {
1317                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1318                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1319                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1320                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1321                                 rc = mdd_permission_internal_locked(env, obj,
1322                                                             tmp_la, MAY_WRITE,
1323                                                             MOR_TGT_CHILD);
1324                                 if (rc)
1325                                         RETURN(rc);
1326                         }
1327                 }
1328                 if (la->la_valid & LA_CTIME) {
1329                         /* The pure setattr, it has the priority over what is
1330                          * already set, do not drop it if ctime is equal. */
1331                         if (la->la_ctime < tmp_la->la_ctime)
1332                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1333                                                   LA_CTIME);
1334                 }
1335         }
1336
1337         RETURN(0);
1338 }
1339
1340 /** Store a data change changelog record
1341  * If this fails, we must fail the whole transaction; we don't
1342  * want the change to commit without the log entry.
1343  * \param mdd_obj - mdd_object of change
1344  * \param handle - transacion handle
1345  */
1346 static int mdd_changelog_data_store(const struct lu_env     *env,
1347                                     struct mdd_device       *mdd,
1348                                     enum changelog_rec_type type,
1349                                     int                     flags,
1350                                     struct mdd_object       *mdd_obj,
1351                                     struct thandle          *handle)
1352 {
1353         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1354         struct llog_changelog_rec *rec;
1355         struct thandle *th = NULL;
1356         struct lu_buf *buf;
1357         int reclen;
1358         int rc;
1359
1360         /* Not recording */
1361         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1362                 RETURN(0);
1363         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1364                 RETURN(0);
1365
1366         LASSERT(mdd_obj != NULL);
1367         LASSERT(handle != NULL);
1368
1369         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1370             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1371                 /* Don't need multiple updates in this log */
1372                 /* Don't check under lock - no big deal if we get an extra
1373                    entry */
1374                 RETURN(0);
1375         }
1376
1377         reclen = llog_data_len(sizeof(*rec));
1378         buf = mdd_buf_alloc(env, reclen);
1379         if (buf->lb_buf == NULL)
1380                 RETURN(-ENOMEM);
1381         rec = (struct llog_changelog_rec *)buf->lb_buf;
1382
1383         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1384         rec->cr.cr_type = (__u32)type;
1385         rec->cr.cr_tfid = *tfid;
1386         rec->cr.cr_namelen = 0;
1387         mdd_obj->mod_cltime = cfs_time_current_64();
1388
1389         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1390
1391         if (th)
1392                 mdd_trans_stop(env, mdd, rc, th);
1393
1394         if (rc < 0) {
1395                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1396                        rc, type, PFID(tfid));
1397                 return -EFAULT;
1398         }
1399
1400         return 0;
1401 }
1402
1403 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1404                   int flags, struct md_object *obj)
1405 {
1406         struct thandle *handle;
1407         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1408         struct mdd_device *mdd = mdo2mdd(obj);
1409         int rc;
1410         ENTRY;
1411
1412         handle = mdd_trans_create(env, mdd);
1413         if (IS_ERR(handle))
1414                 return(PTR_ERR(handle));
1415
1416         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1417         if (rc)
1418                 GOTO(stop, rc);
1419
1420         rc = mdd_trans_start(env, mdd, handle);
1421         if (rc)
1422                 GOTO(stop, rc);
1423
1424         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1425                                       handle);
1426
1427 stop:
1428         mdd_trans_stop(env, mdd, rc, handle);
1429
1430         RETURN(rc);
1431 }
1432
1433 /**
1434  * Should be called with write lock held.
1435  *
1436  * \see mdd_lma_set_locked().
1437  */
1438 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1439                        const struct md_attr *ma, struct thandle *handle)
1440 {
1441         struct mdd_thread_info *info = mdd_env_info(env);
1442         struct lu_buf *buf;
1443         struct lustre_mdt_attrs *lma =
1444                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1445         int lmasize = sizeof(struct lustre_mdt_attrs);
1446         int rc = 0;
1447
1448         ENTRY;
1449
1450         /* Either HSM or SOM part is not valid, we need to read it before */
1451         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1452                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1453                 if (rc <= 0)
1454                         RETURN(rc);
1455
1456                 lustre_lma_swab(lma);
1457         } else {
1458                 memset(lma, 0, lmasize);
1459         }
1460
1461         /* Copy HSM data */
1462         if (ma->ma_valid & MA_HSM) {
1463                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1464                 lma->lma_compat |= LMAC_HSM;
1465         }
1466
1467         /* Copy SOM data */
1468         if (ma->ma_valid & MA_SOM) {
1469                 LASSERT(ma->ma_som != NULL);
1470                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1471                         lma->lma_compat     &= ~LMAC_SOM;
1472                 } else {
1473                         lma->lma_compat     |= LMAC_SOM;
1474                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1475                         lma->lma_som_size    = ma->ma_som->msd_size;
1476                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1477                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1478                 }
1479         }
1480
1481         /* Copy FID */
1482         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1483
1484         lustre_lma_swab(lma);
1485         buf = mdd_buf_get(env, lma, lmasize);
1486         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1487
1488         RETURN(rc);
1489 }
1490
1491 /**
1492  * Save LMA extended attributes with data from \a ma.
1493  *
1494  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1495  * not, LMA EA will be first read from disk, modified and write back.
1496  *
1497  */
1498 static int mdd_lma_set_locked(const struct lu_env *env,
1499                               struct mdd_object *mdd_obj,
1500                               const struct md_attr *ma, struct thandle *handle)
1501 {
1502         int rc;
1503
1504         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1505         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1506         mdd_write_unlock(env, mdd_obj);
1507         return rc;
1508 }
1509
1510 /* Precedence for choosing record type when multiple
1511  * attributes change: setattr > mtime > ctime > atime
1512  * (ctime changes when mtime does, plus chmod/chown.
1513  * atime and ctime are independent.) */
1514 static int mdd_attr_set_changelog(const struct lu_env *env,
1515                                   struct md_object *obj, struct thandle *handle,
1516                                   __u64 valid)
1517 {
1518         struct mdd_device *mdd = mdo2mdd(obj);
1519         int bits, type = 0;
1520
1521         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1522         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1523         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1524         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1525         bits = bits & mdd->mdd_cl.mc_mask;
1526         if (bits == 0)
1527                 return 0;
1528
1529         /* The record type is the lowest non-masked set bit */
1530         while (bits && ((bits & 1) == 0)) {
1531                 bits = bits >> 1;
1532                 type++;
1533         }
1534
1535         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1536         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1537                                         md2mdd_obj(obj), handle);
1538 }
1539
1540 static int mdd_declare_attr_set(const struct lu_env *env,
1541                                 struct mdd_device *mdd,
1542                                 struct mdd_object *obj,
1543                                 const struct md_attr *ma,
1544                                 struct lov_mds_md *lmm,
1545                                 struct thandle *handle)
1546 {
1547         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1548         int             rc, i;
1549
1550         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1551         if (rc)
1552                 return rc;
1553
1554         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1555         if (rc)
1556                 return rc;
1557
1558         if (ma->ma_valid & MA_LOV) {
1559                 buf->lb_buf = NULL;
1560                 buf->lb_len = ma->ma_lmm_size;
1561                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1562                                            0, handle);
1563                 if (rc)
1564                         return rc;
1565         }
1566
1567         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1568                 buf->lb_buf = NULL;
1569                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1570                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1571                                            0, handle);
1572                 if (rc)
1573                         return rc;
1574         }
1575
1576 #ifdef CONFIG_FS_POSIX_ACL
1577         if (ma->ma_attr.la_valid & LA_MODE) {
1578                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1579                 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1580                                    BYPASS_CAPA);
1581                 mdd_read_unlock(env, obj);
1582                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1583                         rc = 0;
1584                 else if (rc < 0)
1585                         return rc;
1586
1587                 if (rc != 0) {
1588                         buf->lb_buf = NULL;
1589                         buf->lb_len = rc;
1590                         rc = mdo_declare_xattr_set(env, obj, buf,
1591                                                    XATTR_NAME_ACL_ACCESS, 0,
1592                                                    handle);
1593                         if (rc)
1594                                 return rc;
1595                 }
1596         }
1597 #endif
1598
1599         /* basically the log is the same as in unlink case */
1600         if (lmm) {
1601                 __u16 stripe;
1602
1603                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1604                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1605                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1606                                mdd->mdd_obd_dev->obd_name,
1607                                le32_to_cpu(lmm->lmm_magic),
1608                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1609                         return -EINVAL;
1610                 }
1611
1612                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1613                 if (stripe == LOV_ALL_STRIPES) {
1614                         struct lov_desc *ldesc;
1615
1616                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1617                         LASSERT(ldesc != NULL);
1618                         stripe = ldesc->ld_tgt_count;
1619                 }
1620
1621                 for (i = 0; i < stripe; i++) {
1622                         rc = mdd_declare_llog_record(env, mdd,
1623                                         sizeof(struct llog_unlink_rec),
1624                                         handle);
1625                         if (rc)
1626                                 return rc;
1627                 }
1628         }
1629
1630         return rc;
1631 }
1632
1633 /* set attr and LOV EA at once, return updated attr */
1634 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1635                         const struct md_attr *ma)
1636 {
1637         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1638         struct mdd_device *mdd = mdo2mdd(obj);
1639         struct thandle *handle;
1640         struct lov_mds_md *lmm = NULL;
1641         struct llog_cookie *logcookies = NULL;
1642         int  rc, lmm_size = 0, cookie_size = 0;
1643         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1644         struct obd_device *obd = mdd->mdd_obd_dev;
1645         struct mds_obd *mds = &obd->u.mds;
1646 #ifdef HAVE_QUOTA_SUPPORT
1647         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1648         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1649         int quota_opc = 0, block_count = 0;
1650         int inode_pending[MAXQUOTAS] = { 0, 0 };
1651         int block_pending[MAXQUOTAS] = { 0, 0 };
1652 #endif
1653         ENTRY;
1654
1655         *la_copy = ma->ma_attr;
1656         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1657         if (rc != 0)
1658                 RETURN(rc);
1659
1660         /* setattr on "close" only change atime, or do nothing */
1661         if (ma->ma_valid == MA_INODE &&
1662             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1663                 RETURN(0);
1664
1665         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1666             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1667                 lmm_size = mdd_lov_mdsize(env, mdd);
1668                 lmm = mdd_max_lmm_get(env, mdd);
1669                 if (lmm == NULL)
1670                         RETURN(-ENOMEM);
1671
1672                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1673                                 XATTR_NAME_LOV);
1674
1675                 if (rc < 0)
1676                         RETURN(rc);
1677         }
1678
1679         handle = mdd_trans_create(env, mdd);
1680         if (IS_ERR(handle))
1681                 RETURN(PTR_ERR(handle));
1682
1683         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1684                                   lmm_size > 0 ? lmm : NULL, handle);
1685         if (rc)
1686                 GOTO(stop, rc);
1687
1688         rc = mdd_trans_start(env, mdd, handle);
1689         if (rc)
1690                 GOTO(stop, rc);
1691
1692         /* permission changes may require sync operation */
1693         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1694                 handle->th_sync |= !!mdd->mdd_sync_permission;
1695
1696         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1697                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1698                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1699
1700 #ifdef HAVE_QUOTA_SUPPORT
1701         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1702                 struct obd_export *exp = md_quota(env)->mq_exp;
1703                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1704
1705                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1706                 if (!rc) {
1707                         quota_opc = FSFILT_OP_SETATTR;
1708                         mdd_quota_wrapper(la_copy, qnids);
1709                         mdd_quota_wrapper(la_tmp, qoids);
1710                         /* get file quota for new owner */
1711                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1712                                         qnids, inode_pending, 1, NULL, 0,
1713                                         NULL, 0);
1714                         block_count = (la_tmp->la_blocks + 7) >> 3;
1715                         if (block_count) {
1716                                 void *data = NULL;
1717                                 mdd_data_get(env, mdd_obj, &data);
1718                                 /* get block quota for new owner */
1719                                 lquota_chkquota(mds_quota_interface_ref, obd,
1720                                                 exp, qnids, block_pending,
1721                                                 block_count, NULL,
1722                                                 LQUOTA_FLAGS_BLK, data, 1);
1723                         }
1724                 }
1725         }
1726 #endif
1727
1728         if (la_copy->la_valid & LA_FLAGS) {
1729                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1730                                                   handle, 1);
1731                 if (rc == 0)
1732                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1733         } else if (la_copy->la_valid) {            /* setattr */
1734                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1735                                                   handle, 1);
1736                 /* journal chown/chgrp in llog, just like unlink */
1737                 if (rc == 0 && lmm_size){
1738                         cookie_size = mdd_lov_cookiesize(env, mdd);
1739                         logcookies = mdd_max_cookie_get(env, mdd);
1740                         if (logcookies == NULL)
1741                                 GOTO(cleanup, rc = -ENOMEM);
1742
1743                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1744                                             logcookies, cookie_size) <= 0)
1745                                 logcookies = NULL;
1746                 }
1747         }
1748
1749         if (rc == 0 && ma->ma_valid & MA_LOV) {
1750                 cfs_umode_t mode;
1751
1752                 mode = mdd_object_type(mdd_obj);
1753                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1754                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1755                         if (rc)
1756                                 GOTO(cleanup, rc);
1757
1758                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1759                                             ma->ma_lmm_size, handle, 1);
1760                 }
1761
1762         }
1763         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1764                 cfs_umode_t mode;
1765
1766                 mode = mdd_object_type(mdd_obj);
1767                 if (S_ISREG(mode))
1768                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1769
1770         }
1771 cleanup:
1772         if (rc == 0)
1773                 rc = mdd_attr_set_changelog(env, obj, handle,
1774                                             ma->ma_attr.la_valid);
1775 stop:
1776         mdd_trans_stop(env, mdd, rc, handle);
1777         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1778                 /*set obd attr, if needed*/
1779                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1780                                            logcookies);
1781         }
1782 #ifdef HAVE_QUOTA_SUPPORT
1783         if (quota_opc) {
1784                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1785                                       inode_pending, 0);
1786                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1787                                       block_pending, 1);
1788                 /* Trigger dqrel/dqacq for original owner and new owner.
1789                  * If failed, the next call for lquota_chkquota will
1790                  * process it. */
1791                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1792                               quota_opc);
1793         }
1794 #endif
1795         RETURN(rc);
1796 }
1797
1798 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1799                       const struct lu_buf *buf, const char *name, int fl,
1800                       struct thandle *handle)
1801 {
1802         int  rc;
1803         ENTRY;
1804
1805         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1806         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1807         mdd_write_unlock(env, obj);
1808
1809         RETURN(rc);
1810 }
1811
1812 static int mdd_xattr_sanity_check(const struct lu_env *env,
1813                                   struct mdd_object *obj)
1814 {
1815         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1816         struct md_ucred *uc     = md_ucred(env);
1817         int rc;
1818         ENTRY;
1819
1820         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1821                 RETURN(-EPERM);
1822
1823         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1824         if (rc)
1825                 RETURN(rc);
1826
1827         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1828             !mdd_capable(uc, CFS_CAP_FOWNER))
1829                 RETURN(-EPERM);
1830
1831         RETURN(rc);
1832 }
1833
1834 static int mdd_declare_xattr_set(const struct lu_env *env,
1835                                  struct mdd_device *mdd,
1836                                  struct mdd_object *obj,
1837                                  const struct lu_buf *buf,
1838                                  const char *name,
1839                                  struct thandle *handle)
1840
1841 {
1842         int rc;
1843
1844         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1845         if (rc)
1846                 return rc;
1847
1848         /* Only record user xattr changes */
1849         if ((strncmp("user.", name, 5) == 0))
1850                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1851
1852         return rc;
1853 }
1854
1855 /**
1856  * The caller should guarantee to update the object ctime
1857  * after xattr_set if needed.
1858  */
1859 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1860                          const struct lu_buf *buf, const char *name,
1861                          int fl)
1862 {
1863         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1864         struct mdd_device *mdd = mdo2mdd(obj);
1865         struct thandle *handle;
1866         int  rc;
1867         ENTRY;
1868
1869         rc = mdd_xattr_sanity_check(env, mdd_obj);
1870         if (rc)
1871                 RETURN(rc);
1872
1873         handle = mdd_trans_create(env, mdd);
1874         if (IS_ERR(handle))
1875                 RETURN(PTR_ERR(handle));
1876
1877         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1878         if (rc)
1879                 GOTO(stop, rc);
1880
1881         rc = mdd_trans_start(env, mdd, handle);
1882         if (rc)
1883                 GOTO(stop, rc);
1884
1885         /* security-replated changes may require sync */
1886         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1887                 handle->th_sync |= !!mdd->mdd_sync_permission;
1888
1889         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1890
1891         /* Only record system & user xattr changes */
1892         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1893                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1894                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1895                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1896                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1897                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1898                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1899                                               handle);
1900
1901 stop:
1902         mdd_trans_stop(env, mdd, rc, handle);
1903
1904         RETURN(rc);
1905 }
1906
1907 static int mdd_declare_xattr_del(const struct lu_env *env,
1908                                  struct mdd_device *mdd,
1909                                  struct mdd_object *obj,
1910                                  const char *name,
1911                                  struct thandle *handle)
1912 {
1913         int rc;
1914
1915         rc = mdo_declare_xattr_del(env, obj, name, handle);
1916         if (rc)
1917                 return rc;
1918
1919         /* Only record user xattr changes */
1920         if ((strncmp("user.", name, 5) == 0))
1921                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1922
1923         return rc;
1924 }
1925
1926 /**
1927  * The caller should guarantee to update the object ctime
1928  * after xattr_set if needed.
1929  */
1930 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1931                   const char *name)
1932 {
1933         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1934         struct mdd_device *mdd = mdo2mdd(obj);
1935         struct thandle *handle;
1936         int  rc;
1937         ENTRY;
1938
1939         rc = mdd_xattr_sanity_check(env, mdd_obj);
1940         if (rc)
1941                 RETURN(rc);
1942
1943         handle = mdd_trans_create(env, mdd);
1944         if (IS_ERR(handle))
1945                 RETURN(PTR_ERR(handle));
1946
1947         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1948         if (rc)
1949                 GOTO(stop, rc);
1950
1951         rc = mdd_trans_start(env, mdd, handle);
1952         if (rc)
1953                 GOTO(stop, rc);
1954
1955         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1956         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1957                            mdd_object_capa(env, mdd_obj));
1958         mdd_write_unlock(env, mdd_obj);
1959
1960         /* Only record system & user xattr changes */
1961         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1962                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1963                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1964                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1965                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1966                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1967                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1968                                               handle);
1969
1970 stop:
1971         mdd_trans_stop(env, mdd, rc, handle);
1972
1973         RETURN(rc);
1974 }
1975
1976 /* partial unlink */
1977 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1978                        struct md_attr *ma)
1979 {
1980         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1981         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1982         struct mdd_device *mdd = mdo2mdd(obj);
1983         struct thandle *handle;
1984 #ifdef HAVE_QUOTA_SUPPORT
1985         struct obd_device *obd = mdd->mdd_obd_dev;
1986         struct mds_obd *mds = &obd->u.mds;
1987         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1988         int quota_opc = 0;
1989 #endif
1990         int rc;
1991         ENTRY;
1992
1993         /* XXX: this code won't be used ever:
1994          * DNE uses slightly different approach */
1995         LBUG();
1996
1997         /*
1998          * Check -ENOENT early here because we need to get object type
1999          * to calculate credits before transaction start
2000          */
2001         if (mdd_object_exists(mdd_obj) == 0) {
2002                 CERROR("%s: object "DFID" not found: rc = -2\n",
2003                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2004                 RETURN(-ENOENT);
2005         }
2006
2007         LASSERT(mdd_object_exists(mdd_obj) > 0);
2008
2009         handle = mdd_trans_create(env, mdd);
2010         if (IS_ERR(handle))
2011                 RETURN(-ENOMEM);
2012
2013         rc = mdd_trans_start(env, mdd, handle);
2014
2015         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2016
2017         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2018         if (rc)
2019                 GOTO(cleanup, rc);
2020
2021         mdo_ref_del(env, mdd_obj, handle);
2022
2023         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2024                 /* unlink dot */
2025                 mdo_ref_del(env, mdd_obj, handle);
2026         }
2027
2028         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2029         la_copy->la_ctime = ma->ma_attr.la_ctime;
2030
2031         la_copy->la_valid = LA_CTIME;
2032         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2033         if (rc)
2034                 GOTO(cleanup, rc);
2035
2036         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2037 #ifdef HAVE_QUOTA_SUPPORT
2038         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2039             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2040                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2041                 mdd_quota_wrapper(&ma->ma_attr, qids);
2042         }
2043 #endif
2044
2045
2046         EXIT;
2047 cleanup:
2048         mdd_write_unlock(env, mdd_obj);
2049         mdd_trans_stop(env, mdd, rc, handle);
2050 #ifdef HAVE_QUOTA_SUPPORT
2051         if (quota_opc)
2052                 /* Trigger dqrel on the owner of child. If failed,
2053                  * the next call for lquota_chkquota will process it */
2054                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2055                               quota_opc);
2056 #endif
2057         return rc;
2058 }
2059
2060 /* partial operation */
2061 static int mdd_oc_sanity_check(const struct lu_env *env,
2062                                struct mdd_object *obj,
2063                                struct md_attr *ma)
2064 {
2065         int rc;
2066         ENTRY;
2067
2068         switch (ma->ma_attr.la_mode & S_IFMT) {
2069         case S_IFREG:
2070         case S_IFDIR:
2071         case S_IFLNK:
2072         case S_IFCHR:
2073         case S_IFBLK:
2074         case S_IFIFO:
2075         case S_IFSOCK:
2076                 rc = 0;
2077                 break;
2078         default:
2079                 rc = -EINVAL;
2080                 break;
2081         }
2082         RETURN(rc);
2083 }
2084
2085 static int mdd_object_create(const struct lu_env *env,
2086                              struct md_object *obj,
2087                              const struct md_op_spec *spec,
2088                              struct md_attr *ma)
2089 {
2090
2091         struct mdd_device *mdd = mdo2mdd(obj);
2092         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2093         const struct lu_fid *pfid = spec->u.sp_pfid;
2094         struct thandle *handle;
2095 #ifdef HAVE_QUOTA_SUPPORT
2096         struct obd_device *obd = mdd->mdd_obd_dev;
2097         struct obd_export *exp = md_quota(env)->mq_exp;
2098         struct mds_obd *mds = &obd->u.mds;
2099         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2100         int quota_opc = 0, block_count = 0;
2101         int inode_pending[MAXQUOTAS] = { 0, 0 };
2102         int block_pending[MAXQUOTAS] = { 0, 0 };
2103 #endif
2104         int rc = 0;
2105         ENTRY;
2106
2107         /* XXX: this code won't be used ever:
2108          * DNE uses slightly different approach */
2109         LBUG();
2110
2111 #ifdef HAVE_QUOTA_SUPPORT
2112         if (mds->mds_quota) {
2113                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2114                 mdd_quota_wrapper(&ma->ma_attr, qids);
2115                 /* get file quota for child */
2116                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2117                                 qids, inode_pending, 1, NULL, 0,
2118                                 NULL, 0);
2119                 switch (ma->ma_attr.la_mode & S_IFMT) {
2120                 case S_IFLNK:
2121                 case S_IFDIR:
2122                         block_count = 2;
2123                         break;
2124                 case S_IFREG:
2125                         block_count = 1;
2126                         break;
2127                 }
2128                 /* get block quota for child */
2129                 if (block_count)
2130                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2131                                         qids, block_pending, block_count,
2132                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2133         }
2134 #endif
2135
2136         handle = mdd_trans_create(env, mdd);
2137         if (IS_ERR(handle))
2138                 GOTO(out_pending, rc = PTR_ERR(handle));
2139
2140         rc = mdd_trans_start(env, mdd, handle);
2141
2142         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2143         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2144         if (rc)
2145                 GOTO(unlock, rc);
2146
2147         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2148         if (rc)
2149                 GOTO(unlock, rc);
2150
2151         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2152                 /* If creating the slave object, set slave EA here. */
2153                 int lmv_size = spec->u.sp_ea.eadatalen;
2154                 struct lmv_stripe_md *lmv;
2155
2156                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2157                 LASSERT(lmv != NULL && lmv_size > 0);
2158
2159                 rc = __mdd_xattr_set(env, mdd_obj,
2160                                      mdd_buf_get_const(env, lmv, lmv_size),
2161                                      XATTR_NAME_LMV, 0, handle);
2162                 if (rc)
2163                         GOTO(unlock, rc);
2164
2165                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2166                                            handle, 0);
2167         } else {
2168 #ifdef CONFIG_FS_POSIX_ACL
2169                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2170                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2171
2172                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2173                         buf->lb_len = spec->u.sp_ea.eadatalen;
2174                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2175                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2176                                                     &ma->ma_attr.la_mode,
2177                                                     handle);
2178                                 if (rc)
2179                                         GOTO(unlock, rc);
2180                                 else
2181                                         ma->ma_attr.la_valid |= LA_MODE;
2182                         }
2183
2184                         pfid = spec->u.sp_ea.fid;
2185                 }
2186 #endif
2187                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2188                                            spec);
2189         }
2190         EXIT;
2191 unlock:
2192         if (rc == 0)
2193                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2194         mdd_write_unlock(env, mdd_obj);
2195
2196         mdd_trans_stop(env, mdd, rc, handle);
2197 out_pending:
2198 #ifdef HAVE_QUOTA_SUPPORT
2199         if (quota_opc) {
2200                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2201                                       inode_pending, 0);
2202                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2203                                       block_pending, 1);
2204                 /* Trigger dqacq on the owner of child. If failed,
2205                  * the next call for lquota_chkquota will process it. */
2206                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2207                               quota_opc);
2208         }
2209 #endif
2210         return rc;
2211 }
2212
2213 /* partial link */
2214 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2215                        const struct md_attr *ma)
2216 {
2217         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2218         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2219         struct mdd_device *mdd = mdo2mdd(obj);
2220         struct thandle *handle;
2221         int rc;
2222         ENTRY;
2223
2224         /* XXX: this code won't be used ever:
2225          * DNE uses slightly different approach */
2226         LBUG();
2227
2228         handle = mdd_trans_create(env, mdd);
2229         if (IS_ERR(handle))
2230                 RETURN(-ENOMEM);
2231
2232         rc = mdd_trans_start(env, mdd, handle);
2233
2234         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2235         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2236         if (rc == 0)
2237                 mdo_ref_add(env, mdd_obj, handle);
2238         mdd_write_unlock(env, mdd_obj);
2239         if (rc == 0) {
2240                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2241                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2242
2243                 la_copy->la_valid = LA_CTIME;
2244                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2245                                                         handle, 0);
2246         }
2247         mdd_trans_stop(env, mdd, 0, handle);
2248
2249         RETURN(rc);
2250 }
2251
2252 /*
2253  * do NOT or the MAY_*'s, you'll get the weakest
2254  */
2255 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2256 {
2257         int res = 0;
2258
2259         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2260          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2261          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2262          * owner can write to a file even if it is marked readonly to hide
2263          * its brokenness. (bug 5781) */
2264         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2265                 struct md_ucred *uc = md_ucred(env);
2266
2267                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2268                     (la->la_uid == uc->mu_fsuid))
2269                         return 0;
2270         }
2271
2272         if (flags & FMODE_READ)
2273                 res |= MAY_READ;
2274         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2275                 res |= MAY_WRITE;
2276         if (flags & MDS_FMODE_EXEC)
2277                 res = MAY_EXEC;
2278         return res;
2279 }
2280
2281 static int mdd_open_sanity_check(const struct lu_env *env,
2282                                  struct mdd_object *obj, int flag)
2283 {
2284         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2285         int mode, rc;
2286         ENTRY;
2287
2288         /* EEXIST check */
2289         if (mdd_is_dead_obj(obj))
2290                 RETURN(-ENOENT);
2291
2292         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2293         if (rc)
2294                RETURN(rc);
2295
2296         if (S_ISLNK(tmp_la->la_mode))
2297                 RETURN(-ELOOP);
2298
2299         mode = accmode(env, tmp_la, flag);
2300
2301         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2302                 RETURN(-EISDIR);
2303
2304         if (!(flag & MDS_OPEN_CREATED)) {
2305                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2306                 if (rc)
2307                         RETURN(rc);
2308         }
2309
2310         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2311             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2312                 flag &= ~MDS_OPEN_TRUNC;
2313
2314         /* For writing append-only file must open it with append mode. */
2315         if (mdd_is_append(obj)) {
2316                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2317                         RETURN(-EPERM);
2318                 if (flag & MDS_OPEN_TRUNC)
2319                         RETURN(-EPERM);
2320         }
2321
2322 #if 0
2323         /*
2324          * Now, flag -- O_NOATIME does not be packed by client.
2325          */
2326         if (flag & O_NOATIME) {
2327                 struct md_ucred *uc = md_ucred(env);
2328
2329                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2330                     (uc->mu_valid == UCRED_NEW)) &&
2331                     (uc->mu_fsuid != tmp_la->la_uid) &&
2332                     !mdd_capable(uc, CFS_CAP_FOWNER))
2333                         RETURN(-EPERM);
2334         }
2335 #endif
2336
2337         RETURN(0);
2338 }
2339
2340 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2341                     int flags)
2342 {
2343         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2344         int rc = 0;
2345
2346         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2347
2348         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2349         if (rc == 0)
2350                 mdd_obj->mod_count++;
2351
2352         mdd_write_unlock(env, mdd_obj);
2353         return rc;
2354 }
2355
2356 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2357                             struct md_attr *ma, struct thandle *handle)
2358 {
2359         int rc;
2360
2361         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2362         if (rc)
2363                 return rc;
2364
2365         return mdo_declare_destroy(env, obj, handle);
2366 }
2367
2368 /* return md_attr back,
2369  * if it is last unlink then return lov ea + llog cookie*/
2370 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2371                     struct md_attr *ma, struct thandle *handle)
2372 {
2373         int rc = 0;
2374         ENTRY;
2375
2376         if (S_ISREG(mdd_object_type(obj))) {
2377                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2378                  * Caller must be ready for that. */
2379                 rc = __mdd_lmm_get(env, obj, ma);
2380                 if ((ma->ma_valid & MA_LOV))
2381                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2382                                             obj, ma);
2383         }
2384
2385         if (rc == 0)
2386                 rc = mdo_destroy(env, obj, handle);
2387
2388         RETURN(rc);
2389 }
2390
2391 static int mdd_declare_close(const struct lu_env *env,
2392                              struct mdd_object *obj,
2393                              struct md_attr *ma,
2394                              struct thandle *handle)
2395 {
2396         int rc;
2397
2398         rc = orph_declare_index_delete(env, obj, handle);
2399         if (rc)
2400                 return rc;
2401
2402         return mdd_declare_object_kill(env, obj, ma, handle);
2403 }
2404
2405 /*
2406  * No permission check is needed.
2407  */
2408 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2409                      struct md_attr *ma, int mode)
2410 {
2411         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2412         struct mdd_device *mdd = mdo2mdd(obj);
2413         struct thandle    *handle = NULL;
2414         int rc;
2415         int is_orphan = 0, reset = 1;
2416
2417 #ifdef HAVE_QUOTA_SUPPORT
2418         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2419         struct mds_obd *mds = &obd->u.mds;
2420         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2421         int quota_opc = 0;
2422 #endif
2423         ENTRY;
2424
2425         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2426                 mdd_obj->mod_count--;
2427
2428                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2429                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2430                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2431                 RETURN(0);
2432         }
2433
2434         /* check without any lock */
2435         if (mdd_obj->mod_count == 1 &&
2436             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2437  again:
2438                 handle = mdd_trans_create(env, mdo2mdd(obj));
2439                 if (IS_ERR(handle))
2440                         RETURN(PTR_ERR(handle));
2441
2442                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2443                 if (rc)
2444                         GOTO(stop, rc);
2445
2446                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2447                 if (rc)
2448                         GOTO(stop, rc);
2449
2450                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2451                 if (rc)
2452                         GOTO(stop, rc);
2453         }
2454
2455         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2456         if (handle == NULL && mdd_obj->mod_count == 1 &&
2457             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2458                 mdd_write_unlock(env, mdd_obj);
2459                 goto again;
2460         }
2461
2462         /* release open count */
2463         mdd_obj->mod_count --;
2464
2465         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2466                 /* remove link to object from orphan index */
2467                 LASSERT(handle != NULL);
2468                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2469                 if (rc == 0) {
2470                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2471                                "list, OSS objects to be destroyed.\n",
2472                                PFID(mdd_object_fid(mdd_obj)));
2473                         is_orphan = 1;
2474                 } else {
2475                         CERROR("Object "DFID" can not be deleted from orphan "
2476                                 "list, maybe cause OST objects can not be "
2477                                 "destroyed (err: %d).\n",
2478                                 PFID(mdd_object_fid(mdd_obj)), rc);
2479                         /* If object was not deleted from orphan list, do not
2480                          * destroy OSS objects, which will be done when next
2481                          * recovery. */
2482                         GOTO(out, rc);
2483                 }
2484         }
2485
2486         rc = mdd_iattr_get(env, mdd_obj, ma);
2487         /* Object maybe not in orphan list originally, it is rare case for
2488          * mdd_finish_unlink() failure. */
2489         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2490 #ifdef HAVE_QUOTA_SUPPORT
2491                 if (mds->mds_quota) {
2492                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2493                         mdd_quota_wrapper(&ma->ma_attr, qids);
2494                 }
2495 #endif
2496                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2497                 if (ma->ma_valid & MA_FLAGS &&
2498                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2499                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2500                 } else {
2501                         if (handle == NULL) {
2502                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2503                                 if (IS_ERR(handle))
2504                                         GOTO(out, rc = PTR_ERR(handle));
2505
2506                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2507                                                              handle);
2508                                 if (rc)
2509                                         GOTO(out, rc);
2510
2511                                 rc = mdd_declare_changelog_store(env, mdd,
2512                                                                  NULL, handle);
2513                                 if (rc)
2514                                         GOTO(stop, rc);
2515
2516                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2517                                 if (rc)
2518                                         GOTO(out, rc);
2519                         }
2520
2521                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2522                         if (rc == 0)
2523                                 reset = 0;
2524                 }
2525
2526                 if (rc != 0)
2527                         CERROR("Error when prepare to delete Object "DFID" , "
2528                                "which will cause OST objects can not be "
2529                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2530         }
2531         EXIT;
2532
2533 out:
2534         if (reset)
2535                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2536
2537         mdd_write_unlock(env, mdd_obj);
2538
2539         if (rc == 0 &&
2540             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2541             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2542                 if (handle == NULL) {
2543                         handle = mdd_trans_create(env, mdo2mdd(obj));
2544                         if (IS_ERR(handle))
2545                                 GOTO(stop, rc = IS_ERR(handle));
2546
2547                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2548                                                          handle);
2549                         if (rc)
2550                                 GOTO(stop, rc);
2551
2552                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2553                         if (rc)
2554                                 GOTO(stop, rc);
2555                 }
2556
2557                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2558                                          mdd_obj, handle);
2559         }
2560
2561 stop:
2562         if (handle != NULL)
2563                 mdd_trans_stop(env, mdd, rc, handle);
2564 #ifdef HAVE_QUOTA_SUPPORT
2565         if (quota_opc)
2566                 /* Trigger dqrel on the owner of child. If failed,
2567                  * the next call for lquota_chkquota will process it */
2568                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2569                               quota_opc);
2570 #endif
2571         return rc;
2572 }
2573
2574 /*
2575  * Permission check is done when open,
2576  * no need check again.
2577  */
2578 static int mdd_readpage_sanity_check(const struct lu_env *env,
2579                                      struct mdd_object *obj)
2580 {
2581         struct dt_object *next = mdd_object_child(obj);
2582         int rc;
2583         ENTRY;
2584
2585         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2586                 rc = 0;
2587         else
2588                 rc = -ENOTDIR;
2589
2590         RETURN(rc);
2591 }
2592
2593 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2594                               struct lu_dirpage *dp, int nob,
2595                               const struct dt_it_ops *iops, struct dt_it *it,
2596                               __u32 attr)
2597 {
2598         void                   *area = dp;
2599         int                     result;
2600         __u64                   hash = 0;
2601         struct lu_dirent       *ent;
2602         struct lu_dirent       *last = NULL;
2603         int                     first = 1;
2604
2605         memset(area, 0, sizeof (*dp));
2606         area += sizeof (*dp);
2607         nob  -= sizeof (*dp);
2608
2609         ent  = area;
2610         do {
2611                 int    len;
2612                 int    recsize;
2613
2614                 len = iops->key_size(env, it);
2615
2616                 /* IAM iterator can return record with zero len. */
2617                 if (len == 0)
2618                         goto next;
2619
2620                 hash = iops->store(env, it);
2621                 if (unlikely(first)) {
2622                         first = 0;
2623                         dp->ldp_hash_start = cpu_to_le64(hash);
2624                 }
2625
2626                 /* calculate max space required for lu_dirent */
2627                 recsize = lu_dirent_calc_size(len, attr);
2628
2629                 if (nob >= recsize) {
2630                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2631                         if (result == -ESTALE)
2632                                 goto next;
2633                         if (result != 0)
2634                                 goto out;
2635
2636                         /* osd might not able to pack all attributes,
2637                          * so recheck rec length */
2638                         recsize = le16_to_cpu(ent->lde_reclen);
2639                 } else {
2640                         result = (last != NULL) ? 0 :-EINVAL;
2641                         goto out;
2642                 }
2643                 last = ent;
2644                 ent = (void *)ent + recsize;
2645                 nob -= recsize;
2646
2647 next:
2648                 result = iops->next(env, it);
2649                 if (result == -ESTALE)
2650                         goto next;
2651         } while (result == 0);
2652
2653 out:
2654         dp->ldp_hash_end = cpu_to_le64(hash);
2655         if (last != NULL) {
2656                 if (last->lde_hash == dp->ldp_hash_end)
2657                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2658                 last->lde_reclen = 0; /* end mark */
2659         }
2660         return result;
2661 }
2662
2663 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2664                           const struct lu_rdpg *rdpg)
2665 {
2666         struct dt_it      *it;
2667         struct dt_object  *next = mdd_object_child(obj);
2668         const struct dt_it_ops  *iops;
2669         struct page       *pg;
2670         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2671         int i;
2672         int nlupgs = 0;
2673         int rc;
2674         int nob;
2675
2676         LASSERT(rdpg->rp_pages != NULL);
2677         LASSERT(next->do_index_ops != NULL);
2678
2679         if (rdpg->rp_count <= 0)
2680                 return -EFAULT;
2681
2682         /*
2683          * iterate through directory and fill pages from @rdpg
2684          */
2685         iops = &next->do_index_ops->dio_it;
2686         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2687         if (IS_ERR(it))
2688                 return PTR_ERR(it);
2689
2690         rc = iops->load(env, it, rdpg->rp_hash);
2691
2692         if (rc == 0) {
2693                 /*
2694                  * Iterator didn't find record with exactly the key requested.
2695                  *
2696                  * It is currently either
2697                  *
2698                  *     - positioned above record with key less than
2699                  *     requested---skip it.
2700                  *
2701                  *     - or not positioned at all (is in IAM_IT_SKEWED
2702                  *     state)---position it on the next item.
2703                  */
2704                 rc = iops->next(env, it);
2705         } else if (rc > 0)
2706                 rc = 0;
2707
2708         /*
2709          * At this point and across for-loop:
2710          *
2711          *  rc == 0 -> ok, proceed.
2712          *  rc >  0 -> end of directory.
2713          *  rc <  0 -> error.
2714          */
2715         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2716              i++, nob -= CFS_PAGE_SIZE) {
2717                 struct lu_dirpage *dp;
2718
2719                 LASSERT(i < rdpg->rp_npages);
2720                 pg = rdpg->rp_pages[i];
2721                 dp = cfs_kmap(pg);
2722 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2723 repeat:
2724 #endif
2725                 rc = mdd_dir_page_build(env, mdd, dp,
2726                                         min_t(int, nob, LU_PAGE_SIZE),
2727                                         iops, it, rdpg->rp_attrs);
2728                 if (rc > 0) {
2729                         /*
2730                          * end of directory.
2731                          */
2732                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2733                         nlupgs++;
2734                 } else if (rc < 0) {
2735                         CWARN("build page failed: %d!\n", rc);
2736                 } else {
2737                         nlupgs++;
2738 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2739                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2740                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2741                                 goto repeat;
2742 #endif
2743                 }
2744                 cfs_kunmap(pg);
2745         }
2746         if (rc >= 0) {
2747                 struct lu_dirpage *dp;
2748
2749                 dp = cfs_kmap(rdpg->rp_pages[0]);
2750                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2751                 if (nlupgs == 0) {
2752                         /*
2753                          * No pages were processed, mark this for first page
2754                          * and send back.
2755                          */
2756                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2757                         nlupgs = 1;
2758                 }
2759                 cfs_kunmap(rdpg->rp_pages[0]);
2760
2761                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2762         }
2763         iops->put(env, it);
2764         iops->fini(env, it);
2765
2766         return rc;
2767 }
2768
2769 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2770                  const struct lu_rdpg *rdpg)
2771 {
2772         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2773         int rc;
2774         ENTRY;
2775
2776         if (mdd_object_exists(mdd_obj) == 0) {
2777                 CERROR("%s: object "DFID" not found: rc = -2\n",
2778                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2779                 return -ENOENT;
2780         }
2781
2782         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2783         rc = mdd_readpage_sanity_check(env, mdd_obj);
2784         if (rc)
2785                 GOTO(out_unlock, rc);
2786
2787         if (mdd_is_dead_obj(mdd_obj)) {
2788                 struct page *pg;
2789                 struct lu_dirpage *dp;
2790
2791                 /*
2792                  * According to POSIX, please do not return any entry to client:
2793                  * even dot and dotdot should not be returned.
2794                  */
2795                 CWARN("readdir from dead object: "DFID"\n",
2796                         PFID(mdd_object_fid(mdd_obj)));
2797
2798                 if (rdpg->rp_count <= 0)
2799                         GOTO(out_unlock, rc = -EFAULT);
2800                 LASSERT(rdpg->rp_pages != NULL);
2801
2802                 pg = rdpg->rp_pages[0];
2803                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2804                 memset(dp, 0 , sizeof(struct lu_dirpage));
2805                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2806                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2807                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2808                 cfs_kunmap(pg);
2809                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2810         }
2811
2812         rc = __mdd_readpage(env, mdd_obj, rdpg);
2813
2814         EXIT;
2815 out_unlock:
2816         mdd_read_unlock(env, mdd_obj);
2817         return rc;
2818 }
2819
2820 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2821 {
2822         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2823         struct dt_object *next;
2824
2825         if (mdd_object_exists(mdd_obj) == 0) {
2826                 CERROR("%s: object "DFID" not found: rc = -2\n",
2827                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2828                 return -ENOENT;
2829         }
2830         next = mdd_object_child(mdd_obj);
2831         return next->do_ops->do_object_sync(env, next);
2832 }
2833
2834 const struct md_object_operations mdd_obj_ops = {
2835         .moo_permission    = mdd_permission,
2836         .moo_attr_get      = mdd_attr_get,
2837         .moo_attr_set      = mdd_attr_set,
2838         .moo_xattr_get     = mdd_xattr_get,
2839         .moo_xattr_set     = mdd_xattr_set,
2840         .moo_xattr_list    = mdd_xattr_list,
2841         .moo_xattr_del     = mdd_xattr_del,
2842         .moo_object_create = mdd_object_create,
2843         .moo_ref_add       = mdd_ref_add,
2844         .moo_ref_del       = mdd_ref_del,
2845         .moo_open          = mdd_open,
2846         .moo_close         = mdd_close,
2847         .moo_readpage      = mdd_readpage,
2848         .moo_readlink      = mdd_readlink,
2849         .moo_changelog     = mdd_changelog,
2850         .moo_capa_get      = mdd_capa_get,
2851         .moo_object_sync   = mdd_object_sync,
2852         .moo_path          = mdd_path,
2853         .moo_file_lock     = mdd_file_lock,
2854         .moo_file_unlock   = mdd_file_unlock,
2855 };