Whamcloud - gitweb
LU-1304 mdt: get attributes with explicit calls
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126         buf->lb_len = 0;
127 }
128
129 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
130                                        const void *area, ssize_t len)
131 {
132         struct lu_buf *buf;
133
134         buf = &mdd_env_info(env)->mti_buf;
135         buf->lb_buf = (void *)area;
136         buf->lb_len = len;
137         return buf;
138 }
139
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146                 buf->lb_buf = NULL;
147         }
148         if (buf->lb_buf == NULL) {
149                 buf->lb_len = len;
150                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
151                 if (buf->lb_buf == NULL)
152                         buf->lb_len = 0;
153         }
154         return buf;
155 }
156
157 /** Increase the size of the \a mti_big_buf.
158  * preserves old data in buffer
159  * old buffer remains unchanged on error
160  * \retval 0 or -ENOMEM
161  */
162 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 {
164         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
165         struct lu_buf buf;
166
167         LASSERT(len >= oldbuf->lb_len);
168         OBD_ALLOC_LARGE(buf.lb_buf, len);
169
170         if (buf.lb_buf == NULL)
171                 return -ENOMEM;
172
173         buf.lb_len = len;
174         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175
176         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177
178         memcpy(oldbuf, &buf, sizeof(buf));
179
180         return 0;
181 }
182
183 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
184                                        struct mdd_device *mdd)
185 {
186         struct mdd_thread_info *mti = mdd_env_info(env);
187         int                     max_cookie_size;
188
189         max_cookie_size = mdd_lov_cookiesize(env, mdd);
190         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
191                 if (mti->mti_max_cookie)
192                         OBD_FREE_LARGE(mti->mti_max_cookie,
193                                        mti->mti_max_cookie_size);
194                 mti->mti_max_cookie = NULL;
195                 mti->mti_max_cookie_size = 0;
196         }
197         if (unlikely(mti->mti_max_cookie == NULL)) {
198                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
199                 if (likely(mti->mti_max_cookie != NULL))
200                         mti->mti_max_cookie_size = max_cookie_size;
201         }
202         if (likely(mti->mti_max_cookie != NULL))
203                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
204         return mti->mti_max_cookie;
205 }
206
207 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210
211         if (unlikely(mti->mti_max_lmm_size < size)) {
212                 int rsize = size_roundup_power2(size);
213
214                 if (mti->mti_max_lmm_size > 0) {
215                         LASSERT(mti->mti_max_lmm);
216                         OBD_FREE_LARGE(mti->mti_max_lmm,
217                                        mti->mti_max_lmm_size);
218                         mti->mti_max_lmm = NULL;
219                         mti->mti_max_lmm_size = 0;
220                 }
221
222                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
223                 if (likely(mti->mti_max_lmm != NULL))
224                         mti->mti_max_lmm_size = rsize;
225         }
226         return mti->mti_max_lmm;
227 }
228
229 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
230                                    struct mdd_device *mdd)
231 {
232         int max_lmm_size;
233
234         max_lmm_size = mdd_lov_mdsize(env, mdd);
235         return mdd_max_lmm_buffer(env, max_lmm_size);
236 }
237
238 struct lu_object *mdd_object_alloc(const struct lu_env *env,
239                                    const struct lu_object_header *hdr,
240                                    struct lu_device *d)
241 {
242         struct mdd_object *mdd_obj;
243
244         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
245         if (mdd_obj != NULL) {
246                 struct lu_object *o;
247
248                 o = mdd2lu_obj(mdd_obj);
249                 lu_object_init(o, NULL, d);
250                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
251                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
252                 mdd_obj->mod_count = 0;
253                 o->lo_ops = &mdd_lu_obj_ops;
254                 return o;
255         } else {
256                 return NULL;
257         }
258 }
259
260 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
261                            const struct lu_object_conf *unused)
262 {
263         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
264         struct mdd_object *mdd_obj = lu2mdd_obj(o);
265         struct lu_object  *below;
266         struct lu_device  *under;
267         ENTRY;
268
269         mdd_obj->mod_cltime = 0;
270         under = &d->mdd_child->dd_lu_dev;
271         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
272         mdd_pdlock_init(mdd_obj);
273         if (below == NULL)
274                 RETURN(-ENOMEM);
275
276         lu_object_add(o, below);
277
278         RETURN(0);
279 }
280
281 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
282 {
283         if (lu_object_exists(o))
284                 return mdd_get_flags(env, lu2mdd_obj(o));
285         else
286                 return 0;
287 }
288
289 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
290 {
291         struct mdd_object *mdd = lu2mdd_obj(o);
292
293         lu_object_fini(o);
294         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
295 }
296
297 static int mdd_object_print(const struct lu_env *env, void *cookie,
298                             lu_printer_t p, const struct lu_object *o)
299 {
300         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
301         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
302                     "valid=%x, cltime="LPU64", flags=%lx)",
303                     mdd, mdd->mod_count, mdd->mod_valid,
304                     mdd->mod_cltime, mdd->mod_flags);
305 }
306
307 static const struct lu_object_operations mdd_lu_obj_ops = {
308         .loo_object_init    = mdd_object_init,
309         .loo_object_start   = mdd_object_start,
310         .loo_object_free    = mdd_object_free,
311         .loo_object_print   = mdd_object_print,
312 };
313
314 struct mdd_object *mdd_object_find(const struct lu_env *env,
315                                    struct mdd_device *d,
316                                    const struct lu_fid *f)
317 {
318         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
319 }
320
321 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
322                         const char *path, struct lu_fid *fid)
323 {
324         struct lu_buf *buf;
325         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
326         struct mdd_object *obj;
327         struct lu_name *lname = &mdd_env_info(env)->mti_name;
328         char *name;
329         int rc = 0;
330         ENTRY;
331
332         /* temp buffer for path element */
333         buf = mdd_buf_alloc(env, PATH_MAX);
334         if (buf->lb_buf == NULL)
335                 RETURN(-ENOMEM);
336
337         lname->ln_name = name = buf->lb_buf;
338         lname->ln_namelen = 0;
339         *f = mdd->mdd_root_fid;
340
341         while(1) {
342                 while (*path == '/')
343                         path++;
344                 if (*path == '\0')
345                         break;
346                 while (*path != '/' && *path != '\0') {
347                         *name = *path;
348                         path++;
349                         name++;
350                         lname->ln_namelen++;
351                 }
352
353                 *name = '\0';
354                 /* find obj corresponding to fid */
355                 obj = mdd_object_find(env, mdd, f);
356                 if (obj == NULL)
357                         GOTO(out, rc = -EREMOTE);
358                 if (IS_ERR(obj))
359                         GOTO(out, rc = PTR_ERR(obj));
360                 /* get child fid from parent and name */
361                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
362                 mdd_object_put(env, obj);
363                 if (rc)
364                         break;
365
366                 name = buf->lb_buf;
367                 lname->ln_namelen = 0;
368         }
369
370         if (!rc)
371                 *fid = *f;
372 out:
373         RETURN(rc);
374 }
375
376 /** The maximum depth that fid2path() will search.
377  * This is limited only because we want to store the fids for
378  * historical path lookup purposes.
379  */
380 #define MAX_PATH_DEPTH 100
381
382 /** mdd_path() lookup structure. */
383 struct path_lookup_info {
384         __u64                pli_recno;        /**< history point */
385         __u64                pli_currec;       /**< current record */
386         struct lu_fid        pli_fid;
387         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
388         struct mdd_object   *pli_mdd_obj;
389         char                *pli_path;         /**< full path */
390         int                  pli_pathlen;
391         int                  pli_linkno;       /**< which hardlink to follow */
392         int                  pli_fidcount;     /**< number of \a pli_fids */
393 };
394
395 static int mdd_path_current(const struct lu_env *env,
396                             struct path_lookup_info *pli)
397 {
398         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
399         struct mdd_object *mdd_obj;
400         struct lu_buf     *buf = NULL;
401         struct link_ea_header *leh;
402         struct link_ea_entry  *lee;
403         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
404         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
405         char *ptr;
406         int reclen;
407         int rc;
408         ENTRY;
409
410         ptr = pli->pli_path + pli->pli_pathlen - 1;
411         *ptr = 0;
412         --ptr;
413         pli->pli_fidcount = 0;
414         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
415
416         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
417                 mdd_obj = mdd_object_find(env, mdd,
418                                           &pli->pli_fids[pli->pli_fidcount]);
419                 if (mdd_obj == NULL)
420                         GOTO(out, rc = -EREMOTE);
421                 if (IS_ERR(mdd_obj))
422                         GOTO(out, rc = PTR_ERR(mdd_obj));
423                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
424                 if (rc <= 0) {
425                         mdd_object_put(env, mdd_obj);
426                         if (rc == -1)
427                                 rc = -EREMOTE;
428                         else if (rc == 0)
429                                 /* Do I need to error out here? */
430                                 rc = -ENOENT;
431                         GOTO(out, rc);
432                 }
433
434                 /* Get parent fid and object name */
435                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
436                 buf = mdd_links_get(env, mdd_obj);
437                 mdd_read_unlock(env, mdd_obj);
438                 mdd_object_put(env, mdd_obj);
439                 if (IS_ERR(buf))
440                         GOTO(out, rc = PTR_ERR(buf));
441
442                 leh = buf->lb_buf;
443                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
444                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445
446                 /* If set, use link #linkno for path lookup, otherwise use
447                    link #0.  Only do this for the final path element. */
448                 if ((pli->pli_fidcount == 0) &&
449                     (pli->pli_linkno < leh->leh_reccount)) {
450                         int count;
451                         for (count = 0; count < pli->pli_linkno; count++) {
452                                 lee = (struct link_ea_entry *)
453                                      ((char *)lee + reclen);
454                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
455                         }
456                         if (pli->pli_linkno < leh->leh_reccount - 1)
457                                 /* indicate to user there are more links */
458                                 pli->pli_linkno++;
459                 }
460
461                 /* Pack the name in the end of the buffer */
462                 ptr -= tmpname->ln_namelen;
463                 if (ptr - 1 <= pli->pli_path)
464                         GOTO(out, rc = -EOVERFLOW);
465                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466                 *(--ptr) = '/';
467
468                 /* Store the parent fid for historic lookup */
469                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
470                         GOTO(out, rc = -EOVERFLOW);
471                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472         }
473
474         /* Verify that our path hasn't changed since we started the lookup.
475            Record the current index, and verify the path resolves to the
476            same fid. If it does, then the path is correct as of this index. */
477         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
478         pli->pli_currec = mdd->mdd_cl.mc_index;
479         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
480         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
481         if (rc) {
482                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
483                 GOTO (out, rc = -EAGAIN);
484         }
485         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
486                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
487                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
488                        PFID(&pli->pli_fid));
489                 GOTO(out, rc = -EAGAIN);
490         }
491         ptr++; /* skip leading / */
492         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
493
494         EXIT;
495 out:
496         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
497                 /* if we vmalloced a large buffer drop it */
498                 mdd_buf_put(buf);
499
500         return rc;
501 }
502
503 static int mdd_path_historic(const struct lu_env *env,
504                              struct path_lookup_info *pli)
505 {
506         return 0;
507 }
508
509 /* Returns the full path to this fid, as of changelog record recno. */
510 static int mdd_path(const struct lu_env *env, struct md_object *obj,
511                     char *path, int pathlen, __u64 *recno, int *linkno)
512 {
513         struct path_lookup_info *pli;
514         int tries = 3;
515         int rc = -EAGAIN;
516         ENTRY;
517
518         if (pathlen < 3)
519                 RETURN(-EOVERFLOW);
520
521         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
522                 path[0] = '\0';
523                 RETURN(0);
524         }
525
526         OBD_ALLOC_PTR(pli);
527         if (pli == NULL)
528                 RETURN(-ENOMEM);
529
530         pli->pli_mdd_obj = md2mdd_obj(obj);
531         pli->pli_recno = *recno;
532         pli->pli_path = path;
533         pli->pli_pathlen = pathlen;
534         pli->pli_linkno = *linkno;
535
536         /* Retry multiple times in case file is being moved */
537         while (tries-- && rc == -EAGAIN)
538                 rc = mdd_path_current(env, pli);
539
540         /* For historical path lookup, the current links may not have existed
541          * at "recno" time.  We must switch over to earlier links/parents
542          * by using the changelog records.  If the earlier parent doesn't
543          * exist, we must search back through the changelog to reconstruct
544          * its parents, then check if it exists, etc.
545          * We may ignore this problem for the initial implementation and
546          * state that an "original" hardlink must still exist for us to find
547          * historic path name. */
548         if (pli->pli_recno != -1) {
549                 rc = mdd_path_historic(env, pli);
550         } else {
551                 *recno = pli->pli_currec;
552                 /* Return next link index to caller */
553                 *linkno = pli->pli_linkno;
554         }
555
556         OBD_FREE_PTR(pli);
557
558         RETURN (rc);
559 }
560
561 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
562 {
563         struct lu_attr *la = &mdd_env_info(env)->mti_la;
564         int rc;
565
566         ENTRY;
567         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
568         if (rc == 0) {
569                 mdd_flags_xlate(obj, la->la_flags);
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
623                     struct md_attr *ma)
624 {
625         struct mdd_thread_info *info = mdd_env_info(env);
626         int                     size;
627         int                     rc   = -EINVAL;
628         ENTRY;
629
630         LASSERT(info != NULL);
631         LASSERT(ma->ma_big_lmm_used == 0);
632
633         if (ma->ma_lmm_size == 0) {
634                 CERROR("No buffer to hold %s xattr of object "DFID"\n",
635                        XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
636                 RETURN(rc);
637         }
638
639         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
640                            mdd_object_capa(env, obj));
641         if (rc < 0)
642                 RETURN(rc);
643
644         /* big_lmm may need to grow */
645         size = rc;
646         mdd_max_lmm_buffer(env, size);
647         if (info->mti_max_lmm == NULL)
648                 RETURN(-ENOMEM);
649
650         LASSERT(info->mti_max_lmm_size >= size);
651         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
652                         XATTR_NAME_LOV);
653         if (rc < 0)
654                 RETURN(rc);
655
656         ma->ma_big_lmm_used = 1;
657         ma->ma_valid |= MA_LOV;
658         ma->ma_lmm = info->mti_max_lmm;
659         ma->ma_lmm_size = size;
660         LASSERT(size == rc);
661         RETURN(rc);
662 }
663
664 /* get lov EA only */
665 static int __mdd_lmm_get(const struct lu_env *env,
666                          struct mdd_object *mdd_obj, struct md_attr *ma)
667 {
668         int rc;
669         ENTRY;
670
671         if (ma->ma_valid & MA_LOV)
672                 RETURN(0);
673
674         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
675                         XATTR_NAME_LOV);
676         if (rc == -ERANGE)
677                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
678         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
679                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
680
681         if (rc > 0) {
682                 ma->ma_lmm_size = rc;
683                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
684                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
685                 rc = 0;
686         }
687         RETURN(rc);
688 }
689
690 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
691                        struct md_attr *ma)
692 {
693         int rc;
694         ENTRY;
695
696         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
697         rc = __mdd_lmm_get(env, mdd_obj, ma);
698         mdd_read_unlock(env, mdd_obj);
699         RETURN(rc);
700 }
701
702 /*
703  * No permission check is needed.
704  */
705 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
706                  struct md_attr *ma)
707 {
708         struct mdd_object *mdd_obj = md2mdd_obj(obj);
709         int                rc;
710
711         ENTRY;
712         rc = mdd_iattr_get(env, mdd_obj, ma);
713         RETURN(rc);
714 }
715
716 /*
717  * No permission check is needed.
718  */
719 static int mdd_xattr_get(const struct lu_env *env,
720                          struct md_object *obj, struct lu_buf *buf,
721                          const char *name)
722 {
723         struct mdd_object *mdd_obj = md2mdd_obj(obj);
724         struct mdd_device *mdd = mdo2mdd(obj);
725         struct lu_fid      rootfid;
726         int is_root;
727         int rc;
728
729         ENTRY;
730
731         if (mdd_object_exists(mdd_obj) == 0) {
732                 CERROR("%s: object "DFID" not found: rc = -2\n",
733                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
734                 return -ENOENT;
735         }
736
737         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
738         rc = mdo_xattr_get(env, mdd_obj, buf, name,
739                            mdd_object_capa(env, mdd_obj));
740         mdd_read_unlock(env, mdd_obj);
741
742         dt_root_get(env, mdd->mdd_child, &rootfid);
743         is_root = lu_fid_eq(mdd_object_fid(mdd_obj), &rootfid);
744
745         /* XXX: a temp. solution till LOD/OSP is landed */
746         if (rc == -ENODATA && strcmp(name, XATTR_NAME_LOV) == 0 && is_root) {
747                 if (buf->lb_buf == NULL) {
748                         rc = sizeof(struct lov_user_md);
749                 } else if (buf->lb_len >= sizeof(struct lov_user_md)) {
750                         rc = mdd_get_default_md(mdd_obj, buf->lb_buf);
751                 } else {
752                         rc = -ERANGE;
753                 }
754         }
755
756         RETURN(rc);
757 }
758
759 /*
760  * Permission check is done when open,
761  * no need check again.
762  */
763 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
764                         struct lu_buf *buf)
765 {
766         struct mdd_object *mdd_obj = md2mdd_obj(obj);
767         struct dt_object  *next;
768         loff_t             pos = 0;
769         int                rc;
770         ENTRY;
771
772         if (mdd_object_exists(mdd_obj) == 0) {
773                 CERROR("%s: object "DFID" not found: rc = -2\n",
774                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
775                 return -ENOENT;
776         }
777
778         next = mdd_object_child(mdd_obj);
779         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
780         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
781                                          mdd_object_capa(env, mdd_obj));
782         mdd_read_unlock(env, mdd_obj);
783         RETURN(rc);
784 }
785
786 /*
787  * No permission check is needed.
788  */
789 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
790                           struct lu_buf *buf)
791 {
792         struct mdd_object *mdd_obj = md2mdd_obj(obj);
793         int rc;
794
795         ENTRY;
796
797         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
798         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
799         mdd_read_unlock(env, mdd_obj);
800
801         RETURN(rc);
802 }
803
804 int mdd_declare_object_create_internal(const struct lu_env *env,
805                                        struct mdd_object *p,
806                                        struct mdd_object *c,
807                                        struct lu_attr *attr,
808                                        struct thandle *handle,
809                                        const struct md_op_spec *spec)
810 {
811         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
812         const struct dt_index_features *feat = spec->sp_feat;
813         int rc;
814         ENTRY;
815
816         if (feat != &dt_directory_features && feat != NULL)
817                 dof->dof_type = DFT_INDEX;
818         else
819                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
820
821         dof->u.dof_idx.di_feat = feat;
822
823         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
824
825         RETURN(rc);
826 }
827
828 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
829                                struct mdd_object *c, struct lu_attr *attr,
830                                struct thandle *handle,
831                                const struct md_op_spec *spec)
832 {
833         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
834         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
835         const struct dt_index_features *feat = spec->sp_feat;
836         int rc;
837         ENTRY;
838
839         if (!mdd_object_exists(c)) {
840                 struct dt_object *next = mdd_object_child(c);
841                 LASSERT(next);
842
843                 if (feat != &dt_directory_features && feat != NULL)
844                         dof->dof_type = DFT_INDEX;
845                 else
846                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
847
848                 dof->u.dof_idx.di_feat = feat;
849
850                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
851                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
852         } else
853                 rc = -EEXIST;
854
855         RETURN(rc);
856 }
857
858 /**
859  * Make sure the ctime is increased only.
860  */
861 static inline int mdd_attr_check(const struct lu_env *env,
862                                  struct mdd_object *obj,
863                                  struct lu_attr *attr)
864 {
865         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
866         int rc;
867         ENTRY;
868
869         if (attr->la_valid & LA_CTIME) {
870                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
871                 if (rc)
872                         RETURN(rc);
873
874                 if (attr->la_ctime < tmp_la->la_ctime)
875                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
876                 else if (attr->la_valid == LA_CTIME &&
877                          attr->la_ctime == tmp_la->la_ctime)
878                         attr->la_valid &= ~LA_CTIME;
879         }
880         RETURN(0);
881 }
882
883 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
884                           struct lu_attr *attr, struct thandle *handle,
885                           int needacl)
886 {
887         int rc;
888         ENTRY;
889
890         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
891 #ifdef CONFIG_FS_POSIX_ACL
892         if (!rc && (attr->la_valid & LA_MODE) && needacl)
893                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
894 #endif
895         RETURN(rc);
896 }
897
898 int mdd_attr_check_set_internal(const struct lu_env *env,
899                                 struct mdd_object *obj, struct lu_attr *attr,
900                                 struct thandle *handle, int needacl)
901 {
902         int rc;
903         ENTRY;
904
905         rc = mdd_attr_check(env, obj, attr);
906         if (rc)
907                 RETURN(rc);
908
909         if (attr->la_valid)
910                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
911         RETURN(rc);
912 }
913
914 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
915                                        struct mdd_object *obj,
916                                        struct lu_attr *attr,
917                                        struct thandle *handle,
918                                        int needacl)
919 {
920         int rc;
921         ENTRY;
922
923         needacl = needacl && (attr->la_valid & LA_MODE);
924         if (needacl)
925                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
926         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
927         if (needacl)
928                 mdd_write_unlock(env, obj);
929         RETURN(rc);
930 }
931
932 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
933                     const struct lu_buf *buf, const char *name,
934                     int fl, struct thandle *handle)
935 {
936         struct lustre_capa *capa = mdd_object_capa(env, obj);
937         int rc = -EINVAL;
938         ENTRY;
939
940         if (buf->lb_buf && buf->lb_len > 0)
941                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
942         else if (buf->lb_buf == NULL && buf->lb_len == 0)
943                 rc = mdo_xattr_del(env, obj, name, handle, capa);
944
945         RETURN(rc);
946 }
947
948 /*
949  * This gives the same functionality as the code between
950  * sys_chmod and inode_setattr
951  * chown_common and inode_setattr
952  * utimes and inode_setattr
953  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
954  */
955 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
956                         struct lu_attr *la, const unsigned long flags)
957 {
958         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
959         struct md_ucred  *uc;
960         int               rc;
961         ENTRY;
962
963         if (!la->la_valid)
964                 RETURN(0);
965
966         /* Do not permit change file type */
967         if (la->la_valid & LA_TYPE)
968                 RETURN(-EPERM);
969
970         /* They should not be processed by setattr */
971         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
972                 RETURN(-EPERM);
973
974         /* export destroy does not have ->le_ses, but we may want
975          * to drop LUSTRE_SOM_FL. */
976         if (!env->le_ses)
977                 RETURN(0);
978
979         uc = md_ucred(env);
980
981         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
982         if (rc)
983                 RETURN(rc);
984
985         if (la->la_valid == LA_CTIME) {
986                 if (!(flags & MDS_PERM_BYPASS))
987                         /* This is only for set ctime when rename's source is
988                          * on remote MDS. */
989                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
990                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
991                         la->la_valid &= ~LA_CTIME;
992                 RETURN(rc);
993         }
994
995         if (la->la_valid == LA_ATIME) {
996                 /* This is atime only set for read atime update on close. */
997                 if (la->la_atime >= tmp_la->la_atime &&
998                     la->la_atime < (tmp_la->la_atime +
999                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1000                         la->la_valid &= ~LA_ATIME;
1001                 RETURN(0);
1002         }
1003
1004         /* Check if flags change. */
1005         if (la->la_valid & LA_FLAGS) {
1006                 unsigned int oldflags = 0;
1007                 unsigned int newflags = la->la_flags &
1008                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1009
1010                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1011                     !mdd_capable(uc, CFS_CAP_FOWNER))
1012                         RETURN(-EPERM);
1013
1014                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1015                  * only be changed by the relevant capability. */
1016                 if (mdd_is_immutable(obj))
1017                         oldflags |= LUSTRE_IMMUTABLE_FL;
1018                 if (mdd_is_append(obj))
1019                         oldflags |= LUSTRE_APPEND_FL;
1020                 if ((oldflags ^ newflags) &&
1021                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1022                         RETURN(-EPERM);
1023
1024                 if (!S_ISDIR(tmp_la->la_mode))
1025                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1026         }
1027
1028         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1029             (la->la_valid & ~LA_FLAGS) &&
1030             !(flags & MDS_PERM_BYPASS))
1031                 RETURN(-EPERM);
1032
1033         /* Check for setting the obj time. */
1034         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1035             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1036                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1037                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1038                         rc = mdd_permission_internal(env, obj, tmp_la,
1039                                                      MAY_WRITE);
1040                         if (rc)
1041                                 RETURN(rc);
1042                 }
1043         }
1044
1045         if (la->la_valid & LA_KILL_SUID) {
1046                 la->la_valid &= ~LA_KILL_SUID;
1047                 if ((tmp_la->la_mode & S_ISUID) &&
1048                     !(la->la_valid & LA_MODE)) {
1049                         la->la_mode = tmp_la->la_mode;
1050                         la->la_valid |= LA_MODE;
1051                 }
1052                 la->la_mode &= ~S_ISUID;
1053         }
1054
1055         if (la->la_valid & LA_KILL_SGID) {
1056                 la->la_valid &= ~LA_KILL_SGID;
1057                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1058                                         (S_ISGID | S_IXGRP)) &&
1059                     !(la->la_valid & LA_MODE)) {
1060                         la->la_mode = tmp_la->la_mode;
1061                         la->la_valid |= LA_MODE;
1062                 }
1063                 la->la_mode &= ~S_ISGID;
1064         }
1065
1066         /* Make sure a caller can chmod. */
1067         if (la->la_valid & LA_MODE) {
1068                 if (!(flags & MDS_PERM_BYPASS) &&
1069                     (uc->mu_fsuid != tmp_la->la_uid) &&
1070                     !mdd_capable(uc, CFS_CAP_FOWNER))
1071                         RETURN(-EPERM);
1072
1073                 if (la->la_mode == (cfs_umode_t) -1)
1074                         la->la_mode = tmp_la->la_mode;
1075                 else
1076                         la->la_mode = (la->la_mode & S_IALLUGO) |
1077                                       (tmp_la->la_mode & ~S_IALLUGO);
1078
1079                 /* Also check the setgid bit! */
1080                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1081                                        la->la_gid : tmp_la->la_gid) &&
1082                     !mdd_capable(uc, CFS_CAP_FSETID))
1083                         la->la_mode &= ~S_ISGID;
1084         } else {
1085                la->la_mode = tmp_la->la_mode;
1086         }
1087
1088         /* Make sure a caller can chown. */
1089         if (la->la_valid & LA_UID) {
1090                 if (la->la_uid == (uid_t) -1)
1091                         la->la_uid = tmp_la->la_uid;
1092                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1093                     (la->la_uid != tmp_la->la_uid)) &&
1094                     !mdd_capable(uc, CFS_CAP_CHOWN))
1095                         RETURN(-EPERM);
1096
1097                 /* If the user or group of a non-directory has been
1098                  * changed by a non-root user, remove the setuid bit.
1099                  * 19981026 David C Niemi <niemi@tux.org>
1100                  *
1101                  * Changed this to apply to all users, including root,
1102                  * to avoid some races. This is the behavior we had in
1103                  * 2.0. The check for non-root was definitely wrong
1104                  * for 2.2 anyway, as it should have been using
1105                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1106                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1107                     !S_ISDIR(tmp_la->la_mode)) {
1108                         la->la_mode &= ~S_ISUID;
1109                         la->la_valid |= LA_MODE;
1110                 }
1111         }
1112
1113         /* Make sure caller can chgrp. */
1114         if (la->la_valid & LA_GID) {
1115                 if (la->la_gid == (gid_t) -1)
1116                         la->la_gid = tmp_la->la_gid;
1117                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1118                     ((la->la_gid != tmp_la->la_gid) &&
1119                     !lustre_in_group_p(uc, la->la_gid))) &&
1120                     !mdd_capable(uc, CFS_CAP_CHOWN))
1121                         RETURN(-EPERM);
1122
1123                 /* Likewise, if the user or group of a non-directory
1124                  * has been changed by a non-root user, remove the
1125                  * setgid bit UNLESS there is no group execute bit
1126                  * (this would be a file marked for mandatory
1127                  * locking).  19981026 David C Niemi <niemi@tux.org>
1128                  *
1129                  * Removed the fsuid check (see the comment above) --
1130                  * 19990830 SD. */
1131                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1132                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1133                         la->la_mode &= ~S_ISGID;
1134                         la->la_valid |= LA_MODE;
1135                 }
1136         }
1137
1138         /* For both Size-on-MDS case and truncate case,
1139          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1140          * We distinguish them by "flags & MDS_SOM".
1141          * For SOM case, it is true, the MAY_WRITE perm has been checked
1142          * when open, no need check again. For truncate case, it is false,
1143          * the MAY_WRITE perm should be checked here. */
1144         if (flags & MDS_SOM) {
1145                 /* For the "Size-on-MDS" setattr update, merge coming
1146                  * attributes with the set in the inode. BUG 10641 */
1147                 if ((la->la_valid & LA_ATIME) &&
1148                     (la->la_atime <= tmp_la->la_atime))
1149                         la->la_valid &= ~LA_ATIME;
1150
1151                 /* OST attributes do not have a priority over MDS attributes,
1152                  * so drop times if ctime is equal. */
1153                 if ((la->la_valid & LA_CTIME) &&
1154                     (la->la_ctime <= tmp_la->la_ctime))
1155                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1156         } else {
1157                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1158                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
1159                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1160                             !(flags & MDS_PERM_BYPASS)) {
1161                                 rc = mdd_permission_internal(env, obj,
1162                                                              tmp_la, MAY_WRITE);
1163                                 if (rc)
1164                                         RETURN(rc);
1165                         }
1166                 }
1167                 if (la->la_valid & LA_CTIME) {
1168                         /* The pure setattr, it has the priority over what is
1169                          * already set, do not drop it if ctime is equal. */
1170                         if (la->la_ctime < tmp_la->la_ctime)
1171                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1172                                                   LA_CTIME);
1173                 }
1174         }
1175
1176         RETURN(0);
1177 }
1178
1179 /** Store a data change changelog record
1180  * If this fails, we must fail the whole transaction; we don't
1181  * want the change to commit without the log entry.
1182  * \param mdd_obj - mdd_object of change
1183  * \param handle - transacion handle
1184  */
1185 static int mdd_changelog_data_store(const struct lu_env     *env,
1186                                     struct mdd_device       *mdd,
1187                                     enum changelog_rec_type type,
1188                                     int                     flags,
1189                                     struct mdd_object       *mdd_obj,
1190                                     struct thandle          *handle)
1191 {
1192         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1193         struct llog_changelog_rec *rec;
1194         struct thandle *th = NULL;
1195         struct lu_buf *buf;
1196         int reclen;
1197         int rc;
1198
1199         /* Not recording */
1200         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1201                 RETURN(0);
1202         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1203                 RETURN(0);
1204
1205         LASSERT(mdd_obj != NULL);
1206         LASSERT(handle != NULL);
1207
1208         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1209             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1210                 /* Don't need multiple updates in this log */
1211                 /* Don't check under lock - no big deal if we get an extra
1212                    entry */
1213                 RETURN(0);
1214         }
1215
1216         reclen = llog_data_len(sizeof(*rec));
1217         buf = mdd_buf_alloc(env, reclen);
1218         if (buf->lb_buf == NULL)
1219                 RETURN(-ENOMEM);
1220         rec = (struct llog_changelog_rec *)buf->lb_buf;
1221
1222         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1223         rec->cr.cr_type = (__u32)type;
1224         rec->cr.cr_tfid = *tfid;
1225         rec->cr.cr_namelen = 0;
1226         mdd_obj->mod_cltime = cfs_time_current_64();
1227
1228         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1229
1230         if (th)
1231                 mdd_trans_stop(env, mdd, rc, th);
1232
1233         if (rc < 0) {
1234                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1235                        rc, type, PFID(tfid));
1236                 return -EFAULT;
1237         }
1238
1239         return 0;
1240 }
1241
1242 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1243                   int flags, struct md_object *obj)
1244 {
1245         struct thandle *handle;
1246         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1247         struct mdd_device *mdd = mdo2mdd(obj);
1248         int rc;
1249         ENTRY;
1250
1251         handle = mdd_trans_create(env, mdd);
1252         if (IS_ERR(handle))
1253                 return(PTR_ERR(handle));
1254
1255         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1256         if (rc)
1257                 GOTO(stop, rc);
1258
1259         rc = mdd_trans_start(env, mdd, handle);
1260         if (rc)
1261                 GOTO(stop, rc);
1262
1263         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1264                                       handle);
1265
1266 stop:
1267         mdd_trans_stop(env, mdd, rc, handle);
1268
1269         RETURN(rc);
1270 }
1271
1272 /**
1273  * Should be called with write lock held.
1274  *
1275  * \see mdd_lma_set_locked().
1276  */
1277 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1278                        const struct md_attr *ma, struct thandle *handle)
1279 {
1280         struct mdd_thread_info *info = mdd_env_info(env);
1281         struct lu_buf *buf;
1282         struct lustre_mdt_attrs *lma =
1283                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1284         int lmasize = sizeof(struct lustre_mdt_attrs);
1285         int rc = 0;
1286
1287         ENTRY;
1288
1289         /* Either HSM or SOM part is not valid, we need to read it before */
1290         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1291                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1292                 if (rc <= 0)
1293                         RETURN(rc);
1294
1295                 lustre_lma_swab(lma);
1296         } else {
1297                 memset(lma, 0, lmasize);
1298         }
1299
1300         /* Copy HSM data */
1301         if (ma->ma_valid & MA_HSM) {
1302                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1303                 lma->lma_compat |= LMAC_HSM;
1304         }
1305
1306         /* Copy SOM data */
1307         if (ma->ma_valid & MA_SOM) {
1308                 LASSERT(ma->ma_som != NULL);
1309                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1310                         lma->lma_compat     &= ~LMAC_SOM;
1311                 } else {
1312                         lma->lma_compat     |= LMAC_SOM;
1313                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1314                         lma->lma_som_size    = ma->ma_som->msd_size;
1315                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1316                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1317                 }
1318         }
1319
1320         /* Copy FID */
1321         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1322
1323         lustre_lma_swab(lma);
1324         buf = mdd_buf_get(env, lma, lmasize);
1325         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1326
1327         RETURN(rc);
1328 }
1329
1330 /**
1331  * Save LMA extended attributes with data from \a ma.
1332  *
1333  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1334  * not, LMA EA will be first read from disk, modified and write back.
1335  *
1336  */
1337 static int mdd_lma_set_locked(const struct lu_env *env,
1338                               struct mdd_object *mdd_obj,
1339                               const struct md_attr *ma, struct thandle *handle)
1340 {
1341         int rc;
1342
1343         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1344         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1345         mdd_write_unlock(env, mdd_obj);
1346         return rc;
1347 }
1348
1349 /* Precedence for choosing record type when multiple
1350  * attributes change: setattr > mtime > ctime > atime
1351  * (ctime changes when mtime does, plus chmod/chown.
1352  * atime and ctime are independent.) */
1353 static int mdd_attr_set_changelog(const struct lu_env *env,
1354                                   struct md_object *obj, struct thandle *handle,
1355                                   __u64 valid)
1356 {
1357         struct mdd_device *mdd = mdo2mdd(obj);
1358         int bits, type = 0;
1359
1360         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1361         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1362         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1363         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1364         bits = bits & mdd->mdd_cl.mc_mask;
1365         if (bits == 0)
1366                 return 0;
1367
1368         /* The record type is the lowest non-masked set bit */
1369         while (bits && ((bits & 1) == 0)) {
1370                 bits = bits >> 1;
1371                 type++;
1372         }
1373
1374         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1375         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1376                                         md2mdd_obj(obj), handle);
1377 }
1378
1379 static int mdd_declare_attr_set(const struct lu_env *env,
1380                                 struct mdd_device *mdd,
1381                                 struct mdd_object *obj,
1382                                 const struct md_attr *ma,
1383                                 struct lov_mds_md *lmm,
1384                                 struct thandle *handle)
1385 {
1386         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1387         struct lu_attr *attr = (struct lu_attr *) &ma->ma_attr;
1388         int             rc, i;
1389
1390         rc = mdo_declare_attr_set(env, obj, attr, handle);
1391         if (rc)
1392                 return rc;
1393
1394         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1395         if (rc)
1396                 return rc;
1397
1398         if (ma->ma_valid & MA_LOV) {
1399                 buf->lb_buf = NULL;
1400                 buf->lb_len = ma->ma_lmm_size;
1401                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1402                                            0, handle);
1403                 if (rc)
1404                         return rc;
1405         }
1406
1407         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1408                 buf->lb_buf = NULL;
1409                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1410                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1411                                            0, handle);
1412                 if (rc)
1413                         return rc;
1414         }
1415
1416 #ifdef CONFIG_FS_POSIX_ACL
1417         if (attr->la_valid & LA_MODE) {
1418                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1419                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1420                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1421                 mdd_read_unlock(env, obj);
1422                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1423                         rc = 0;
1424                 else if (rc < 0)
1425                         return rc;
1426
1427                 if (rc != 0) {
1428                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1429                         rc = mdo_declare_xattr_set(env, obj, buf,
1430                                                    XATTR_NAME_ACL_ACCESS, 0,
1431                                                    handle);
1432                         if (rc)
1433                                 return rc;
1434                 }
1435         }
1436 #endif
1437
1438         /* basically the log is the same as in unlink case */
1439         if (lmm) {
1440                 __u16 stripe;
1441
1442                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1443                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1444                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1445                                mdd->mdd_obd_dev->obd_name,
1446                                le32_to_cpu(lmm->lmm_magic),
1447                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1448                         return -EINVAL;
1449                 }
1450
1451                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1452                 if (stripe == LOV_ALL_STRIPES) {
1453                         struct lov_desc *ldesc;
1454
1455                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1456                         LASSERT(ldesc != NULL);
1457                         stripe = ldesc->ld_tgt_count;
1458                 }
1459
1460                 for (i = 0; i < stripe; i++) {
1461                         rc = mdd_declare_llog_record(env, mdd,
1462                                         sizeof(struct llog_unlink_rec),
1463                                         handle);
1464                         if (rc)
1465                                 return rc;
1466                 }
1467         }
1468
1469         return rc;
1470 }
1471
1472 /* set attr and LOV EA at once, return updated attr */
1473 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1474                  const struct md_attr *ma)
1475 {
1476         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1477         struct mdd_device *mdd = mdo2mdd(obj);
1478         struct thandle *handle;
1479         struct lov_mds_md *lmm = NULL;
1480         struct llog_cookie *logcookies = NULL;
1481         int  rc, lmm_size = 0, cookie_size = 0;
1482         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1483         const struct lu_attr *la = &ma->ma_attr;
1484 #ifdef HAVE_QUOTA_SUPPORT
1485         struct obd_device *obd = mdd->mdd_obd_dev;
1486         struct mds_obd *mds = &obd->u.mds;
1487         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1488         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1489         int quota_opc = 0, block_count = 0;
1490         int inode_pending[MAXQUOTAS] = { 0, 0 };
1491         int block_pending[MAXQUOTAS] = { 0, 0 };
1492 #endif
1493         ENTRY;
1494
1495         *la_copy = ma->ma_attr;
1496         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1497         if (rc)
1498                 RETURN(rc);
1499
1500         /* setattr on "close" only change atime, or do nothing */
1501         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1502                 RETURN(0);
1503
1504         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1505             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1506                 lmm_size = mdd_lov_mdsize(env, mdd);
1507                 lmm = mdd_max_lmm_get(env, mdd);
1508                 if (lmm == NULL)
1509                         RETURN(-ENOMEM);
1510
1511                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1512                                 XATTR_NAME_LOV);
1513
1514                 if (rc < 0)
1515                         RETURN(rc);
1516         }
1517
1518         handle = mdd_trans_create(env, mdd);
1519         if (IS_ERR(handle))
1520                 RETURN(PTR_ERR(handle));
1521
1522         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1523                                   lmm_size > 0 ? lmm : NULL, handle);
1524         if (rc)
1525                 GOTO(stop, rc);
1526
1527         rc = mdd_trans_start(env, mdd, handle);
1528         if (rc)
1529                 GOTO(stop, rc);
1530
1531         /* permission changes may require sync operation */
1532         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1533                 handle->th_sync |= !!mdd->mdd_sync_permission;
1534
1535         if (la->la_valid & (LA_MTIME | LA_CTIME))
1536                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1537                        la->la_mtime, la->la_ctime);
1538
1539 #ifdef HAVE_QUOTA_SUPPORT
1540         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1541                 struct obd_export *exp = md_quota(env)->mq_exp;
1542                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1543
1544                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1545                 if (!rc) {
1546                         quota_opc = FSFILT_OP_SETATTR;
1547                         mdd_quota_wrapper(la_copy, qnids);
1548                         mdd_quota_wrapper(la_tmp, qoids);
1549                         /* get file quota for new owner */
1550                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1551                                         qnids, inode_pending, 1, NULL, 0,
1552                                         NULL, 0);
1553                         block_count = (la_tmp->la_blocks + 7) >> 3;
1554                         if (block_count) {
1555                                 void *data = NULL;
1556                                 mdd_data_get(env, mdd_obj, &data);
1557                                 /* get block quota for new owner */
1558                                 lquota_chkquota(mds_quota_interface_ref, obd,
1559                                                 exp, qnids, block_pending,
1560                                                 block_count, NULL,
1561                                                 LQUOTA_FLAGS_BLK, data, 1);
1562                         }
1563                 }
1564         }
1565 #endif
1566
1567         if (la_copy->la_valid & LA_FLAGS) {
1568                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1569                 if (rc == 0)
1570                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1571         } else if (la_copy->la_valid) {            /* setattr */
1572                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1573                 /* journal chown/chgrp in llog, just like unlink */
1574                 if (rc == 0 && lmm_size){
1575                         cookie_size = mdd_lov_cookiesize(env, mdd);
1576                         logcookies = mdd_max_cookie_get(env, mdd);
1577                         if (logcookies == NULL)
1578                                 GOTO(cleanup, rc = -ENOMEM);
1579
1580                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1581                                             logcookies, cookie_size) <= 0)
1582                                 logcookies = NULL;
1583                 }
1584         }
1585
1586         if (rc == 0 && ma->ma_valid & MA_LOV) {
1587                 cfs_umode_t mode;
1588
1589                 mode = mdd_object_type(mdd_obj);
1590                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1591                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1592                         if (rc)
1593                                 GOTO(cleanup, rc);
1594
1595                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1596                                             ma->ma_lmm_size, handle, 1);
1597                 }
1598
1599         }
1600         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1601                 cfs_umode_t mode;
1602
1603                 mode = mdd_object_type(mdd_obj);
1604                 if (S_ISREG(mode))
1605                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1606
1607         }
1608 cleanup:
1609         if (rc == 0)
1610                 rc = mdd_attr_set_changelog(env, obj, handle,
1611                                             ma->ma_attr.la_valid);
1612 stop:
1613         mdd_trans_stop(env, mdd, rc, handle);
1614         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1615                 /*set obd attr, if needed*/
1616                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1617                                            logcookies);
1618         }
1619 #ifdef HAVE_QUOTA_SUPPORT
1620         if (quota_opc) {
1621                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1622                                       inode_pending, 0);
1623                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1624                                       block_pending, 1);
1625                 /* Trigger dqrel/dqacq for original owner and new owner.
1626                  * If failed, the next call for lquota_chkquota will
1627                  * process it. */
1628                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1629                               quota_opc);
1630         }
1631 #endif
1632         RETURN(rc);
1633 }
1634
1635 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1636                       const struct lu_buf *buf, const char *name, int fl,
1637                       struct thandle *handle)
1638 {
1639         int  rc;
1640         ENTRY;
1641
1642         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1643         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1644         mdd_write_unlock(env, obj);
1645
1646         RETURN(rc);
1647 }
1648
1649 static int mdd_xattr_sanity_check(const struct lu_env *env,
1650                                   struct mdd_object *obj)
1651 {
1652         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1653         struct md_ucred *uc     = md_ucred(env);
1654         int rc;
1655         ENTRY;
1656
1657         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1658                 RETURN(-EPERM);
1659
1660         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1661         if (rc)
1662                 RETURN(rc);
1663
1664         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1665             !mdd_capable(uc, CFS_CAP_FOWNER))
1666                 RETURN(-EPERM);
1667
1668         RETURN(rc);
1669 }
1670
1671 static int mdd_declare_xattr_set(const struct lu_env *env,
1672                                  struct mdd_device *mdd,
1673                                  struct mdd_object *obj,
1674                                  const struct lu_buf *buf,
1675                                  const char *name,
1676                                  struct thandle *handle)
1677 {
1678         int rc;
1679
1680         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1681         if (rc)
1682                 return rc;
1683
1684         /* Only record user xattr changes */
1685         if ((strncmp("user.", name, 5) == 0))
1686                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1687
1688         return rc;
1689 }
1690
1691 /**
1692  * The caller should guarantee to update the object ctime
1693  * after xattr_set if needed.
1694  */
1695 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1696                          const struct lu_buf *buf, const char *name,
1697                          int fl)
1698 {
1699         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1700         struct mdd_device *mdd = mdo2mdd(obj);
1701         struct thandle *handle;
1702         int  rc;
1703         ENTRY;
1704
1705         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1706                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1707                 RETURN(rc);
1708         }
1709
1710         rc = mdd_xattr_sanity_check(env, mdd_obj);
1711         if (rc)
1712                 RETURN(rc);
1713
1714         handle = mdd_trans_create(env, mdd);
1715         if (IS_ERR(handle))
1716                 RETURN(PTR_ERR(handle));
1717
1718         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1719         if (rc)
1720                 GOTO(stop, rc);
1721
1722         rc = mdd_trans_start(env, mdd, handle);
1723         if (rc)
1724                 GOTO(stop, rc);
1725
1726         /* security-replated changes may require sync */
1727         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1728                 handle->th_sync |= !!mdd->mdd_sync_permission;
1729
1730         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1731         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1732                            mdd_object_capa(env, mdd_obj));
1733         mdd_write_unlock(env, mdd_obj);
1734         if (rc)
1735                 GOTO(stop, rc);
1736
1737         /* Only record system & user xattr changes */
1738         if (strncmp(XATTR_USER_PREFIX, name,
1739                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1740                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1741                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1742                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1743                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1744                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1745                                               handle);
1746
1747 stop:
1748         mdd_trans_stop(env, mdd, rc, handle);
1749
1750         RETURN(rc);
1751 }
1752
1753 static int mdd_declare_xattr_del(const struct lu_env *env,
1754                                  struct mdd_device *mdd,
1755                                  struct mdd_object *obj,
1756                                  const char *name,
1757                                  struct thandle *handle)
1758 {
1759         int rc;
1760
1761         rc = mdo_declare_xattr_del(env, obj, name, handle);
1762         if (rc)
1763                 return rc;
1764
1765         /* Only record user xattr changes */
1766         if ((strncmp("user.", name, 5) == 0))
1767                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1768
1769         return rc;
1770 }
1771
1772 /**
1773  * The caller should guarantee to update the object ctime
1774  * after xattr_set if needed.
1775  */
1776 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1777                   const char *name)
1778 {
1779         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1780         struct mdd_device *mdd = mdo2mdd(obj);
1781         struct thandle *handle;
1782         int  rc;
1783         ENTRY;
1784
1785         rc = mdd_xattr_sanity_check(env, mdd_obj);
1786         if (rc)
1787                 RETURN(rc);
1788
1789         handle = mdd_trans_create(env, mdd);
1790         if (IS_ERR(handle))
1791                 RETURN(PTR_ERR(handle));
1792
1793         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1794         if (rc)
1795                 GOTO(stop, rc);
1796
1797         rc = mdd_trans_start(env, mdd, handle);
1798         if (rc)
1799                 GOTO(stop, rc);
1800
1801         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1802         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1803                            mdd_object_capa(env, mdd_obj));
1804         mdd_write_unlock(env, mdd_obj);
1805         if (rc)
1806                 GOTO(stop, rc);
1807
1808         /* Only record system & user xattr changes */
1809         if (strncmp(XATTR_USER_PREFIX, name,
1810                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1811                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1812                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1813                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1814                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1815                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1816                                               handle);
1817
1818 stop:
1819         mdd_trans_stop(env, mdd, rc, handle);
1820
1821         RETURN(rc);
1822 }
1823
1824 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1825                 struct mdd_object *child, struct lu_attr *attr)
1826 {
1827         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1828         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1829         struct dt_object *nc = mdd_object_child(child);
1830
1831         /* @hint will be initialized by underlying device. */
1832         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1833 }
1834
1835 /*
1836  * do NOT or the MAY_*'s, you'll get the weakest
1837  */
1838 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1839 {
1840         int res = 0;
1841
1842         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1843          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1844          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1845          * owner can write to a file even if it is marked readonly to hide
1846          * its brokenness. (bug 5781) */
1847         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1848                 struct md_ucred *uc = md_ucred(env);
1849
1850                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1851                     (la->la_uid == uc->mu_fsuid))
1852                         return 0;
1853         }
1854
1855         if (flags & FMODE_READ)
1856                 res |= MAY_READ;
1857         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1858                 res |= MAY_WRITE;
1859         if (flags & MDS_FMODE_EXEC)
1860                 res = MAY_EXEC;
1861         return res;
1862 }
1863
1864 static int mdd_open_sanity_check(const struct lu_env *env,
1865                                  struct mdd_object *obj, int flag)
1866 {
1867         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1868         int mode, rc;
1869         ENTRY;
1870
1871         /* EEXIST check */
1872         if (mdd_is_dead_obj(obj))
1873                 RETURN(-ENOENT);
1874
1875         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1876         if (rc)
1877                RETURN(rc);
1878
1879         if (S_ISLNK(tmp_la->la_mode))
1880                 RETURN(-ELOOP);
1881
1882         mode = accmode(env, tmp_la, flag);
1883
1884         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1885                 RETURN(-EISDIR);
1886
1887         if (!(flag & MDS_OPEN_CREATED)) {
1888                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1889                 if (rc)
1890                         RETURN(rc);
1891         }
1892
1893         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1894             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1895                 flag &= ~MDS_OPEN_TRUNC;
1896
1897         /* For writing append-only file must open it with append mode. */
1898         if (mdd_is_append(obj)) {
1899                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1900                         RETURN(-EPERM);
1901                 if (flag & MDS_OPEN_TRUNC)
1902                         RETURN(-EPERM);
1903         }
1904
1905 #if 0
1906         /*
1907          * Now, flag -- O_NOATIME does not be packed by client.
1908          */
1909         if (flag & O_NOATIME) {
1910                 struct md_ucred *uc = md_ucred(env);
1911
1912                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1913                     (uc->mu_valid == UCRED_NEW)) &&
1914                     (uc->mu_fsuid != tmp_la->la_uid) &&
1915                     !mdd_capable(uc, CFS_CAP_FOWNER))
1916                         RETURN(-EPERM);
1917         }
1918 #endif
1919
1920         RETURN(0);
1921 }
1922
1923 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1924                     int flags)
1925 {
1926         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1927         int rc = 0;
1928
1929         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1930
1931         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1932         if (rc == 0)
1933                 mdd_obj->mod_count++;
1934
1935         mdd_write_unlock(env, mdd_obj);
1936         return rc;
1937 }
1938
1939 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1940                             struct md_attr *ma, struct thandle *handle)
1941 {
1942         int rc;
1943
1944         rc = mdd_declare_unlink_log(env, obj, ma, handle);
1945         if (rc)
1946                 return rc;
1947
1948         return mdo_declare_destroy(env, obj, handle);
1949 }
1950
1951 /* return md_attr back,
1952  * if it is last unlink then return lov ea + llog cookie*/
1953 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1954                     struct md_attr *ma, struct thandle *handle)
1955 {
1956         int rc = 0;
1957         ENTRY;
1958
1959         if (S_ISREG(mdd_object_type(obj))) {
1960                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1961                  * Caller must be ready for that. */
1962                 rc = __mdd_lmm_get(env, obj, ma);
1963                 if ((ma->ma_valid & MA_LOV))
1964                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1965                                             obj, ma);
1966         }
1967
1968         if (rc == 0)
1969                 rc = mdo_destroy(env, obj, handle);
1970
1971         RETURN(rc);
1972 }
1973
1974 static int mdd_declare_close(const struct lu_env *env,
1975                              struct mdd_object *obj,
1976                              struct md_attr *ma,
1977                              struct thandle *handle)
1978 {
1979         int rc;
1980
1981         rc = orph_declare_index_delete(env, obj, handle);
1982         if (rc)
1983                 return rc;
1984
1985         return mdd_declare_object_kill(env, obj, ma, handle);
1986 }
1987
1988 /*
1989  * No permission check is needed.
1990  */
1991 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1992                      struct md_attr *ma, int mode)
1993 {
1994         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1995         struct mdd_device *mdd = mdo2mdd(obj);
1996         struct thandle    *handle = NULL;
1997         int rc;
1998         int is_orphan = 0, reset = 1;
1999
2000 #ifdef HAVE_QUOTA_SUPPORT
2001         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2002         struct mds_obd *mds = &obd->u.mds;
2003         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2004         int quota_opc = 0;
2005 #endif
2006         ENTRY;
2007
2008         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2009                 mdd_obj->mod_count--;
2010
2011                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2012                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2013                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2014                 RETURN(0);
2015         }
2016
2017         /* check without any lock */
2018         if (mdd_obj->mod_count == 1 &&
2019             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2020  again:
2021                 handle = mdd_trans_create(env, mdo2mdd(obj));
2022                 if (IS_ERR(handle))
2023                         RETURN(PTR_ERR(handle));
2024
2025                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2026                 if (rc)
2027                         GOTO(stop, rc);
2028
2029                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2030                 if (rc)
2031                         GOTO(stop, rc);
2032
2033                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2034                 if (rc)
2035                         GOTO(stop, rc);
2036         }
2037
2038         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2039         if (handle == NULL && mdd_obj->mod_count == 1 &&
2040             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2041                 mdd_write_unlock(env, mdd_obj);
2042                 goto again;
2043         }
2044
2045         /* release open count */
2046         mdd_obj->mod_count --;
2047
2048         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2049                 /* remove link to object from orphan index */
2050                 LASSERT(handle != NULL);
2051                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2052                 if (rc == 0) {
2053                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2054                                "list, OSS objects to be destroyed.\n",
2055                                PFID(mdd_object_fid(mdd_obj)));
2056                         is_orphan = 1;
2057                 } else {
2058                         CERROR("Object "DFID" can not be deleted from orphan "
2059                                 "list, maybe cause OST objects can not be "
2060                                 "destroyed (err: %d).\n",
2061                                 PFID(mdd_object_fid(mdd_obj)), rc);
2062                         /* If object was not deleted from orphan list, do not
2063                          * destroy OSS objects, which will be done when next
2064                          * recovery. */
2065                         GOTO(out, rc);
2066                 }
2067         }
2068
2069         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
2070                         mdd_object_capa(env, mdd_obj));
2071         /* Object maybe not in orphan list originally, it is rare case for
2072          * mdd_finish_unlink() failure. */
2073         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2074 #ifdef HAVE_QUOTA_SUPPORT
2075                 if (mds->mds_quota) {
2076                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2077                         mdd_quota_wrapper(&ma->ma_attr, qids);
2078                 }
2079 #endif
2080                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2081                 if (ma->ma_valid & MA_FLAGS &&
2082                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2083                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2084                 } else {
2085                         if (handle == NULL) {
2086                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2087                                 if (IS_ERR(handle))
2088                                         GOTO(out, rc = PTR_ERR(handle));
2089
2090                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2091                                                              handle);
2092                                 if (rc)
2093                                         GOTO(out, rc);
2094
2095                                 rc = mdd_declare_changelog_store(env, mdd,
2096                                                                  NULL, handle);
2097                                 if (rc)
2098                                         GOTO(stop, rc);
2099
2100                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2101                                 if (rc)
2102                                         GOTO(out, rc);
2103                         }
2104
2105                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2106                         if (rc == 0)
2107                                 reset = 0;
2108                 }
2109
2110                 if (rc != 0)
2111                         CERROR("Error when prepare to delete Object "DFID" , "
2112                                "which will cause OST objects can not be "
2113                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2114         }
2115         EXIT;
2116
2117 out:
2118         if (reset)
2119                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2120
2121         mdd_write_unlock(env, mdd_obj);
2122
2123         if (rc == 0 &&
2124             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2125             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2126                 if (handle == NULL) {
2127                         handle = mdd_trans_create(env, mdo2mdd(obj));
2128                         if (IS_ERR(handle))
2129                                 GOTO(stop, rc = IS_ERR(handle));
2130
2131                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2132                                                          handle);
2133                         if (rc)
2134                                 GOTO(stop, rc);
2135
2136                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2137                         if (rc)
2138                                 GOTO(stop, rc);
2139                 }
2140
2141                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2142                                          mdd_obj, handle);
2143         }
2144
2145 stop:
2146         if (handle != NULL)
2147                 mdd_trans_stop(env, mdd, rc, handle);
2148 #ifdef HAVE_QUOTA_SUPPORT
2149         if (quota_opc)
2150                 /* Trigger dqrel on the owner of child. If failed,
2151                  * the next call for lquota_chkquota will process it */
2152                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2153                               quota_opc);
2154 #endif
2155         return rc;
2156 }
2157
2158 /*
2159  * Permission check is done when open,
2160  * no need check again.
2161  */
2162 static int mdd_readpage_sanity_check(const struct lu_env *env,
2163                                      struct mdd_object *obj)
2164 {
2165         struct dt_object *next = mdd_object_child(obj);
2166         int rc;
2167         ENTRY;
2168
2169         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2170                 rc = 0;
2171         else
2172                 rc = -ENOTDIR;
2173
2174         RETURN(rc);
2175 }
2176
2177 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2178                               int nob, const struct dt_it_ops *iops,
2179                               struct dt_it *it, __u32 attr, void *arg)
2180 {
2181         struct lu_dirpage       *dp = &lp->lp_dir;
2182         void                    *area = dp;
2183         int                      result;
2184         __u64                    hash = 0;
2185         struct lu_dirent        *ent;
2186         struct lu_dirent        *last = NULL;
2187         int                      first = 1;
2188
2189         memset(area, 0, sizeof (*dp));
2190         area += sizeof (*dp);
2191         nob  -= sizeof (*dp);
2192
2193         ent  = area;
2194         do {
2195                 int    len;
2196                 int    recsize;
2197
2198                 len = iops->key_size(env, it);
2199
2200                 /* IAM iterator can return record with zero len. */
2201                 if (len == 0)
2202                         goto next;
2203
2204                 hash = iops->store(env, it);
2205                 if (unlikely(first)) {
2206                         first = 0;
2207                         dp->ldp_hash_start = cpu_to_le64(hash);
2208                 }
2209
2210                 /* calculate max space required for lu_dirent */
2211                 recsize = lu_dirent_calc_size(len, attr);
2212
2213                 if (nob >= recsize) {
2214                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2215                         if (result == -ESTALE)
2216                                 goto next;
2217                         if (result != 0)
2218                                 goto out;
2219
2220                         /* osd might not able to pack all attributes,
2221                          * so recheck rec length */
2222                         recsize = le16_to_cpu(ent->lde_reclen);
2223                 } else {
2224                         result = (last != NULL) ? 0 :-EINVAL;
2225                         goto out;
2226                 }
2227                 last = ent;
2228                 ent = (void *)ent + recsize;
2229                 nob -= recsize;
2230
2231 next:
2232                 result = iops->next(env, it);
2233                 if (result == -ESTALE)
2234                         goto next;
2235         } while (result == 0);
2236
2237 out:
2238         dp->ldp_hash_end = cpu_to_le64(hash);
2239         if (last != NULL) {
2240                 if (last->lde_hash == dp->ldp_hash_end)
2241                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2242                 last->lde_reclen = 0; /* end mark */
2243         }
2244         if (result > 0)
2245                 /* end of directory */
2246                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2247         if (result < 0)
2248                 CWARN("build page failed: %d!\n", result);
2249         return result;
2250 }
2251
2252 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2253                  const struct lu_rdpg *rdpg)
2254 {
2255         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2256         int rc;
2257         ENTRY;
2258
2259         if (mdd_object_exists(mdd_obj) == 0) {
2260                 CERROR("%s: object "DFID" not found: rc = -2\n",
2261                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2262                 return -ENOENT;
2263         }
2264
2265         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2266         rc = mdd_readpage_sanity_check(env, mdd_obj);
2267         if (rc)
2268                 GOTO(out_unlock, rc);
2269
2270         if (mdd_is_dead_obj(mdd_obj)) {
2271                 struct page *pg;
2272                 struct lu_dirpage *dp;
2273
2274                 /*
2275                  * According to POSIX, please do not return any entry to client:
2276                  * even dot and dotdot should not be returned.
2277                  */
2278                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2279                        PFID(mdd_object_fid(mdd_obj)));
2280
2281                 if (rdpg->rp_count <= 0)
2282                         GOTO(out_unlock, rc = -EFAULT);
2283                 LASSERT(rdpg->rp_pages != NULL);
2284
2285                 pg = rdpg->rp_pages[0];
2286                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2287                 memset(dp, 0 , sizeof(struct lu_dirpage));
2288                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2289                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2290                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2291                 cfs_kunmap(pg);
2292                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2293         }
2294
2295         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2296                            mdd_dir_page_build, NULL);
2297         if (rc >= 0) {
2298                 struct lu_dirpage       *dp;
2299
2300                 dp = cfs_kmap(rdpg->rp_pages[0]);
2301                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2302                 if (rc == 0) {
2303                         /*
2304                          * No pages were processed, mark this for first page
2305                          * and send back.
2306                          */
2307                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2308                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2309                 }
2310                 cfs_kunmap(rdpg->rp_pages[0]);
2311         }
2312
2313         GOTO(out_unlock, rc);
2314 out_unlock:
2315         mdd_read_unlock(env, mdd_obj);
2316         return rc;
2317 }
2318
2319 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2320 {
2321         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2322
2323         if (mdd_object_exists(mdd_obj) == 0) {
2324                 CERROR("%s: object "DFID" not found: rc = -2\n",
2325                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2326                 return -ENOENT;
2327         }
2328         return dt_object_sync(env, mdd_object_child(mdd_obj));
2329 }
2330
2331 const struct md_object_operations mdd_obj_ops = {
2332         .moo_permission    = mdd_permission,
2333         .moo_attr_get      = mdd_attr_get,
2334         .moo_attr_set      = mdd_attr_set,
2335         .moo_xattr_get     = mdd_xattr_get,
2336         .moo_xattr_set     = mdd_xattr_set,
2337         .moo_xattr_list    = mdd_xattr_list,
2338         .moo_xattr_del     = mdd_xattr_del,
2339         .moo_open          = mdd_open,
2340         .moo_close         = mdd_close,
2341         .moo_readpage      = mdd_readpage,
2342         .moo_readlink      = mdd_readlink,
2343         .moo_changelog     = mdd_changelog,
2344         .moo_capa_get      = mdd_capa_get,
2345         .moo_object_sync   = mdd_object_sync,
2346         .moo_path          = mdd_path,
2347 };