Whamcloud - gitweb
LU-1304 mdd: use slab for mdd objects
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126         buf->lb_len = 0;
127 }
128
129 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
130                                        const void *area, ssize_t len)
131 {
132         struct lu_buf *buf;
133
134         buf = &mdd_env_info(env)->mti_buf;
135         buf->lb_buf = (void *)area;
136         buf->lb_len = len;
137         return buf;
138 }
139
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146                 buf->lb_buf = NULL;
147         }
148         if (buf->lb_buf == NULL) {
149                 buf->lb_len = len;
150                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
151                 if (buf->lb_buf == NULL)
152                         buf->lb_len = 0;
153         }
154         return buf;
155 }
156
157 /** Increase the size of the \a mti_big_buf.
158  * preserves old data in buffer
159  * old buffer remains unchanged on error
160  * \retval 0 or -ENOMEM
161  */
162 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 {
164         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
165         struct lu_buf buf;
166
167         LASSERT(len >= oldbuf->lb_len);
168         OBD_ALLOC_LARGE(buf.lb_buf, len);
169
170         if (buf.lb_buf == NULL)
171                 return -ENOMEM;
172
173         buf.lb_len = len;
174         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175
176         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177
178         memcpy(oldbuf, &buf, sizeof(buf));
179
180         return 0;
181 }
182
183 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
184                                        struct mdd_device *mdd)
185 {
186         struct mdd_thread_info *mti = mdd_env_info(env);
187         int                     max_cookie_size;
188
189         max_cookie_size = mdd_lov_cookiesize(env, mdd);
190         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
191                 if (mti->mti_max_cookie)
192                         OBD_FREE_LARGE(mti->mti_max_cookie,
193                                        mti->mti_max_cookie_size);
194                 mti->mti_max_cookie = NULL;
195                 mti->mti_max_cookie_size = 0;
196         }
197         if (unlikely(mti->mti_max_cookie == NULL)) {
198                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
199                 if (likely(mti->mti_max_cookie != NULL))
200                         mti->mti_max_cookie_size = max_cookie_size;
201         }
202         if (likely(mti->mti_max_cookie != NULL))
203                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
204         return mti->mti_max_cookie;
205 }
206
207 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210
211         if (unlikely(mti->mti_max_lmm_size < size)) {
212                 int rsize = size_roundup_power2(size);
213
214                 if (mti->mti_max_lmm_size > 0) {
215                         LASSERT(mti->mti_max_lmm);
216                         OBD_FREE_LARGE(mti->mti_max_lmm,
217                                        mti->mti_max_lmm_size);
218                         mti->mti_max_lmm = NULL;
219                         mti->mti_max_lmm_size = 0;
220                 }
221
222                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
223                 if (likely(mti->mti_max_lmm != NULL))
224                         mti->mti_max_lmm_size = rsize;
225         }
226         return mti->mti_max_lmm;
227 }
228
229 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
230                                    struct mdd_device *mdd)
231 {
232         int max_lmm_size;
233
234         max_lmm_size = mdd_lov_mdsize(env, mdd);
235         return mdd_max_lmm_buffer(env, max_lmm_size);
236 }
237
238 struct lu_object *mdd_object_alloc(const struct lu_env *env,
239                                    const struct lu_object_header *hdr,
240                                    struct lu_device *d)
241 {
242         struct mdd_object *mdd_obj;
243
244         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
245         if (mdd_obj != NULL) {
246                 struct lu_object *o;
247
248                 o = mdd2lu_obj(mdd_obj);
249                 lu_object_init(o, NULL, d);
250                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
251                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
252                 mdd_obj->mod_count = 0;
253                 o->lo_ops = &mdd_lu_obj_ops;
254                 return o;
255         } else {
256                 return NULL;
257         }
258 }
259
260 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
261                            const struct lu_object_conf *unused)
262 {
263         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
264         struct mdd_object *mdd_obj = lu2mdd_obj(o);
265         struct lu_object  *below;
266         struct lu_device  *under;
267         ENTRY;
268
269         mdd_obj->mod_cltime = 0;
270         under = &d->mdd_child->dd_lu_dev;
271         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
272         mdd_pdlock_init(mdd_obj);
273         if (below == NULL)
274                 RETURN(-ENOMEM);
275
276         lu_object_add(o, below);
277
278         RETURN(0);
279 }
280
281 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
282 {
283         if (lu_object_exists(o))
284                 return mdd_get_flags(env, lu2mdd_obj(o));
285         else
286                 return 0;
287 }
288
289 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
290 {
291         struct mdd_object *mdd = lu2mdd_obj(o);
292
293         lu_object_fini(o);
294         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
295 }
296
297 static int mdd_object_print(const struct lu_env *env, void *cookie,
298                             lu_printer_t p, const struct lu_object *o)
299 {
300         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
301         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
302                     "valid=%x, cltime="LPU64", flags=%lx)",
303                     mdd, mdd->mod_count, mdd->mod_valid,
304                     mdd->mod_cltime, mdd->mod_flags);
305 }
306
307 static const struct lu_object_operations mdd_lu_obj_ops = {
308         .loo_object_init    = mdd_object_init,
309         .loo_object_start   = mdd_object_start,
310         .loo_object_free    = mdd_object_free,
311         .loo_object_print   = mdd_object_print,
312 };
313
314 struct mdd_object *mdd_object_find(const struct lu_env *env,
315                                    struct mdd_device *d,
316                                    const struct lu_fid *f)
317 {
318         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
319 }
320
321 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
322                         const char *path, struct lu_fid *fid)
323 {
324         struct lu_buf *buf;
325         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
326         struct mdd_object *obj;
327         struct lu_name *lname = &mdd_env_info(env)->mti_name;
328         char *name;
329         int rc = 0;
330         ENTRY;
331
332         /* temp buffer for path element */
333         buf = mdd_buf_alloc(env, PATH_MAX);
334         if (buf->lb_buf == NULL)
335                 RETURN(-ENOMEM);
336
337         lname->ln_name = name = buf->lb_buf;
338         lname->ln_namelen = 0;
339         *f = mdd->mdd_root_fid;
340
341         while(1) {
342                 while (*path == '/')
343                         path++;
344                 if (*path == '\0')
345                         break;
346                 while (*path != '/' && *path != '\0') {
347                         *name = *path;
348                         path++;
349                         name++;
350                         lname->ln_namelen++;
351                 }
352
353                 *name = '\0';
354                 /* find obj corresponding to fid */
355                 obj = mdd_object_find(env, mdd, f);
356                 if (obj == NULL)
357                         GOTO(out, rc = -EREMOTE);
358                 if (IS_ERR(obj))
359                         GOTO(out, rc = PTR_ERR(obj));
360                 /* get child fid from parent and name */
361                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
362                 mdd_object_put(env, obj);
363                 if (rc)
364                         break;
365
366                 name = buf->lb_buf;
367                 lname->ln_namelen = 0;
368         }
369
370         if (!rc)
371                 *fid = *f;
372 out:
373         RETURN(rc);
374 }
375
376 /** The maximum depth that fid2path() will search.
377  * This is limited only because we want to store the fids for
378  * historical path lookup purposes.
379  */
380 #define MAX_PATH_DEPTH 100
381
382 /** mdd_path() lookup structure. */
383 struct path_lookup_info {
384         __u64                pli_recno;        /**< history point */
385         __u64                pli_currec;       /**< current record */
386         struct lu_fid        pli_fid;
387         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
388         struct mdd_object   *pli_mdd_obj;
389         char                *pli_path;         /**< full path */
390         int                  pli_pathlen;
391         int                  pli_linkno;       /**< which hardlink to follow */
392         int                  pli_fidcount;     /**< number of \a pli_fids */
393 };
394
395 static int mdd_path_current(const struct lu_env *env,
396                             struct path_lookup_info *pli)
397 {
398         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
399         struct mdd_object *mdd_obj;
400         struct lu_buf     *buf = NULL;
401         struct link_ea_header *leh;
402         struct link_ea_entry  *lee;
403         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
404         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
405         char *ptr;
406         int reclen;
407         int rc;
408         ENTRY;
409
410         ptr = pli->pli_path + pli->pli_pathlen - 1;
411         *ptr = 0;
412         --ptr;
413         pli->pli_fidcount = 0;
414         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
415
416         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
417                 mdd_obj = mdd_object_find(env, mdd,
418                                           &pli->pli_fids[pli->pli_fidcount]);
419                 if (mdd_obj == NULL)
420                         GOTO(out, rc = -EREMOTE);
421                 if (IS_ERR(mdd_obj))
422                         GOTO(out, rc = PTR_ERR(mdd_obj));
423                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
424                 if (rc <= 0) {
425                         mdd_object_put(env, mdd_obj);
426                         if (rc == -1)
427                                 rc = -EREMOTE;
428                         else if (rc == 0)
429                                 /* Do I need to error out here? */
430                                 rc = -ENOENT;
431                         GOTO(out, rc);
432                 }
433
434                 /* Get parent fid and object name */
435                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
436                 buf = mdd_links_get(env, mdd_obj);
437                 mdd_read_unlock(env, mdd_obj);
438                 mdd_object_put(env, mdd_obj);
439                 if (IS_ERR(buf))
440                         GOTO(out, rc = PTR_ERR(buf));
441
442                 leh = buf->lb_buf;
443                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
444                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445
446                 /* If set, use link #linkno for path lookup, otherwise use
447                    link #0.  Only do this for the final path element. */
448                 if ((pli->pli_fidcount == 0) &&
449                     (pli->pli_linkno < leh->leh_reccount)) {
450                         int count;
451                         for (count = 0; count < pli->pli_linkno; count++) {
452                                 lee = (struct link_ea_entry *)
453                                      ((char *)lee + reclen);
454                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
455                         }
456                         if (pli->pli_linkno < leh->leh_reccount - 1)
457                                 /* indicate to user there are more links */
458                                 pli->pli_linkno++;
459                 }
460
461                 /* Pack the name in the end of the buffer */
462                 ptr -= tmpname->ln_namelen;
463                 if (ptr - 1 <= pli->pli_path)
464                         GOTO(out, rc = -EOVERFLOW);
465                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466                 *(--ptr) = '/';
467
468                 /* Store the parent fid for historic lookup */
469                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
470                         GOTO(out, rc = -EOVERFLOW);
471                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472         }
473
474         /* Verify that our path hasn't changed since we started the lookup.
475            Record the current index, and verify the path resolves to the
476            same fid. If it does, then the path is correct as of this index. */
477         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
478         pli->pli_currec = mdd->mdd_cl.mc_index;
479         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
480         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
481         if (rc) {
482                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
483                 GOTO (out, rc = -EAGAIN);
484         }
485         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
486                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
487                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
488                        PFID(&pli->pli_fid));
489                 GOTO(out, rc = -EAGAIN);
490         }
491         ptr++; /* skip leading / */
492         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
493
494         EXIT;
495 out:
496         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
497                 /* if we vmalloced a large buffer drop it */
498                 mdd_buf_put(buf);
499
500         return rc;
501 }
502
503 static int mdd_path_historic(const struct lu_env *env,
504                              struct path_lookup_info *pli)
505 {
506         return 0;
507 }
508
509 /* Returns the full path to this fid, as of changelog record recno. */
510 static int mdd_path(const struct lu_env *env, struct md_object *obj,
511                     char *path, int pathlen, __u64 *recno, int *linkno)
512 {
513         struct path_lookup_info *pli;
514         int tries = 3;
515         int rc = -EAGAIN;
516         ENTRY;
517
518         if (pathlen < 3)
519                 RETURN(-EOVERFLOW);
520
521         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
522                 path[0] = '\0';
523                 RETURN(0);
524         }
525
526         OBD_ALLOC_PTR(pli);
527         if (pli == NULL)
528                 RETURN(-ENOMEM);
529
530         pli->pli_mdd_obj = md2mdd_obj(obj);
531         pli->pli_recno = *recno;
532         pli->pli_path = path;
533         pli->pli_pathlen = pathlen;
534         pli->pli_linkno = *linkno;
535
536         /* Retry multiple times in case file is being moved */
537         while (tries-- && rc == -EAGAIN)
538                 rc = mdd_path_current(env, pli);
539
540         /* For historical path lookup, the current links may not have existed
541          * at "recno" time.  We must switch over to earlier links/parents
542          * by using the changelog records.  If the earlier parent doesn't
543          * exist, we must search back through the changelog to reconstruct
544          * its parents, then check if it exists, etc.
545          * We may ignore this problem for the initial implementation and
546          * state that an "original" hardlink must still exist for us to find
547          * historic path name. */
548         if (pli->pli_recno != -1) {
549                 rc = mdd_path_historic(env, pli);
550         } else {
551                 *recno = pli->pli_currec;
552                 /* Return next link index to caller */
553                 *linkno = pli->pli_linkno;
554         }
555
556         OBD_FREE_PTR(pli);
557
558         RETURN (rc);
559 }
560
561 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
562 {
563         struct lu_attr *la = &mdd_env_info(env)->mti_la;
564         int rc;
565
566         ENTRY;
567         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
568         if (rc == 0) {
569                 mdd_flags_xlate(obj, la->la_flags);
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
623                     struct md_attr *ma)
624 {
625         struct mdd_thread_info *info = mdd_env_info(env);
626         int                     size;
627         int                     rc   = -EINVAL;
628         ENTRY;
629
630         LASSERT(info != NULL);
631         LASSERT(ma->ma_big_lmm_used == 0);
632
633         if (ma->ma_lmm_size == 0) {
634                 CERROR("No buffer to hold %s xattr of object "DFID"\n",
635                        XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
636                 RETURN(rc);
637         }
638
639         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
640                            mdd_object_capa(env, obj));
641         if (rc < 0)
642                 RETURN(rc);
643
644         /* big_lmm may need to grow */
645         size = rc;
646         mdd_max_lmm_buffer(env, size);
647         if (info->mti_max_lmm == NULL)
648                 RETURN(-ENOMEM);
649
650         LASSERT(info->mti_max_lmm_size >= size);
651         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
652                         XATTR_NAME_LOV);
653         if (rc < 0)
654                 RETURN(rc);
655
656         ma->ma_big_lmm_used = 1;
657         ma->ma_valid |= MA_LOV;
658         ma->ma_lmm = info->mti_max_lmm;
659         ma->ma_lmm_size = size;
660         LASSERT(size == rc);
661         RETURN(rc);
662 }
663
664 /* get lov EA only */
665 static int __mdd_lmm_get(const struct lu_env *env,
666                          struct mdd_object *mdd_obj, struct md_attr *ma)
667 {
668         int rc;
669         ENTRY;
670
671         if (ma->ma_valid & MA_LOV)
672                 RETURN(0);
673
674         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
675                         XATTR_NAME_LOV);
676         if (rc == -ERANGE)
677                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
678         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
679                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
680
681         if (rc > 0) {
682                 ma->ma_lmm_size = rc;
683                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
684                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
685                 rc = 0;
686         }
687         RETURN(rc);
688 }
689
690 /* get the first parent fid from link EA */
691 static int mdd_pfid_get(const struct lu_env *env,
692                         struct mdd_object *mdd_obj, struct md_attr *ma)
693 {
694         struct lu_buf *buf;
695         struct link_ea_header *leh;
696         struct link_ea_entry *lee;
697         struct lu_fid *pfid = &ma->ma_pfid;
698         ENTRY;
699
700         if (ma->ma_valid & MA_PFID)
701                 RETURN(0);
702
703         buf = mdd_links_get(env, mdd_obj);
704         if (IS_ERR(buf))
705                 RETURN(PTR_ERR(buf));
706
707         leh = buf->lb_buf;
708         lee = (struct link_ea_entry *)(leh + 1);
709         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
710         fid_be_to_cpu(pfid, pfid);
711         ma->ma_valid |= MA_PFID;
712         if (buf->lb_len > OBD_ALLOC_BIG)
713                 /* if we vmalloced a large buffer drop it */
714                 mdd_buf_put(buf);
715         RETURN(0);
716 }
717
718 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
719                        struct md_attr *ma)
720 {
721         int rc;
722         ENTRY;
723
724         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
725         rc = __mdd_lmm_get(env, mdd_obj, ma);
726         mdd_read_unlock(env, mdd_obj);
727         RETURN(rc);
728 }
729
730 /* get lmv EA only*/
731 static int __mdd_lmv_get(const struct lu_env *env,
732                          struct mdd_object *mdd_obj, struct md_attr *ma)
733 {
734         int rc;
735         ENTRY;
736
737         if (ma->ma_valid & MA_LMV)
738                 RETURN(0);
739
740         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
741                         XATTR_NAME_LMV);
742         if (rc > 0) {
743                 ma->ma_valid |= MA_LMV;
744                 rc = 0;
745         }
746         RETURN(rc);
747 }
748
749 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
750                          struct md_attr *ma)
751 {
752         struct mdd_thread_info *info = mdd_env_info(env);
753         struct lustre_mdt_attrs *lma =
754                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
755         int lma_size;
756         int rc;
757         ENTRY;
758
759         /* If all needed data are already valid, nothing to do */
760         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
761             (ma->ma_need & (MA_HSM | MA_SOM)))
762                 RETURN(0);
763
764         /* Read LMA from disk EA */
765         lma_size = sizeof(info->mti_xattr_buf);
766         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
767         if (rc <= 0)
768                 RETURN(rc);
769
770         /* Useless to check LMA incompatibility because this is already done in
771          * osd_ea_fid_get(), and this will fail long before this code is
772          * called.
773          * So, if we are here, LMA is compatible.
774          */
775
776         lustre_lma_swab(lma);
777
778         /* Swab and copy LMA */
779         if (ma->ma_need & MA_HSM) {
780                 if (lma->lma_compat & LMAC_HSM)
781                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
782                 else
783                         ma->ma_hsm.mh_flags = 0;
784                 ma->ma_valid |= MA_HSM;
785         }
786
787         /* Copy SOM */
788         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
789                 LASSERT(ma->ma_som != NULL);
790                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
791                 ma->ma_som->msd_size    = lma->lma_som_size;
792                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
793                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
794                 ma->ma_valid |= MA_SOM;
795         }
796
797         RETURN(0);
798 }
799
800 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
801                                  struct md_attr *ma)
802 {
803         int rc = 0;
804         ENTRY;
805
806         if (ma->ma_need & MA_INODE)
807                 rc = mdd_iattr_get(env, mdd_obj, ma);
808
809         if (rc == 0 && ma->ma_need & MA_LOV) {
810                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
811                     S_ISDIR(mdd_object_type(mdd_obj)))
812                         rc = __mdd_lmm_get(env, mdd_obj, ma);
813         }
814         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
815                 if (S_ISREG(mdd_object_type(mdd_obj)))
816                         rc = mdd_pfid_get(env, mdd_obj, ma);
817         }
818         if (rc == 0 && ma->ma_need & MA_LMV) {
819                 if (S_ISDIR(mdd_object_type(mdd_obj)))
820                         rc = __mdd_lmv_get(env, mdd_obj, ma);
821         }
822         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
823                 if (S_ISREG(mdd_object_type(mdd_obj)))
824                         rc = __mdd_lma_get(env, mdd_obj, ma);
825         }
826 #ifdef CONFIG_FS_POSIX_ACL
827         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
828                 if (S_ISDIR(mdd_object_type(mdd_obj)))
829                         rc = mdd_def_acl_get(env, mdd_obj, ma);
830         }
831 #endif
832         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
833                rc, ma->ma_valid, ma->ma_lmm);
834         RETURN(rc);
835 }
836
837 int mdd_attr_get_internal_locked(const struct lu_env *env,
838                                  struct mdd_object *mdd_obj, struct md_attr *ma)
839 {
840         int rc;
841         int needlock = ma->ma_need &
842                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
843
844         if (needlock)
845                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
846         rc = mdd_attr_get_internal(env, mdd_obj, ma);
847         if (needlock)
848                 mdd_read_unlock(env, mdd_obj);
849         return rc;
850 }
851
852 /*
853  * No permission check is needed.
854  */
855 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
856                  struct md_attr *ma)
857 {
858         struct mdd_object *mdd_obj = md2mdd_obj(obj);
859         int                rc;
860
861         ENTRY;
862         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
863         RETURN(rc);
864 }
865
866 /*
867  * No permission check is needed.
868  */
869 static int mdd_xattr_get(const struct lu_env *env,
870                          struct md_object *obj, struct lu_buf *buf,
871                          const char *name)
872 {
873         struct mdd_object *mdd_obj = md2mdd_obj(obj);
874         int rc;
875
876         ENTRY;
877
878         if (mdd_object_exists(mdd_obj) == 0) {
879                 CERROR("%s: object "DFID" not found: rc = -2\n",
880                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
881                 return -ENOENT;
882         }
883
884         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
885         rc = mdo_xattr_get(env, mdd_obj, buf, name,
886                            mdd_object_capa(env, mdd_obj));
887         mdd_read_unlock(env, mdd_obj);
888
889         RETURN(rc);
890 }
891
892 /*
893  * Permission check is done when open,
894  * no need check again.
895  */
896 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
897                         struct lu_buf *buf)
898 {
899         struct mdd_object *mdd_obj = md2mdd_obj(obj);
900         struct dt_object  *next;
901         loff_t             pos = 0;
902         int                rc;
903         ENTRY;
904
905         if (mdd_object_exists(mdd_obj) == 0) {
906                 CERROR("%s: object "DFID" not found: rc = -2\n",
907                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
908                 return -ENOENT;
909         }
910
911         next = mdd_object_child(mdd_obj);
912         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
913         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
914                                          mdd_object_capa(env, mdd_obj));
915         mdd_read_unlock(env, mdd_obj);
916         RETURN(rc);
917 }
918
919 /*
920  * No permission check is needed.
921  */
922 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
923                           struct lu_buf *buf)
924 {
925         struct mdd_object *mdd_obj = md2mdd_obj(obj);
926         int rc;
927
928         ENTRY;
929
930         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
931         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
932         mdd_read_unlock(env, mdd_obj);
933
934         RETURN(rc);
935 }
936
937 int mdd_declare_object_create_internal(const struct lu_env *env,
938                                        struct mdd_object *p,
939                                        struct mdd_object *c,
940                                        struct md_attr *ma,
941                                        struct thandle *handle,
942                                        const struct md_op_spec *spec)
943 {
944         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
945         const struct dt_index_features *feat = spec->sp_feat;
946         int rc;
947         ENTRY;
948
949         if (feat != &dt_directory_features && feat != NULL)
950                 dof->dof_type = DFT_INDEX;
951         else
952                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
953
954         dof->u.dof_idx.di_feat = feat;
955
956         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
957
958         RETURN(rc);
959 }
960
961 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
962                                struct mdd_object *c, struct md_attr *ma,
963                                struct thandle *handle,
964                                const struct md_op_spec *spec)
965 {
966         struct lu_attr *attr = &ma->ma_attr;
967         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
968         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
969         const struct dt_index_features *feat = spec->sp_feat;
970         int rc;
971         ENTRY;
972
973         if (!mdd_object_exists(c)) {
974                 struct dt_object *next = mdd_object_child(c);
975                 LASSERT(next);
976
977                 if (feat != &dt_directory_features && feat != NULL)
978                         dof->dof_type = DFT_INDEX;
979                 else
980                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
981
982                 dof->u.dof_idx.di_feat = feat;
983
984                 /* @hint will be initialized by underlying device. */
985                 next->do_ops->do_ah_init(env, hint,
986                                          p ? mdd_object_child(p) : NULL,
987                                          attr->la_mode & S_IFMT);
988
989                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
990                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
991         } else
992                 rc = -EEXIST;
993
994         RETURN(rc);
995 }
996
997 /**
998  * Make sure the ctime is increased only.
999  */
1000 static inline int mdd_attr_check(const struct lu_env *env,
1001                                  struct mdd_object *obj,
1002                                  struct lu_attr *attr)
1003 {
1004         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1005         int rc;
1006         ENTRY;
1007
1008         if (attr->la_valid & LA_CTIME) {
1009                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1010                 if (rc)
1011                         RETURN(rc);
1012
1013                 if (attr->la_ctime < tmp_la->la_ctime)
1014                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1015                 else if (attr->la_valid == LA_CTIME &&
1016                          attr->la_ctime == tmp_la->la_ctime)
1017                         attr->la_valid &= ~LA_CTIME;
1018         }
1019         RETURN(0);
1020 }
1021
1022 int mdd_attr_set_internal(const struct lu_env *env,
1023                           struct mdd_object *obj,
1024                           struct lu_attr *attr,
1025                           struct thandle *handle,
1026                           int needacl)
1027 {
1028         int rc;
1029         ENTRY;
1030
1031         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1032 #ifdef CONFIG_FS_POSIX_ACL
1033         if (!rc && (attr->la_valid & LA_MODE) && needacl)
1034                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
1035 #endif
1036         RETURN(rc);
1037 }
1038
1039 int mdd_attr_check_set_internal(const struct lu_env *env,
1040                                 struct mdd_object *obj,
1041                                 struct lu_attr *attr,
1042                                 struct thandle *handle,
1043                                 int needacl)
1044 {
1045         int rc;
1046         ENTRY;
1047
1048         rc = mdd_attr_check(env, obj, attr);
1049         if (rc)
1050                 RETURN(rc);
1051
1052         if (attr->la_valid)
1053                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1054         RETURN(rc);
1055 }
1056
1057 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1058                                         struct mdd_object *obj,
1059                                         struct lu_attr *attr,
1060                                         struct thandle *handle,
1061                                         int needacl)
1062 {
1063         int rc;
1064         ENTRY;
1065
1066         needacl = needacl && (attr->la_valid & LA_MODE);
1067         if (needacl)
1068                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1069         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1070         if (needacl)
1071                 mdd_write_unlock(env, obj);
1072         RETURN(rc);
1073 }
1074
1075 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1076                                        struct mdd_object *obj,
1077                                        struct lu_attr *attr,
1078                                        struct thandle *handle,
1079                                        int needacl)
1080 {
1081         int rc;
1082         ENTRY;
1083
1084         needacl = needacl && (attr->la_valid & LA_MODE);
1085         if (needacl)
1086                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1087         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1088         if (needacl)
1089                 mdd_write_unlock(env, obj);
1090         RETURN(rc);
1091 }
1092
1093 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1094                     const struct lu_buf *buf, const char *name,
1095                     int fl, struct thandle *handle)
1096 {
1097         struct lustre_capa *capa = mdd_object_capa(env, obj);
1098         int rc = -EINVAL;
1099         ENTRY;
1100
1101         if (buf->lb_buf && buf->lb_len > 0)
1102                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1103         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1104                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1105
1106         RETURN(rc);
1107 }
1108
1109 /*
1110  * This gives the same functionality as the code between
1111  * sys_chmod and inode_setattr
1112  * chown_common and inode_setattr
1113  * utimes and inode_setattr
1114  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1115  */
1116 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1117                         struct lu_attr *la, const struct md_attr *ma)
1118 {
1119         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1120         struct md_ucred  *uc;
1121         int               rc;
1122         ENTRY;
1123
1124         if (!la->la_valid)
1125                 RETURN(0);
1126
1127         /* Do not permit change file type */
1128         if (la->la_valid & LA_TYPE)
1129                 RETURN(-EPERM);
1130
1131         /* They should not be processed by setattr */
1132         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1133                 RETURN(-EPERM);
1134
1135         /* export destroy does not have ->le_ses, but we may want
1136          * to drop LUSTRE_SOM_FL. */
1137         if (!env->le_ses)
1138                 RETURN(0);
1139
1140         uc = md_ucred(env);
1141
1142         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1143         if (rc)
1144                 RETURN(rc);
1145
1146         if (la->la_valid == LA_CTIME) {
1147                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1148                         /* This is only for set ctime when rename's source is
1149                          * on remote MDS. */
1150                         rc = mdd_may_delete(env, NULL, obj,
1151                                             (struct md_attr *)ma, 1, 0);
1152                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1153                         la->la_valid &= ~LA_CTIME;
1154                 RETURN(rc);
1155         }
1156
1157         if (la->la_valid == LA_ATIME) {
1158                 /* This is atime only set for read atime update on close. */
1159                 if (la->la_atime >= tmp_la->la_atime &&
1160                     la->la_atime < (tmp_la->la_atime +
1161                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1162                         la->la_valid &= ~LA_ATIME;
1163                 RETURN(0);
1164         }
1165
1166         /* Check if flags change. */
1167         if (la->la_valid & LA_FLAGS) {
1168                 unsigned int oldflags = 0;
1169                 unsigned int newflags = la->la_flags &
1170                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1171
1172                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1173                     !mdd_capable(uc, CFS_CAP_FOWNER))
1174                         RETURN(-EPERM);
1175
1176                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1177                  * only be changed by the relevant capability. */
1178                 if (mdd_is_immutable(obj))
1179                         oldflags |= LUSTRE_IMMUTABLE_FL;
1180                 if (mdd_is_append(obj))
1181                         oldflags |= LUSTRE_APPEND_FL;
1182                 if ((oldflags ^ newflags) &&
1183                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1184                         RETURN(-EPERM);
1185
1186                 if (!S_ISDIR(tmp_la->la_mode))
1187                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1188         }
1189
1190         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1191             (la->la_valid & ~LA_FLAGS) &&
1192             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1193                 RETURN(-EPERM);
1194
1195         /* Check for setting the obj time. */
1196         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1197             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1198                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1199                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1200                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1201                                                             MAY_WRITE,
1202                                                             MOR_TGT_CHILD);
1203                         if (rc)
1204                                 RETURN(rc);
1205                 }
1206         }
1207
1208         if (la->la_valid & LA_KILL_SUID) {
1209                 la->la_valid &= ~LA_KILL_SUID;
1210                 if ((tmp_la->la_mode & S_ISUID) &&
1211                     !(la->la_valid & LA_MODE)) {
1212                         la->la_mode = tmp_la->la_mode;
1213                         la->la_valid |= LA_MODE;
1214                 }
1215                 la->la_mode &= ~S_ISUID;
1216         }
1217
1218         if (la->la_valid & LA_KILL_SGID) {
1219                 la->la_valid &= ~LA_KILL_SGID;
1220                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1221                                         (S_ISGID | S_IXGRP)) &&
1222                     !(la->la_valid & LA_MODE)) {
1223                         la->la_mode = tmp_la->la_mode;
1224                         la->la_valid |= LA_MODE;
1225                 }
1226                 la->la_mode &= ~S_ISGID;
1227         }
1228
1229         /* Make sure a caller can chmod. */
1230         if (la->la_valid & LA_MODE) {
1231                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1232                     (uc->mu_fsuid != tmp_la->la_uid) &&
1233                     !mdd_capable(uc, CFS_CAP_FOWNER))
1234                         RETURN(-EPERM);
1235
1236                 if (la->la_mode == (cfs_umode_t) -1)
1237                         la->la_mode = tmp_la->la_mode;
1238                 else
1239                         la->la_mode = (la->la_mode & S_IALLUGO) |
1240                                       (tmp_la->la_mode & ~S_IALLUGO);
1241
1242                 /* Also check the setgid bit! */
1243                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1244                                        la->la_gid : tmp_la->la_gid) &&
1245                     !mdd_capable(uc, CFS_CAP_FSETID))
1246                         la->la_mode &= ~S_ISGID;
1247         } else {
1248                la->la_mode = tmp_la->la_mode;
1249         }
1250
1251         /* Make sure a caller can chown. */
1252         if (la->la_valid & LA_UID) {
1253                 if (la->la_uid == (uid_t) -1)
1254                         la->la_uid = tmp_la->la_uid;
1255                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1256                     (la->la_uid != tmp_la->la_uid)) &&
1257                     !mdd_capable(uc, CFS_CAP_CHOWN))
1258                         RETURN(-EPERM);
1259
1260                 /* If the user or group of a non-directory has been
1261                  * changed by a non-root user, remove the setuid bit.
1262                  * 19981026 David C Niemi <niemi@tux.org>
1263                  *
1264                  * Changed this to apply to all users, including root,
1265                  * to avoid some races. This is the behavior we had in
1266                  * 2.0. The check for non-root was definitely wrong
1267                  * for 2.2 anyway, as it should have been using
1268                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1269                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1270                     !S_ISDIR(tmp_la->la_mode)) {
1271                         la->la_mode &= ~S_ISUID;
1272                         la->la_valid |= LA_MODE;
1273                 }
1274         }
1275
1276         /* Make sure caller can chgrp. */
1277         if (la->la_valid & LA_GID) {
1278                 if (la->la_gid == (gid_t) -1)
1279                         la->la_gid = tmp_la->la_gid;
1280                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1281                     ((la->la_gid != tmp_la->la_gid) &&
1282                     !lustre_in_group_p(uc, la->la_gid))) &&
1283                     !mdd_capable(uc, CFS_CAP_CHOWN))
1284                         RETURN(-EPERM);
1285
1286                 /* Likewise, if the user or group of a non-directory
1287                  * has been changed by a non-root user, remove the
1288                  * setgid bit UNLESS there is no group execute bit
1289                  * (this would be a file marked for mandatory
1290                  * locking).  19981026 David C Niemi <niemi@tux.org>
1291                  *
1292                  * Removed the fsuid check (see the comment above) --
1293                  * 19990830 SD. */
1294                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1295                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1296                         la->la_mode &= ~S_ISGID;
1297                         la->la_valid |= LA_MODE;
1298                 }
1299         }
1300
1301         /* For both Size-on-MDS case and truncate case,
1302          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1303          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1304          * For SOM case, it is true, the MAY_WRITE perm has been checked
1305          * when open, no need check again. For truncate case, it is false,
1306          * the MAY_WRITE perm should be checked here. */
1307         if (ma->ma_attr_flags & MDS_SOM) {
1308                 /* For the "Size-on-MDS" setattr update, merge coming
1309                  * attributes with the set in the inode. BUG 10641 */
1310                 if ((la->la_valid & LA_ATIME) &&
1311                     (la->la_atime <= tmp_la->la_atime))
1312                         la->la_valid &= ~LA_ATIME;
1313
1314                 /* OST attributes do not have a priority over MDS attributes,
1315                  * so drop times if ctime is equal. */
1316                 if ((la->la_valid & LA_CTIME) &&
1317                     (la->la_ctime <= tmp_la->la_ctime))
1318                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1319         } else {
1320                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1321                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1322                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1323                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1324                                 rc = mdd_permission_internal_locked(env, obj,
1325                                                             tmp_la, MAY_WRITE,
1326                                                             MOR_TGT_CHILD);
1327                                 if (rc)
1328                                         RETURN(rc);
1329                         }
1330                 }
1331                 if (la->la_valid & LA_CTIME) {
1332                         /* The pure setattr, it has the priority over what is
1333                          * already set, do not drop it if ctime is equal. */
1334                         if (la->la_ctime < tmp_la->la_ctime)
1335                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1336                                                   LA_CTIME);
1337                 }
1338         }
1339
1340         RETURN(0);
1341 }
1342
1343 /** Store a data change changelog record
1344  * If this fails, we must fail the whole transaction; we don't
1345  * want the change to commit without the log entry.
1346  * \param mdd_obj - mdd_object of change
1347  * \param handle - transacion handle
1348  */
1349 static int mdd_changelog_data_store(const struct lu_env     *env,
1350                                     struct mdd_device       *mdd,
1351                                     enum changelog_rec_type type,
1352                                     int                     flags,
1353                                     struct mdd_object       *mdd_obj,
1354                                     struct thandle          *handle)
1355 {
1356         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1357         struct llog_changelog_rec *rec;
1358         struct thandle *th = NULL;
1359         struct lu_buf *buf;
1360         int reclen;
1361         int rc;
1362
1363         /* Not recording */
1364         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1365                 RETURN(0);
1366         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1367                 RETURN(0);
1368
1369         LASSERT(mdd_obj != NULL);
1370         LASSERT(handle != NULL);
1371
1372         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1373             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1374                 /* Don't need multiple updates in this log */
1375                 /* Don't check under lock - no big deal if we get an extra
1376                    entry */
1377                 RETURN(0);
1378         }
1379
1380         reclen = llog_data_len(sizeof(*rec));
1381         buf = mdd_buf_alloc(env, reclen);
1382         if (buf->lb_buf == NULL)
1383                 RETURN(-ENOMEM);
1384         rec = (struct llog_changelog_rec *)buf->lb_buf;
1385
1386         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1387         rec->cr.cr_type = (__u32)type;
1388         rec->cr.cr_tfid = *tfid;
1389         rec->cr.cr_namelen = 0;
1390         mdd_obj->mod_cltime = cfs_time_current_64();
1391
1392         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1393
1394         if (th)
1395                 mdd_trans_stop(env, mdd, rc, th);
1396
1397         if (rc < 0) {
1398                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1399                        rc, type, PFID(tfid));
1400                 return -EFAULT;
1401         }
1402
1403         return 0;
1404 }
1405
1406 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1407                   int flags, struct md_object *obj)
1408 {
1409         struct thandle *handle;
1410         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1411         struct mdd_device *mdd = mdo2mdd(obj);
1412         int rc;
1413         ENTRY;
1414
1415         handle = mdd_trans_create(env, mdd);
1416         if (IS_ERR(handle))
1417                 return(PTR_ERR(handle));
1418
1419         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1420         if (rc)
1421                 GOTO(stop, rc);
1422
1423         rc = mdd_trans_start(env, mdd, handle);
1424         if (rc)
1425                 GOTO(stop, rc);
1426
1427         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1428                                       handle);
1429
1430 stop:
1431         mdd_trans_stop(env, mdd, rc, handle);
1432
1433         RETURN(rc);
1434 }
1435
1436 /**
1437  * Should be called with write lock held.
1438  *
1439  * \see mdd_lma_set_locked().
1440  */
1441 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1442                        const struct md_attr *ma, struct thandle *handle)
1443 {
1444         struct mdd_thread_info *info = mdd_env_info(env);
1445         struct lu_buf *buf;
1446         struct lustre_mdt_attrs *lma =
1447                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1448         int lmasize = sizeof(struct lustre_mdt_attrs);
1449         int rc = 0;
1450
1451         ENTRY;
1452
1453         /* Either HSM or SOM part is not valid, we need to read it before */
1454         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1455                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1456                 if (rc <= 0)
1457                         RETURN(rc);
1458
1459                 lustre_lma_swab(lma);
1460         } else {
1461                 memset(lma, 0, lmasize);
1462         }
1463
1464         /* Copy HSM data */
1465         if (ma->ma_valid & MA_HSM) {
1466                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1467                 lma->lma_compat |= LMAC_HSM;
1468         }
1469
1470         /* Copy SOM data */
1471         if (ma->ma_valid & MA_SOM) {
1472                 LASSERT(ma->ma_som != NULL);
1473                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1474                         lma->lma_compat     &= ~LMAC_SOM;
1475                 } else {
1476                         lma->lma_compat     |= LMAC_SOM;
1477                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1478                         lma->lma_som_size    = ma->ma_som->msd_size;
1479                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1480                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1481                 }
1482         }
1483
1484         /* Copy FID */
1485         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1486
1487         lustre_lma_swab(lma);
1488         buf = mdd_buf_get(env, lma, lmasize);
1489         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1490
1491         RETURN(rc);
1492 }
1493
1494 /**
1495  * Save LMA extended attributes with data from \a ma.
1496  *
1497  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1498  * not, LMA EA will be first read from disk, modified and write back.
1499  *
1500  */
1501 static int mdd_lma_set_locked(const struct lu_env *env,
1502                               struct mdd_object *mdd_obj,
1503                               const struct md_attr *ma, struct thandle *handle)
1504 {
1505         int rc;
1506
1507         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1508         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1509         mdd_write_unlock(env, mdd_obj);
1510         return rc;
1511 }
1512
1513 /* Precedence for choosing record type when multiple
1514  * attributes change: setattr > mtime > ctime > atime
1515  * (ctime changes when mtime does, plus chmod/chown.
1516  * atime and ctime are independent.) */
1517 static int mdd_attr_set_changelog(const struct lu_env *env,
1518                                   struct md_object *obj, struct thandle *handle,
1519                                   __u64 valid)
1520 {
1521         struct mdd_device *mdd = mdo2mdd(obj);
1522         int bits, type = 0;
1523
1524         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1525         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1526         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1527         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1528         bits = bits & mdd->mdd_cl.mc_mask;
1529         if (bits == 0)
1530                 return 0;
1531
1532         /* The record type is the lowest non-masked set bit */
1533         while (bits && ((bits & 1) == 0)) {
1534                 bits = bits >> 1;
1535                 type++;
1536         }
1537
1538         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1539         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1540                                         md2mdd_obj(obj), handle);
1541 }
1542
1543 static int mdd_declare_attr_set(const struct lu_env *env,
1544                                 struct mdd_device *mdd,
1545                                 struct mdd_object *obj,
1546                                 const struct md_attr *ma,
1547                                 struct lov_mds_md *lmm,
1548                                 struct thandle *handle)
1549 {
1550         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1551         int             rc, i;
1552
1553         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1554         if (rc)
1555                 return rc;
1556
1557         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1558         if (rc)
1559                 return rc;
1560
1561         if (ma->ma_valid & MA_LOV) {
1562                 buf->lb_buf = NULL;
1563                 buf->lb_len = ma->ma_lmm_size;
1564                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1565                                            0, handle);
1566                 if (rc)
1567                         return rc;
1568         }
1569
1570         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1571                 buf->lb_buf = NULL;
1572                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1573                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1574                                            0, handle);
1575                 if (rc)
1576                         return rc;
1577         }
1578
1579 #ifdef CONFIG_FS_POSIX_ACL
1580         if (ma->ma_attr.la_valid & LA_MODE) {
1581                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1582                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,XATTR_NAME_ACL_ACCESS,
1583                                    BYPASS_CAPA);
1584                 mdd_read_unlock(env, obj);
1585                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1586                         rc = 0;
1587                 else if (rc < 0)
1588                         return rc;
1589
1590                 if (rc != 0) {
1591                         buf->lb_buf = NULL;
1592                         buf->lb_len = rc;
1593                         rc = mdo_declare_xattr_set(env, obj, buf,
1594                                                    XATTR_NAME_ACL_ACCESS, 0,
1595                                                    handle);
1596                         if (rc)
1597                                 return rc;
1598                 }
1599         }
1600 #endif
1601
1602         /* basically the log is the same as in unlink case */
1603         if (lmm) {
1604                 __u16 stripe;
1605
1606                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1607                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1608                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1609                                mdd->mdd_obd_dev->obd_name,
1610                                le32_to_cpu(lmm->lmm_magic),
1611                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1612                         return -EINVAL;
1613                 }
1614
1615                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1616                 if (stripe == LOV_ALL_STRIPES) {
1617                         struct lov_desc *ldesc;
1618
1619                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1620                         LASSERT(ldesc != NULL);
1621                         stripe = ldesc->ld_tgt_count;
1622                 }
1623
1624                 for (i = 0; i < stripe; i++) {
1625                         rc = mdd_declare_llog_record(env, mdd,
1626                                         sizeof(struct llog_unlink_rec),
1627                                         handle);
1628                         if (rc)
1629                                 return rc;
1630                 }
1631         }
1632
1633         return rc;
1634 }
1635
1636 /* set attr and LOV EA at once, return updated attr */
1637 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1638                  const struct md_attr *ma)
1639 {
1640         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1641         struct mdd_device *mdd = mdo2mdd(obj);
1642         struct thandle *handle;
1643         struct lov_mds_md *lmm = NULL;
1644         struct llog_cookie *logcookies = NULL;
1645         int  rc, lmm_size = 0, cookie_size = 0;
1646         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1647 #ifdef HAVE_QUOTA_SUPPORT
1648         struct obd_device *obd = mdd->mdd_obd_dev;
1649         struct mds_obd *mds = &obd->u.mds;
1650         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1651         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1652         int quota_opc = 0, block_count = 0;
1653         int inode_pending[MAXQUOTAS] = { 0, 0 };
1654         int block_pending[MAXQUOTAS] = { 0, 0 };
1655 #endif
1656         ENTRY;
1657
1658         *la_copy = ma->ma_attr;
1659         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1660         if (rc != 0)
1661                 RETURN(rc);
1662
1663         /* setattr on "close" only change atime, or do nothing */
1664         if (ma->ma_valid == MA_INODE &&
1665             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1666                 RETURN(0);
1667
1668         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1669             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1670                 lmm_size = mdd_lov_mdsize(env, mdd);
1671                 lmm = mdd_max_lmm_get(env, mdd);
1672                 if (lmm == NULL)
1673                         RETURN(-ENOMEM);
1674
1675                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1676                                 XATTR_NAME_LOV);
1677
1678                 if (rc < 0)
1679                         RETURN(rc);
1680         }
1681
1682         handle = mdd_trans_create(env, mdd);
1683         if (IS_ERR(handle))
1684                 RETURN(PTR_ERR(handle));
1685
1686         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1687                                   lmm_size > 0 ? lmm : NULL, handle);
1688         if (rc)
1689                 GOTO(stop, rc);
1690
1691         rc = mdd_trans_start(env, mdd, handle);
1692         if (rc)
1693                 GOTO(stop, rc);
1694
1695         /* permission changes may require sync operation */
1696         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1697                 handle->th_sync |= !!mdd->mdd_sync_permission;
1698
1699         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1700                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1701                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1702
1703 #ifdef HAVE_QUOTA_SUPPORT
1704         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1705                 struct obd_export *exp = md_quota(env)->mq_exp;
1706                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1707
1708                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1709                 if (!rc) {
1710                         quota_opc = FSFILT_OP_SETATTR;
1711                         mdd_quota_wrapper(la_copy, qnids);
1712                         mdd_quota_wrapper(la_tmp, qoids);
1713                         /* get file quota for new owner */
1714                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1715                                         qnids, inode_pending, 1, NULL, 0,
1716                                         NULL, 0);
1717                         block_count = (la_tmp->la_blocks + 7) >> 3;
1718                         if (block_count) {
1719                                 void *data = NULL;
1720                                 mdd_data_get(env, mdd_obj, &data);
1721                                 /* get block quota for new owner */
1722                                 lquota_chkquota(mds_quota_interface_ref, obd,
1723                                                 exp, qnids, block_pending,
1724                                                 block_count, NULL,
1725                                                 LQUOTA_FLAGS_BLK, data, 1);
1726                         }
1727                 }
1728         }
1729 #endif
1730
1731         if (la_copy->la_valid & LA_FLAGS) {
1732                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1733                                                   handle, 1);
1734                 if (rc == 0)
1735                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1736         } else if (la_copy->la_valid) {            /* setattr */
1737                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1738                                                   handle, 1);
1739                 /* journal chown/chgrp in llog, just like unlink */
1740                 if (rc == 0 && lmm_size){
1741                         cookie_size = mdd_lov_cookiesize(env, mdd);
1742                         logcookies = mdd_max_cookie_get(env, mdd);
1743                         if (logcookies == NULL)
1744                                 GOTO(cleanup, rc = -ENOMEM);
1745
1746                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1747                                             logcookies, cookie_size) <= 0)
1748                                 logcookies = NULL;
1749                 }
1750         }
1751
1752         if (rc == 0 && ma->ma_valid & MA_LOV) {
1753                 cfs_umode_t mode;
1754
1755                 mode = mdd_object_type(mdd_obj);
1756                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1757                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1758                         if (rc)
1759                                 GOTO(cleanup, rc);
1760
1761                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1762                                             ma->ma_lmm_size, handle, 1);
1763                 }
1764
1765         }
1766         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1767                 cfs_umode_t mode;
1768
1769                 mode = mdd_object_type(mdd_obj);
1770                 if (S_ISREG(mode))
1771                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1772
1773         }
1774 cleanup:
1775         if (rc == 0)
1776                 rc = mdd_attr_set_changelog(env, obj, handle,
1777                                             ma->ma_attr.la_valid);
1778 stop:
1779         mdd_trans_stop(env, mdd, rc, handle);
1780         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1781                 /*set obd attr, if needed*/
1782                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1783                                            logcookies);
1784         }
1785 #ifdef HAVE_QUOTA_SUPPORT
1786         if (quota_opc) {
1787                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1788                                       inode_pending, 0);
1789                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1790                                       block_pending, 1);
1791                 /* Trigger dqrel/dqacq for original owner and new owner.
1792                  * If failed, the next call for lquota_chkquota will
1793                  * process it. */
1794                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1795                               quota_opc);
1796         }
1797 #endif
1798         RETURN(rc);
1799 }
1800
1801 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1802                       const struct lu_buf *buf, const char *name, int fl,
1803                       struct thandle *handle)
1804 {
1805         int  rc;
1806         ENTRY;
1807
1808         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1809         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1810         mdd_write_unlock(env, obj);
1811
1812         RETURN(rc);
1813 }
1814
1815 static int mdd_xattr_sanity_check(const struct lu_env *env,
1816                                   struct mdd_object *obj)
1817 {
1818         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1819         struct md_ucred *uc     = md_ucred(env);
1820         int rc;
1821         ENTRY;
1822
1823         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1824                 RETURN(-EPERM);
1825
1826         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1827         if (rc)
1828                 RETURN(rc);
1829
1830         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1831             !mdd_capable(uc, CFS_CAP_FOWNER))
1832                 RETURN(-EPERM);
1833
1834         RETURN(rc);
1835 }
1836
1837 static int mdd_declare_xattr_set(const struct lu_env *env,
1838                                  struct mdd_device *mdd,
1839                                  struct mdd_object *obj,
1840                                  const struct lu_buf *buf,
1841                                  const char *name,
1842                                  struct thandle *handle)
1843
1844 {
1845         int rc;
1846
1847         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1848         if (rc)
1849                 return rc;
1850
1851         /* Only record user xattr changes */
1852         if ((strncmp("user.", name, 5) == 0))
1853                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1854
1855         return rc;
1856 }
1857
1858 /**
1859  * The caller should guarantee to update the object ctime
1860  * after xattr_set if needed.
1861  */
1862 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1863                          const struct lu_buf *buf, const char *name,
1864                          int fl)
1865 {
1866         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1867         struct mdd_device *mdd = mdo2mdd(obj);
1868         struct thandle *handle;
1869         int  rc;
1870         ENTRY;
1871
1872         rc = mdd_xattr_sanity_check(env, mdd_obj);
1873         if (rc)
1874                 RETURN(rc);
1875
1876         handle = mdd_trans_create(env, mdd);
1877         if (IS_ERR(handle))
1878                 RETURN(PTR_ERR(handle));
1879
1880         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1881         if (rc)
1882                 GOTO(stop, rc);
1883
1884         rc = mdd_trans_start(env, mdd, handle);
1885         if (rc)
1886                 GOTO(stop, rc);
1887
1888         /* security-replated changes may require sync */
1889         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1890                 handle->th_sync |= !!mdd->mdd_sync_permission;
1891
1892         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1893
1894         /* Only record system & user xattr changes */
1895         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1896                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1897                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1898                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1899                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1900                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1901                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1902                                               handle);
1903
1904 stop:
1905         mdd_trans_stop(env, mdd, rc, handle);
1906
1907         RETURN(rc);
1908 }
1909
1910 static int mdd_declare_xattr_del(const struct lu_env *env,
1911                                  struct mdd_device *mdd,
1912                                  struct mdd_object *obj,
1913                                  const char *name,
1914                                  struct thandle *handle)
1915 {
1916         int rc;
1917
1918         rc = mdo_declare_xattr_del(env, obj, name, handle);
1919         if (rc)
1920                 return rc;
1921
1922         /* Only record user xattr changes */
1923         if ((strncmp("user.", name, 5) == 0))
1924                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1925
1926         return rc;
1927 }
1928
1929 /**
1930  * The caller should guarantee to update the object ctime
1931  * after xattr_set if needed.
1932  */
1933 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1934                   const char *name)
1935 {
1936         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1937         struct mdd_device *mdd = mdo2mdd(obj);
1938         struct thandle *handle;
1939         int  rc;
1940         ENTRY;
1941
1942         rc = mdd_xattr_sanity_check(env, mdd_obj);
1943         if (rc)
1944                 RETURN(rc);
1945
1946         handle = mdd_trans_create(env, mdd);
1947         if (IS_ERR(handle))
1948                 RETURN(PTR_ERR(handle));
1949
1950         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1951         if (rc)
1952                 GOTO(stop, rc);
1953
1954         rc = mdd_trans_start(env, mdd, handle);
1955         if (rc)
1956                 GOTO(stop, rc);
1957
1958         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1959         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1960                            mdd_object_capa(env, mdd_obj));
1961         mdd_write_unlock(env, mdd_obj);
1962
1963         /* Only record system & user xattr changes */
1964         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1965                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1966                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1967                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1968                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1969                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1970                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1971                                               handle);
1972
1973 stop:
1974         mdd_trans_stop(env, mdd, rc, handle);
1975
1976         RETURN(rc);
1977 }
1978
1979 /* partial unlink */
1980 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1981                        struct md_attr *ma)
1982 {
1983         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1984         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1985         struct mdd_device *mdd = mdo2mdd(obj);
1986         struct thandle *handle;
1987 #ifdef HAVE_QUOTA_SUPPORT
1988         struct obd_device *obd = mdd->mdd_obd_dev;
1989         struct mds_obd *mds = &obd->u.mds;
1990         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1991         int quota_opc = 0;
1992 #endif
1993         int rc;
1994         ENTRY;
1995
1996         /* XXX: this code won't be used ever:
1997          * DNE uses slightly different approach */
1998         LBUG();
1999
2000         /*
2001          * Check -ENOENT early here because we need to get object type
2002          * to calculate credits before transaction start
2003          */
2004         if (mdd_object_exists(mdd_obj) == 0) {
2005                 CERROR("%s: object "DFID" not found: rc = -2\n",
2006                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2007                 RETURN(-ENOENT);
2008         }
2009
2010         LASSERT(mdd_object_exists(mdd_obj) > 0);
2011
2012         handle = mdd_trans_create(env, mdd);
2013         if (IS_ERR(handle))
2014                 RETURN(-ENOMEM);
2015
2016         rc = mdd_trans_start(env, mdd, handle);
2017
2018         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2019
2020         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2021         if (rc)
2022                 GOTO(cleanup, rc);
2023
2024         mdo_ref_del(env, mdd_obj, handle);
2025
2026         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2027                 /* unlink dot */
2028                 mdo_ref_del(env, mdd_obj, handle);
2029         }
2030
2031         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2032         la_copy->la_ctime = ma->ma_attr.la_ctime;
2033
2034         la_copy->la_valid = LA_CTIME;
2035         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2036         if (rc)
2037                 GOTO(cleanup, rc);
2038
2039         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2040 #ifdef HAVE_QUOTA_SUPPORT
2041         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2042             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2043                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2044                 mdd_quota_wrapper(&ma->ma_attr, qids);
2045         }
2046 #endif
2047
2048
2049         EXIT;
2050 cleanup:
2051         mdd_write_unlock(env, mdd_obj);
2052         mdd_trans_stop(env, mdd, rc, handle);
2053 #ifdef HAVE_QUOTA_SUPPORT
2054         if (quota_opc)
2055                 /* Trigger dqrel on the owner of child. If failed,
2056                  * the next call for lquota_chkquota will process it */
2057                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2058                               quota_opc);
2059 #endif
2060         return rc;
2061 }
2062
2063 /* partial operation */
2064 static int mdd_oc_sanity_check(const struct lu_env *env,
2065                                struct mdd_object *obj,
2066                                struct md_attr *ma)
2067 {
2068         int rc;
2069         ENTRY;
2070
2071         switch (ma->ma_attr.la_mode & S_IFMT) {
2072         case S_IFREG:
2073         case S_IFDIR:
2074         case S_IFLNK:
2075         case S_IFCHR:
2076         case S_IFBLK:
2077         case S_IFIFO:
2078         case S_IFSOCK:
2079                 rc = 0;
2080                 break;
2081         default:
2082                 rc = -EINVAL;
2083                 break;
2084         }
2085         RETURN(rc);
2086 }
2087
2088 static int mdd_object_create(const struct lu_env *env,
2089                              struct md_object *obj,
2090                              const struct md_op_spec *spec,
2091                              struct md_attr *ma)
2092 {
2093
2094         struct mdd_device *mdd = mdo2mdd(obj);
2095         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2096         const struct lu_fid *pfid = spec->u.sp_pfid;
2097         struct thandle *handle;
2098 #ifdef HAVE_QUOTA_SUPPORT
2099         struct obd_device *obd = mdd->mdd_obd_dev;
2100         struct obd_export *exp = md_quota(env)->mq_exp;
2101         struct mds_obd *mds = &obd->u.mds;
2102         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2103         int quota_opc = 0, block_count = 0;
2104         int inode_pending[MAXQUOTAS] = { 0, 0 };
2105         int block_pending[MAXQUOTAS] = { 0, 0 };
2106 #endif
2107         int rc = 0;
2108         ENTRY;
2109
2110         /* XXX: this code won't be used ever:
2111          * DNE uses slightly different approach */
2112         LBUG();
2113
2114 #ifdef HAVE_QUOTA_SUPPORT
2115         if (mds->mds_quota) {
2116                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2117                 mdd_quota_wrapper(&ma->ma_attr, qids);
2118                 /* get file quota for child */
2119                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2120                                 qids, inode_pending, 1, NULL, 0,
2121                                 NULL, 0);
2122                 switch (ma->ma_attr.la_mode & S_IFMT) {
2123                 case S_IFLNK:
2124                 case S_IFDIR:
2125                         block_count = 2;
2126                         break;
2127                 case S_IFREG:
2128                         block_count = 1;
2129                         break;
2130                 }
2131                 /* get block quota for child */
2132                 if (block_count)
2133                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2134                                         qids, block_pending, block_count,
2135                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2136         }
2137 #endif
2138
2139         handle = mdd_trans_create(env, mdd);
2140         if (IS_ERR(handle))
2141                 GOTO(out_pending, rc = PTR_ERR(handle));
2142
2143         rc = mdd_trans_start(env, mdd, handle);
2144
2145         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2146         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2147         if (rc)
2148                 GOTO(unlock, rc);
2149
2150         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2151         if (rc)
2152                 GOTO(unlock, rc);
2153
2154         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2155                 /* If creating the slave object, set slave EA here. */
2156                 int lmv_size = spec->u.sp_ea.eadatalen;
2157                 struct lmv_stripe_md *lmv;
2158
2159                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2160                 LASSERT(lmv != NULL && lmv_size > 0);
2161
2162                 rc = __mdd_xattr_set(env, mdd_obj,
2163                                      mdd_buf_get_const(env, lmv, lmv_size),
2164                                      XATTR_NAME_LMV, 0, handle);
2165                 if (rc)
2166                         GOTO(unlock, rc);
2167
2168                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2169                                            handle, 0);
2170         } else {
2171 #ifdef CONFIG_FS_POSIX_ACL
2172                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2173                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2174
2175                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2176                         buf->lb_len = spec->u.sp_ea.eadatalen;
2177                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2178                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2179                                                     &ma->ma_attr.la_mode,
2180                                                     handle);
2181                                 if (rc)
2182                                         GOTO(unlock, rc);
2183                                 else
2184                                         ma->ma_attr.la_valid |= LA_MODE;
2185                         }
2186
2187                         pfid = spec->u.sp_ea.fid;
2188                 }
2189 #endif
2190                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2191                                            spec);
2192         }
2193         EXIT;
2194 unlock:
2195         if (rc == 0)
2196                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2197         mdd_write_unlock(env, mdd_obj);
2198
2199         mdd_trans_stop(env, mdd, rc, handle);
2200 out_pending:
2201 #ifdef HAVE_QUOTA_SUPPORT
2202         if (quota_opc) {
2203                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2204                                       inode_pending, 0);
2205                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2206                                       block_pending, 1);
2207                 /* Trigger dqacq on the owner of child. If failed,
2208                  * the next call for lquota_chkquota will process it. */
2209                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2210                               quota_opc);
2211         }
2212 #endif
2213         return rc;
2214 }
2215
2216 /* partial link */
2217 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2218                        const struct md_attr *ma)
2219 {
2220         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2221         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2222         struct mdd_device *mdd = mdo2mdd(obj);
2223         struct thandle *handle;
2224         int rc;
2225         ENTRY;
2226
2227         /* XXX: this code won't be used ever:
2228          * DNE uses slightly different approach */
2229         LBUG();
2230
2231         handle = mdd_trans_create(env, mdd);
2232         if (IS_ERR(handle))
2233                 RETURN(-ENOMEM);
2234
2235         rc = mdd_trans_start(env, mdd, handle);
2236
2237         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2238         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2239         if (rc == 0)
2240                 mdo_ref_add(env, mdd_obj, handle);
2241         mdd_write_unlock(env, mdd_obj);
2242         if (rc == 0) {
2243                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2244                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2245
2246                 la_copy->la_valid = LA_CTIME;
2247                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2248                                                         handle, 0);
2249         }
2250         mdd_trans_stop(env, mdd, 0, handle);
2251
2252         RETURN(rc);
2253 }
2254
2255 /*
2256  * do NOT or the MAY_*'s, you'll get the weakest
2257  */
2258 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2259 {
2260         int res = 0;
2261
2262         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2263          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2264          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2265          * owner can write to a file even if it is marked readonly to hide
2266          * its brokenness. (bug 5781) */
2267         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2268                 struct md_ucred *uc = md_ucred(env);
2269
2270                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2271                     (la->la_uid == uc->mu_fsuid))
2272                         return 0;
2273         }
2274
2275         if (flags & FMODE_READ)
2276                 res |= MAY_READ;
2277         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2278                 res |= MAY_WRITE;
2279         if (flags & MDS_FMODE_EXEC)
2280                 res = MAY_EXEC;
2281         return res;
2282 }
2283
2284 static int mdd_open_sanity_check(const struct lu_env *env,
2285                                  struct mdd_object *obj, int flag)
2286 {
2287         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2288         int mode, rc;
2289         ENTRY;
2290
2291         /* EEXIST check */
2292         if (mdd_is_dead_obj(obj))
2293                 RETURN(-ENOENT);
2294
2295         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2296         if (rc)
2297                RETURN(rc);
2298
2299         if (S_ISLNK(tmp_la->la_mode))
2300                 RETURN(-ELOOP);
2301
2302         mode = accmode(env, tmp_la, flag);
2303
2304         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2305                 RETURN(-EISDIR);
2306
2307         if (!(flag & MDS_OPEN_CREATED)) {
2308                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2309                 if (rc)
2310                         RETURN(rc);
2311         }
2312
2313         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2314             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2315                 flag &= ~MDS_OPEN_TRUNC;
2316
2317         /* For writing append-only file must open it with append mode. */
2318         if (mdd_is_append(obj)) {
2319                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2320                         RETURN(-EPERM);
2321                 if (flag & MDS_OPEN_TRUNC)
2322                         RETURN(-EPERM);
2323         }
2324
2325 #if 0
2326         /*
2327          * Now, flag -- O_NOATIME does not be packed by client.
2328          */
2329         if (flag & O_NOATIME) {
2330                 struct md_ucred *uc = md_ucred(env);
2331
2332                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2333                     (uc->mu_valid == UCRED_NEW)) &&
2334                     (uc->mu_fsuid != tmp_la->la_uid) &&
2335                     !mdd_capable(uc, CFS_CAP_FOWNER))
2336                         RETURN(-EPERM);
2337         }
2338 #endif
2339
2340         RETURN(0);
2341 }
2342
2343 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2344                     int flags)
2345 {
2346         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2347         int rc = 0;
2348
2349         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2350
2351         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2352         if (rc == 0)
2353                 mdd_obj->mod_count++;
2354
2355         mdd_write_unlock(env, mdd_obj);
2356         return rc;
2357 }
2358
2359 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2360                             struct md_attr *ma, struct thandle *handle)
2361 {
2362         int rc;
2363
2364         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2365         if (rc)
2366                 return rc;
2367
2368         return mdo_declare_destroy(env, obj, handle);
2369 }
2370
2371 /* return md_attr back,
2372  * if it is last unlink then return lov ea + llog cookie*/
2373 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2374                     struct md_attr *ma, struct thandle *handle)
2375 {
2376         int rc = 0;
2377         ENTRY;
2378
2379         if (S_ISREG(mdd_object_type(obj))) {
2380                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2381                  * Caller must be ready for that. */
2382                 rc = __mdd_lmm_get(env, obj, ma);
2383                 if ((ma->ma_valid & MA_LOV))
2384                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2385                                             obj, ma);
2386         }
2387
2388         if (rc == 0)
2389                 rc = mdo_destroy(env, obj, handle);
2390
2391         RETURN(rc);
2392 }
2393
2394 static int mdd_declare_close(const struct lu_env *env,
2395                              struct mdd_object *obj,
2396                              struct md_attr *ma,
2397                              struct thandle *handle)
2398 {
2399         int rc;
2400
2401         rc = orph_declare_index_delete(env, obj, handle);
2402         if (rc)
2403                 return rc;
2404
2405         return mdd_declare_object_kill(env, obj, ma, handle);
2406 }
2407
2408 /*
2409  * No permission check is needed.
2410  */
2411 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2412                      struct md_attr *ma, int mode)
2413 {
2414         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2415         struct mdd_device *mdd = mdo2mdd(obj);
2416         struct thandle    *handle = NULL;
2417         int rc;
2418         int is_orphan = 0, reset = 1;
2419
2420 #ifdef HAVE_QUOTA_SUPPORT
2421         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2422         struct mds_obd *mds = &obd->u.mds;
2423         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2424         int quota_opc = 0;
2425 #endif
2426         ENTRY;
2427
2428         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2429                 mdd_obj->mod_count--;
2430
2431                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2432                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2433                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2434                 RETURN(0);
2435         }
2436
2437         /* check without any lock */
2438         if (mdd_obj->mod_count == 1 &&
2439             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2440  again:
2441                 handle = mdd_trans_create(env, mdo2mdd(obj));
2442                 if (IS_ERR(handle))
2443                         RETURN(PTR_ERR(handle));
2444
2445                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2446                 if (rc)
2447                         GOTO(stop, rc);
2448
2449                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2450                 if (rc)
2451                         GOTO(stop, rc);
2452
2453                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2454                 if (rc)
2455                         GOTO(stop, rc);
2456         }
2457
2458         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2459         if (handle == NULL && mdd_obj->mod_count == 1 &&
2460             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2461                 mdd_write_unlock(env, mdd_obj);
2462                 goto again;
2463         }
2464
2465         /* release open count */
2466         mdd_obj->mod_count --;
2467
2468         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2469                 /* remove link to object from orphan index */
2470                 LASSERT(handle != NULL);
2471                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2472                 if (rc == 0) {
2473                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2474                                "list, OSS objects to be destroyed.\n",
2475                                PFID(mdd_object_fid(mdd_obj)));
2476                         is_orphan = 1;
2477                 } else {
2478                         CERROR("Object "DFID" can not be deleted from orphan "
2479                                 "list, maybe cause OST objects can not be "
2480                                 "destroyed (err: %d).\n",
2481                                 PFID(mdd_object_fid(mdd_obj)), rc);
2482                         /* If object was not deleted from orphan list, do not
2483                          * destroy OSS objects, which will be done when next
2484                          * recovery. */
2485                         GOTO(out, rc);
2486                 }
2487         }
2488
2489         rc = mdd_iattr_get(env, mdd_obj, ma);
2490         /* Object maybe not in orphan list originally, it is rare case for
2491          * mdd_finish_unlink() failure. */
2492         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2493 #ifdef HAVE_QUOTA_SUPPORT
2494                 if (mds->mds_quota) {
2495                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2496                         mdd_quota_wrapper(&ma->ma_attr, qids);
2497                 }
2498 #endif
2499                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2500                 if (ma->ma_valid & MA_FLAGS &&
2501                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2502                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2503                 } else {
2504                         if (handle == NULL) {
2505                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2506                                 if (IS_ERR(handle))
2507                                         GOTO(out, rc = PTR_ERR(handle));
2508
2509                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2510                                                              handle);
2511                                 if (rc)
2512                                         GOTO(out, rc);
2513
2514                                 rc = mdd_declare_changelog_store(env, mdd,
2515                                                                  NULL, handle);
2516                                 if (rc)
2517                                         GOTO(stop, rc);
2518
2519                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2520                                 if (rc)
2521                                         GOTO(out, rc);
2522                         }
2523
2524                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2525                         if (rc == 0)
2526                                 reset = 0;
2527                 }
2528
2529                 if (rc != 0)
2530                         CERROR("Error when prepare to delete Object "DFID" , "
2531                                "which will cause OST objects can not be "
2532                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2533         }
2534         EXIT;
2535
2536 out:
2537         if (reset)
2538                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2539
2540         mdd_write_unlock(env, mdd_obj);
2541
2542         if (rc == 0 &&
2543             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2544             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2545                 if (handle == NULL) {
2546                         handle = mdd_trans_create(env, mdo2mdd(obj));
2547                         if (IS_ERR(handle))
2548                                 GOTO(stop, rc = IS_ERR(handle));
2549
2550                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2551                                                          handle);
2552                         if (rc)
2553                                 GOTO(stop, rc);
2554
2555                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2556                         if (rc)
2557                                 GOTO(stop, rc);
2558                 }
2559
2560                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2561                                          mdd_obj, handle);
2562         }
2563
2564 stop:
2565         if (handle != NULL)
2566                 mdd_trans_stop(env, mdd, rc, handle);
2567 #ifdef HAVE_QUOTA_SUPPORT
2568         if (quota_opc)
2569                 /* Trigger dqrel on the owner of child. If failed,
2570                  * the next call for lquota_chkquota will process it */
2571                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2572                               quota_opc);
2573 #endif
2574         return rc;
2575 }
2576
2577 /*
2578  * Permission check is done when open,
2579  * no need check again.
2580  */
2581 static int mdd_readpage_sanity_check(const struct lu_env *env,
2582                                      struct mdd_object *obj)
2583 {
2584         struct dt_object *next = mdd_object_child(obj);
2585         int rc;
2586         ENTRY;
2587
2588         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2589                 rc = 0;
2590         else
2591                 rc = -ENOTDIR;
2592
2593         RETURN(rc);
2594 }
2595
2596 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2597                               struct lu_dirpage *dp, int nob,
2598                               const struct dt_it_ops *iops, struct dt_it *it,
2599                               __u32 attr)
2600 {
2601         void                   *area = dp;
2602         int                     result;
2603         __u64                   hash = 0;
2604         struct lu_dirent       *ent;
2605         struct lu_dirent       *last = NULL;
2606         int                     first = 1;
2607
2608         memset(area, 0, sizeof (*dp));
2609         area += sizeof (*dp);
2610         nob  -= sizeof (*dp);
2611
2612         ent  = area;
2613         do {
2614                 int    len;
2615                 int    recsize;
2616
2617                 len = iops->key_size(env, it);
2618
2619                 /* IAM iterator can return record with zero len. */
2620                 if (len == 0)
2621                         goto next;
2622
2623                 hash = iops->store(env, it);
2624                 if (unlikely(first)) {
2625                         first = 0;
2626                         dp->ldp_hash_start = cpu_to_le64(hash);
2627                 }
2628
2629                 /* calculate max space required for lu_dirent */
2630                 recsize = lu_dirent_calc_size(len, attr);
2631
2632                 if (nob >= recsize) {
2633                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2634                         if (result == -ESTALE)
2635                                 goto next;
2636                         if (result != 0)
2637                                 goto out;
2638
2639                         /* osd might not able to pack all attributes,
2640                          * so recheck rec length */
2641                         recsize = le16_to_cpu(ent->lde_reclen);
2642                 } else {
2643                         result = (last != NULL) ? 0 :-EINVAL;
2644                         goto out;
2645                 }
2646                 last = ent;
2647                 ent = (void *)ent + recsize;
2648                 nob -= recsize;
2649
2650 next:
2651                 result = iops->next(env, it);
2652                 if (result == -ESTALE)
2653                         goto next;
2654         } while (result == 0);
2655
2656 out:
2657         dp->ldp_hash_end = cpu_to_le64(hash);
2658         if (last != NULL) {
2659                 if (last->lde_hash == dp->ldp_hash_end)
2660                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2661                 last->lde_reclen = 0; /* end mark */
2662         }
2663         return result;
2664 }
2665
2666 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2667                           const struct lu_rdpg *rdpg)
2668 {
2669         struct dt_it      *it;
2670         struct dt_object  *next = mdd_object_child(obj);
2671         const struct dt_it_ops  *iops;
2672         struct page       *pg;
2673         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2674         int i;
2675         int nlupgs = 0;
2676         int rc;
2677         int nob;
2678
2679         LASSERT(rdpg->rp_pages != NULL);
2680         LASSERT(next->do_index_ops != NULL);
2681
2682         if (rdpg->rp_count <= 0)
2683                 return -EFAULT;
2684
2685         /*
2686          * iterate through directory and fill pages from @rdpg
2687          */
2688         iops = &next->do_index_ops->dio_it;
2689         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2690         if (IS_ERR(it))
2691                 return PTR_ERR(it);
2692
2693         rc = iops->load(env, it, rdpg->rp_hash);
2694
2695         if (rc == 0) {
2696                 /*
2697                  * Iterator didn't find record with exactly the key requested.
2698                  *
2699                  * It is currently either
2700                  *
2701                  *     - positioned above record with key less than
2702                  *     requested---skip it.
2703                  *
2704                  *     - or not positioned at all (is in IAM_IT_SKEWED
2705                  *     state)---position it on the next item.
2706                  */
2707                 rc = iops->next(env, it);
2708         } else if (rc > 0)
2709                 rc = 0;
2710
2711         /*
2712          * At this point and across for-loop:
2713          *
2714          *  rc == 0 -> ok, proceed.
2715          *  rc >  0 -> end of directory.
2716          *  rc <  0 -> error.
2717          */
2718         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2719              i++, nob -= CFS_PAGE_SIZE) {
2720                 struct lu_dirpage *dp;
2721
2722                 LASSERT(i < rdpg->rp_npages);
2723                 pg = rdpg->rp_pages[i];
2724                 dp = cfs_kmap(pg);
2725 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2726 repeat:
2727 #endif
2728                 rc = mdd_dir_page_build(env, mdd, dp,
2729                                         min_t(int, nob, LU_PAGE_SIZE),
2730                                         iops, it, rdpg->rp_attrs);
2731                 if (rc > 0) {
2732                         /*
2733                          * end of directory.
2734                          */
2735                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2736                         nlupgs++;
2737                 } else if (rc < 0) {
2738                         CWARN("build page failed: %d!\n", rc);
2739                 } else {
2740                         nlupgs++;
2741 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2742                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2743                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2744                                 goto repeat;
2745 #endif
2746                 }
2747                 cfs_kunmap(pg);
2748         }
2749         if (rc >= 0) {
2750                 struct lu_dirpage *dp;
2751
2752                 dp = cfs_kmap(rdpg->rp_pages[0]);
2753                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2754                 if (nlupgs == 0) {
2755                         /*
2756                          * No pages were processed, mark this for first page
2757                          * and send back.
2758                          */
2759                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2760                         nlupgs = 1;
2761                 }
2762                 cfs_kunmap(rdpg->rp_pages[0]);
2763
2764                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2765         }
2766         iops->put(env, it);
2767         iops->fini(env, it);
2768
2769         return rc;
2770 }
2771
2772 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2773                  const struct lu_rdpg *rdpg)
2774 {
2775         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2776         int rc;
2777         ENTRY;
2778
2779         if (mdd_object_exists(mdd_obj) == 0) {
2780                 CERROR("%s: object "DFID" not found: rc = -2\n",
2781                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2782                 return -ENOENT;
2783         }
2784
2785         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2786         rc = mdd_readpage_sanity_check(env, mdd_obj);
2787         if (rc)
2788                 GOTO(out_unlock, rc);
2789
2790         if (mdd_is_dead_obj(mdd_obj)) {
2791                 struct page *pg;
2792                 struct lu_dirpage *dp;
2793
2794                 /*
2795                  * According to POSIX, please do not return any entry to client:
2796                  * even dot and dotdot should not be returned.
2797                  */
2798                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2799                        PFID(mdd_object_fid(mdd_obj)));
2800
2801                 if (rdpg->rp_count <= 0)
2802                         GOTO(out_unlock, rc = -EFAULT);
2803                 LASSERT(rdpg->rp_pages != NULL);
2804
2805                 pg = rdpg->rp_pages[0];
2806                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2807                 memset(dp, 0 , sizeof(struct lu_dirpage));
2808                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2809                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2810                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2811                 cfs_kunmap(pg);
2812                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2813         }
2814
2815         rc = __mdd_readpage(env, mdd_obj, rdpg);
2816
2817         EXIT;
2818 out_unlock:
2819         mdd_read_unlock(env, mdd_obj);
2820         return rc;
2821 }
2822
2823 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2824 {
2825         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2826
2827         if (mdd_object_exists(mdd_obj) == 0) {
2828                 CERROR("%s: object "DFID" not found: rc = -2\n",
2829                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2830                 return -ENOENT;
2831         }
2832         return dt_object_sync(env, mdd_object_child(mdd_obj));
2833 }
2834
2835 const struct md_object_operations mdd_obj_ops = {
2836         .moo_permission    = mdd_permission,
2837         .moo_attr_get      = mdd_attr_get,
2838         .moo_attr_set      = mdd_attr_set,
2839         .moo_xattr_get     = mdd_xattr_get,
2840         .moo_xattr_set     = mdd_xattr_set,
2841         .moo_xattr_list    = mdd_xattr_list,
2842         .moo_xattr_del     = mdd_xattr_del,
2843         .moo_object_create = mdd_object_create,
2844         .moo_ref_add       = mdd_ref_add,
2845         .moo_ref_del       = mdd_ref_del,
2846         .moo_open          = mdd_open,
2847         .moo_close         = mdd_close,
2848         .moo_readpage      = mdd_readpage,
2849         .moo_readlink      = mdd_readlink,
2850         .moo_changelog     = mdd_changelog,
2851         .moo_capa_get      = mdd_capa_get,
2852         .moo_object_sync   = mdd_object_sync,
2853         .moo_path          = mdd_path,
2854         .moo_file_lock     = mdd_file_lock,
2855         .moo_file_unlock   = mdd_file_unlock,
2856 };