Whamcloud - gitweb
LU-998 acl: declare acl operation for setattr
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
58 #include <obd_lov.h>
59
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
63
64 #include "mdd_internal.h"
65
66 static const struct lu_object_operations mdd_lu_obj_ops;
67
68 static int mdd_xattr_get(const struct lu_env *env,
69                          struct md_object *obj, struct lu_buf *buf,
70                          const char *name);
71
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
73                  void **data)
74 {
75         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
76                  PFID(mdd_object_fid(obj)));
77         mdo_data_get(env, obj, data);
78         return 0;
79 }
80
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82                struct lu_attr *la, struct lustre_capa *capa)
83 {
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                  PFID(mdd_object_fid(obj)));
86         return mdo_attr_get(env, obj, la, capa);
87 }
88
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
90 {
91         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
92
93         if (flags & LUSTRE_APPEND_FL)
94                 obj->mod_flags |= APPEND_OBJ;
95
96         if (flags & LUSTRE_IMMUTABLE_FL)
97                 obj->mod_flags |= IMMUTE_OBJ;
98 }
99
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
101 {
102         struct mdd_thread_info *info;
103
104         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105         LASSERT(info != NULL);
106         return info;
107 }
108
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
110 {
111         struct lu_buf *buf;
112
113         buf = &mdd_env_info(env)->mti_buf;
114         buf->lb_buf = area;
115         buf->lb_len = len;
116         return buf;
117 }
118
119 void mdd_buf_put(struct lu_buf *buf)
120 {
121         if (buf == NULL || buf->lb_buf == NULL)
122                 return;
123         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
124         buf->lb_buf = NULL;
125         buf->lb_len = 0;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 buf->lb_buf = NULL;
146         }
147         if (buf->lb_buf == NULL) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         buf->lb_len = 0;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183                                        struct mdd_device *mdd)
184 {
185         struct mdd_thread_info *mti = mdd_env_info(env);
186         int                     max_cookie_size;
187
188         max_cookie_size = mdd_lov_cookiesize(env, mdd);
189         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190                 if (mti->mti_max_cookie)
191                         OBD_FREE_LARGE(mti->mti_max_cookie,
192                                        mti->mti_max_cookie_size);
193                 mti->mti_max_cookie = NULL;
194                 mti->mti_max_cookie_size = 0;
195         }
196         if (unlikely(mti->mti_max_cookie == NULL)) {
197                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198                 if (likely(mti->mti_max_cookie != NULL))
199                         mti->mti_max_cookie_size = max_cookie_size;
200         }
201         if (likely(mti->mti_max_cookie != NULL))
202                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203         return mti->mti_max_cookie;
204 }
205
206 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
207                                    struct mdd_device *mdd)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210         int                     max_lmm_size;
211
212         max_lmm_size = mdd_lov_mdsize(env, mdd);
213         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
214                 if (mti->mti_max_lmm)
215                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
216                 mti->mti_max_lmm = NULL;
217                 mti->mti_max_lmm_size = 0;
218         }
219         if (unlikely(mti->mti_max_lmm == NULL)) {
220                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
221                 if (likely(mti->mti_max_lmm != NULL))
222                         mti->mti_max_lmm_size = max_lmm_size;
223         }
224         return mti->mti_max_lmm;
225 }
226
227 struct lu_object *mdd_object_alloc(const struct lu_env *env,
228                                    const struct lu_object_header *hdr,
229                                    struct lu_device *d)
230 {
231         struct mdd_object *mdd_obj;
232
233         OBD_ALLOC_PTR(mdd_obj);
234         if (mdd_obj != NULL) {
235                 struct lu_object *o;
236
237                 o = mdd2lu_obj(mdd_obj);
238                 lu_object_init(o, NULL, d);
239                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
240                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
241                 mdd_obj->mod_count = 0;
242                 o->lo_ops = &mdd_lu_obj_ops;
243                 return o;
244         } else {
245                 return NULL;
246         }
247 }
248
249 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
250                            const struct lu_object_conf *unused)
251 {
252         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
253         struct mdd_object *mdd_obj = lu2mdd_obj(o);
254         struct lu_object  *below;
255         struct lu_device  *under;
256         ENTRY;
257
258         mdd_obj->mod_cltime = 0;
259         under = &d->mdd_child->dd_lu_dev;
260         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
261         mdd_pdlock_init(mdd_obj);
262         if (below == NULL)
263                 RETURN(-ENOMEM);
264
265         lu_object_add(o, below);
266
267         RETURN(0);
268 }
269
270 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
271 {
272         if (lu_object_exists(o))
273                 return mdd_get_flags(env, lu2mdd_obj(o));
274         else
275                 return 0;
276 }
277
278 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
279 {
280         struct mdd_object *mdd = lu2mdd_obj(o);
281
282         lu_object_fini(o);
283         OBD_FREE_PTR(mdd);
284 }
285
286 static int mdd_object_print(const struct lu_env *env, void *cookie,
287                             lu_printer_t p, const struct lu_object *o)
288 {
289         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
290         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
291                     "valid=%x, cltime="LPU64", flags=%lx)",
292                     mdd, mdd->mod_count, mdd->mod_valid,
293                     mdd->mod_cltime, mdd->mod_flags);
294 }
295
296 static const struct lu_object_operations mdd_lu_obj_ops = {
297         .loo_object_init    = mdd_object_init,
298         .loo_object_start   = mdd_object_start,
299         .loo_object_free    = mdd_object_free,
300         .loo_object_print   = mdd_object_print,
301 };
302
303 struct mdd_object *mdd_object_find(const struct lu_env *env,
304                                    struct mdd_device *d,
305                                    const struct lu_fid *f)
306 {
307         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
308 }
309
310 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
311                         const char *path, struct lu_fid *fid)
312 {
313         struct lu_buf *buf;
314         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
315         struct mdd_object *obj;
316         struct lu_name *lname = &mdd_env_info(env)->mti_name;
317         char *name;
318         int rc = 0;
319         ENTRY;
320
321         /* temp buffer for path element */
322         buf = mdd_buf_alloc(env, PATH_MAX);
323         if (buf->lb_buf == NULL)
324                 RETURN(-ENOMEM);
325
326         lname->ln_name = name = buf->lb_buf;
327         lname->ln_namelen = 0;
328         *f = mdd->mdd_root_fid;
329
330         while(1) {
331                 while (*path == '/')
332                         path++;
333                 if (*path == '\0')
334                         break;
335                 while (*path != '/' && *path != '\0') {
336                         *name = *path;
337                         path++;
338                         name++;
339                         lname->ln_namelen++;
340                 }
341
342                 *name = '\0';
343                 /* find obj corresponding to fid */
344                 obj = mdd_object_find(env, mdd, f);
345                 if (obj == NULL)
346                         GOTO(out, rc = -EREMOTE);
347                 if (IS_ERR(obj))
348                         GOTO(out, rc = PTR_ERR(obj));
349                 /* get child fid from parent and name */
350                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
351                 mdd_object_put(env, obj);
352                 if (rc)
353                         break;
354
355                 name = buf->lb_buf;
356                 lname->ln_namelen = 0;
357         }
358
359         if (!rc)
360                 *fid = *f;
361 out:
362         RETURN(rc);
363 }
364
365 /** The maximum depth that fid2path() will search.
366  * This is limited only because we want to store the fids for
367  * historical path lookup purposes.
368  */
369 #define MAX_PATH_DEPTH 100
370
371 /** mdd_path() lookup structure. */
372 struct path_lookup_info {
373         __u64                pli_recno;        /**< history point */
374         __u64                pli_currec;       /**< current record */
375         struct lu_fid        pli_fid;
376         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377         struct mdd_object   *pli_mdd_obj;
378         char                *pli_path;         /**< full path */
379         int                  pli_pathlen;
380         int                  pli_linkno;       /**< which hardlink to follow */
381         int                  pli_fidcount;     /**< number of \a pli_fids */
382 };
383
384 static int mdd_path_current(const struct lu_env *env,
385                             struct path_lookup_info *pli)
386 {
387         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388         struct mdd_object *mdd_obj;
389         struct lu_buf     *buf = NULL;
390         struct link_ea_header *leh;
391         struct link_ea_entry  *lee;
392         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
394         char *ptr;
395         int reclen;
396         int rc;
397         ENTRY;
398
399         ptr = pli->pli_path + pli->pli_pathlen - 1;
400         *ptr = 0;
401         --ptr;
402         pli->pli_fidcount = 0;
403         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
404
405         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406                 mdd_obj = mdd_object_find(env, mdd,
407                                           &pli->pli_fids[pli->pli_fidcount]);
408                 if (mdd_obj == NULL)
409                         GOTO(out, rc = -EREMOTE);
410                 if (IS_ERR(mdd_obj))
411                         GOTO(out, rc = PTR_ERR(mdd_obj));
412                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
413                 if (rc <= 0) {
414                         mdd_object_put(env, mdd_obj);
415                         if (rc == -1)
416                                 rc = -EREMOTE;
417                         else if (rc == 0)
418                                 /* Do I need to error out here? */
419                                 rc = -ENOENT;
420                         GOTO(out, rc);
421                 }
422
423                 /* Get parent fid and object name */
424                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425                 buf = mdd_links_get(env, mdd_obj);
426                 mdd_read_unlock(env, mdd_obj);
427                 mdd_object_put(env, mdd_obj);
428                 if (IS_ERR(buf))
429                         GOTO(out, rc = PTR_ERR(buf));
430
431                 leh = buf->lb_buf;
432                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
433                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
434
435                 /* If set, use link #linkno for path lookup, otherwise use
436                    link #0.  Only do this for the final path element. */
437                 if ((pli->pli_fidcount == 0) &&
438                     (pli->pli_linkno < leh->leh_reccount)) {
439                         int count;
440                         for (count = 0; count < pli->pli_linkno; count++) {
441                                 lee = (struct link_ea_entry *)
442                                      ((char *)lee + reclen);
443                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444                         }
445                         if (pli->pli_linkno < leh->leh_reccount - 1)
446                                 /* indicate to user there are more links */
447                                 pli->pli_linkno++;
448                 }
449
450                 /* Pack the name in the end of the buffer */
451                 ptr -= tmpname->ln_namelen;
452                 if (ptr - 1 <= pli->pli_path)
453                         GOTO(out, rc = -EOVERFLOW);
454                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
455                 *(--ptr) = '/';
456
457                 /* Store the parent fid for historic lookup */
458                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
459                         GOTO(out, rc = -EOVERFLOW);
460                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
461         }
462
463         /* Verify that our path hasn't changed since we started the lookup.
464            Record the current index, and verify the path resolves to the
465            same fid. If it does, then the path is correct as of this index. */
466         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
467         pli->pli_currec = mdd->mdd_cl.mc_index;
468         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
469         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
470         if (rc) {
471                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
472                 GOTO (out, rc = -EAGAIN);
473         }
474         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
475                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
476                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
477                        PFID(&pli->pli_fid));
478                 GOTO(out, rc = -EAGAIN);
479         }
480         ptr++; /* skip leading / */
481         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
482
483         EXIT;
484 out:
485         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
486                 /* if we vmalloced a large buffer drop it */
487                 mdd_buf_put(buf);
488
489         return rc;
490 }
491
492 static int mdd_path_historic(const struct lu_env *env,
493                              struct path_lookup_info *pli)
494 {
495         return 0;
496 }
497
498 /* Returns the full path to this fid, as of changelog record recno. */
499 static int mdd_path(const struct lu_env *env, struct md_object *obj,
500                     char *path, int pathlen, __u64 *recno, int *linkno)
501 {
502         struct path_lookup_info *pli;
503         int tries = 3;
504         int rc = -EAGAIN;
505         ENTRY;
506
507         if (pathlen < 3)
508                 RETURN(-EOVERFLOW);
509
510         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
511                 path[0] = '\0';
512                 RETURN(0);
513         }
514
515         OBD_ALLOC_PTR(pli);
516         if (pli == NULL)
517                 RETURN(-ENOMEM);
518
519         pli->pli_mdd_obj = md2mdd_obj(obj);
520         pli->pli_recno = *recno;
521         pli->pli_path = path;
522         pli->pli_pathlen = pathlen;
523         pli->pli_linkno = *linkno;
524
525         /* Retry multiple times in case file is being moved */
526         while (tries-- && rc == -EAGAIN)
527                 rc = mdd_path_current(env, pli);
528
529         /* For historical path lookup, the current links may not have existed
530          * at "recno" time.  We must switch over to earlier links/parents
531          * by using the changelog records.  If the earlier parent doesn't
532          * exist, we must search back through the changelog to reconstruct
533          * its parents, then check if it exists, etc.
534          * We may ignore this problem for the initial implementation and
535          * state that an "original" hardlink must still exist for us to find
536          * historic path name. */
537         if (pli->pli_recno != -1) {
538                 rc = mdd_path_historic(env, pli);
539         } else {
540                 *recno = pli->pli_currec;
541                 /* Return next link index to caller */
542                 *linkno = pli->pli_linkno;
543         }
544
545         OBD_FREE_PTR(pli);
546
547         RETURN (rc);
548 }
549
550 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
551 {
552         struct lu_attr *la = &mdd_env_info(env)->mti_la;
553         int rc;
554
555         ENTRY;
556         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
557         if (rc == 0) {
558                 mdd_flags_xlate(obj, la->la_flags);
559                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
560                         obj->mod_flags |= MNLINK_OBJ;
561         }
562         RETURN(rc);
563 }
564
565 /* get only inode attributes */
566 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
567                   struct md_attr *ma)
568 {
569         int rc = 0;
570         ENTRY;
571
572         if (ma->ma_valid & MA_INODE)
573                 RETURN(0);
574
575         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
576                           mdd_object_capa(env, mdd_obj));
577         if (rc == 0)
578                 ma->ma_valid |= MA_INODE;
579         RETURN(rc);
580 }
581
582 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
583 {
584         struct lov_desc *ldesc;
585         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
586         struct lov_user_md *lum = (struct lov_user_md*)lmm;
587         ENTRY;
588
589         if (!lum)
590                 RETURN(0);
591
592         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
593         LASSERT(ldesc != NULL);
594
595         lum->lmm_magic = LOV_MAGIC_V1;
596         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
597         lum->lmm_pattern = ldesc->ld_pattern;
598         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
599         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
600         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
601
602         RETURN(sizeof(*lum));
603 }
604
605 static int is_rootdir(struct mdd_object *mdd_obj)
606 {
607         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
608         const struct lu_fid *fid = mdo2fid(mdd_obj);
609
610         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
611 }
612
613 /* get lov EA only */
614 static int __mdd_lmm_get(const struct lu_env *env,
615                          struct mdd_object *mdd_obj, struct md_attr *ma)
616 {
617         int rc;
618         ENTRY;
619
620         if (ma->ma_valid & MA_LOV)
621                 RETURN(0);
622
623         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
624                         XATTR_NAME_LOV);
625         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
626                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
627         if (rc > 0) {
628                 ma->ma_lmm_size = rc;
629                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
630                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
631                 rc = 0;
632         }
633         RETURN(rc);
634 }
635
636 /* get the first parent fid from link EA */
637 static int mdd_pfid_get(const struct lu_env *env,
638                         struct mdd_object *mdd_obj, struct md_attr *ma)
639 {
640         struct lu_buf *buf;
641         struct link_ea_header *leh;
642         struct link_ea_entry *lee;
643         struct lu_fid *pfid = &ma->ma_pfid;
644         ENTRY;
645
646         if (ma->ma_valid & MA_PFID)
647                 RETURN(0);
648
649         buf = mdd_links_get(env, mdd_obj);
650         if (IS_ERR(buf))
651                 RETURN(PTR_ERR(buf));
652
653         leh = buf->lb_buf;
654         lee = (struct link_ea_entry *)(leh + 1);
655         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
656         fid_be_to_cpu(pfid, pfid);
657         ma->ma_valid |= MA_PFID;
658         if (buf->lb_len > OBD_ALLOC_BIG)
659                 /* if we vmalloced a large buffer drop it */
660                 mdd_buf_put(buf);
661         RETURN(0);
662 }
663
664 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
665                        struct md_attr *ma)
666 {
667         int rc;
668         ENTRY;
669
670         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
671         rc = __mdd_lmm_get(env, mdd_obj, ma);
672         mdd_read_unlock(env, mdd_obj);
673         RETURN(rc);
674 }
675
676 /* get lmv EA only*/
677 static int __mdd_lmv_get(const struct lu_env *env,
678                          struct mdd_object *mdd_obj, struct md_attr *ma)
679 {
680         int rc;
681         ENTRY;
682
683         if (ma->ma_valid & MA_LMV)
684                 RETURN(0);
685
686         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
687                         XATTR_NAME_LMV);
688         if (rc > 0) {
689                 ma->ma_valid |= MA_LMV;
690                 rc = 0;
691         }
692         RETURN(rc);
693 }
694
695 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
696                          struct md_attr *ma)
697 {
698         struct mdd_thread_info *info = mdd_env_info(env);
699         struct lustre_mdt_attrs *lma =
700                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
701         int lma_size;
702         int rc;
703         ENTRY;
704
705         /* If all needed data are already valid, nothing to do */
706         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
707             (ma->ma_need & (MA_HSM | MA_SOM)))
708                 RETURN(0);
709
710         /* Read LMA from disk EA */
711         lma_size = sizeof(info->mti_xattr_buf);
712         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
713         if (rc <= 0)
714                 RETURN(rc);
715
716         /* Useless to check LMA incompatibility because this is already done in
717          * osd_ea_fid_get(), and this will fail long before this code is
718          * called.
719          * So, if we are here, LMA is compatible.
720          */
721
722         lustre_lma_swab(lma);
723
724         /* Swab and copy LMA */
725         if (ma->ma_need & MA_HSM) {
726                 if (lma->lma_compat & LMAC_HSM)
727                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
728                 else
729                         ma->ma_hsm.mh_flags = 0;
730                 ma->ma_valid |= MA_HSM;
731         }
732
733         /* Copy SOM */
734         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
735                 LASSERT(ma->ma_som != NULL);
736                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
737                 ma->ma_som->msd_size    = lma->lma_som_size;
738                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
739                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
740                 ma->ma_valid |= MA_SOM;
741         }
742
743         RETURN(0);
744 }
745
746 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
747                                  struct md_attr *ma)
748 {
749         int rc = 0;
750         ENTRY;
751
752         if (ma->ma_need & MA_INODE)
753                 rc = mdd_iattr_get(env, mdd_obj, ma);
754
755         if (rc == 0 && ma->ma_need & MA_LOV) {
756                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
757                     S_ISDIR(mdd_object_type(mdd_obj)))
758                         rc = __mdd_lmm_get(env, mdd_obj, ma);
759         }
760         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
761                 if (S_ISREG(mdd_object_type(mdd_obj)))
762                         rc = mdd_pfid_get(env, mdd_obj, ma);
763         }
764         if (rc == 0 && ma->ma_need & MA_LMV) {
765                 if (S_ISDIR(mdd_object_type(mdd_obj)))
766                         rc = __mdd_lmv_get(env, mdd_obj, ma);
767         }
768         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
769                 if (S_ISREG(mdd_object_type(mdd_obj)))
770                         rc = __mdd_lma_get(env, mdd_obj, ma);
771         }
772 #ifdef CONFIG_FS_POSIX_ACL
773         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
774                 if (S_ISDIR(mdd_object_type(mdd_obj)))
775                         rc = mdd_def_acl_get(env, mdd_obj, ma);
776         }
777 #endif
778         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
779                rc, ma->ma_valid, ma->ma_lmm);
780         RETURN(rc);
781 }
782
783 int mdd_attr_get_internal_locked(const struct lu_env *env,
784                                  struct mdd_object *mdd_obj, struct md_attr *ma)
785 {
786         int rc;
787         int needlock = ma->ma_need &
788                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
789
790         if (needlock)
791                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
792         rc = mdd_attr_get_internal(env, mdd_obj, ma);
793         if (needlock)
794                 mdd_read_unlock(env, mdd_obj);
795         return rc;
796 }
797
798 /*
799  * No permission check is needed.
800  */
801 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
802                         struct md_attr *ma)
803 {
804         struct mdd_object *mdd_obj = md2mdd_obj(obj);
805         int                rc;
806
807         ENTRY;
808         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
809         RETURN(rc);
810 }
811
812 /*
813  * No permission check is needed.
814  */
815 static int mdd_xattr_get(const struct lu_env *env,
816                          struct md_object *obj, struct lu_buf *buf,
817                          const char *name)
818 {
819         struct mdd_object *mdd_obj = md2mdd_obj(obj);
820         int rc;
821
822         ENTRY;
823
824         LASSERT(mdd_object_exists(mdd_obj));
825
826         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
827         rc = mdo_xattr_get(env, mdd_obj, buf, name,
828                            mdd_object_capa(env, mdd_obj));
829         mdd_read_unlock(env, mdd_obj);
830
831         RETURN(rc);
832 }
833
834 /*
835  * Permission check is done when open,
836  * no need check again.
837  */
838 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
839                         struct lu_buf *buf)
840 {
841         struct mdd_object *mdd_obj = md2mdd_obj(obj);
842         struct dt_object  *next;
843         loff_t             pos = 0;
844         int                rc;
845         ENTRY;
846
847         LASSERT(mdd_object_exists(mdd_obj));
848
849         next = mdd_object_child(mdd_obj);
850         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
851         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
852                                          mdd_object_capa(env, mdd_obj));
853         mdd_read_unlock(env, mdd_obj);
854         RETURN(rc);
855 }
856
857 /*
858  * No permission check is needed.
859  */
860 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
861                           struct lu_buf *buf)
862 {
863         struct mdd_object *mdd_obj = md2mdd_obj(obj);
864         int rc;
865
866         ENTRY;
867
868         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
869         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
870         mdd_read_unlock(env, mdd_obj);
871
872         RETURN(rc);
873 }
874
875 int mdd_declare_object_create_internal(const struct lu_env *env,
876                                        struct mdd_object *p,
877                                        struct mdd_object *c,
878                                        struct md_attr *ma,
879                                        struct thandle *handle,
880                                        const struct md_op_spec *spec)
881 {
882         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
883         const struct dt_index_features *feat = spec->sp_feat;
884         int rc;
885         ENTRY;
886
887         if (feat != &dt_directory_features && feat != NULL)
888                 dof->dof_type = DFT_INDEX;
889         else
890                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
891
892         dof->u.dof_idx.di_feat = feat;
893
894         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
895
896         RETURN(rc);
897 }
898
899 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
900                                struct mdd_object *c, struct md_attr *ma,
901                                struct thandle *handle,
902                                const struct md_op_spec *spec)
903 {
904         struct lu_attr *attr = &ma->ma_attr;
905         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
906         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
907         const struct dt_index_features *feat = spec->sp_feat;
908         int rc;
909         ENTRY;
910
911         if (!mdd_object_exists(c)) {
912                 struct dt_object *next = mdd_object_child(c);
913                 LASSERT(next);
914
915                 if (feat != &dt_directory_features && feat != NULL)
916                         dof->dof_type = DFT_INDEX;
917                 else
918                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
919
920                 dof->u.dof_idx.di_feat = feat;
921
922                 /* @hint will be initialized by underlying device. */
923                 next->do_ops->do_ah_init(env, hint,
924                                          p ? mdd_object_child(p) : NULL,
925                                          attr->la_mode & S_IFMT);
926
927                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
928                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
929         } else
930                 rc = -EEXIST;
931
932         RETURN(rc);
933 }
934
935 /**
936  * Make sure the ctime is increased only.
937  */
938 static inline int mdd_attr_check(const struct lu_env *env,
939                                  struct mdd_object *obj,
940                                  struct lu_attr *attr)
941 {
942         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
943         int rc;
944         ENTRY;
945
946         if (attr->la_valid & LA_CTIME) {
947                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
948                 if (rc)
949                         RETURN(rc);
950
951                 if (attr->la_ctime < tmp_la->la_ctime)
952                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
953                 else if (attr->la_valid == LA_CTIME &&
954                          attr->la_ctime == tmp_la->la_ctime)
955                         attr->la_valid &= ~LA_CTIME;
956         }
957         RETURN(0);
958 }
959
960 int mdd_attr_set_internal(const struct lu_env *env,
961                           struct mdd_object *obj,
962                           struct lu_attr *attr,
963                           struct thandle *handle,
964                           int needacl)
965 {
966         int rc;
967         ENTRY;
968
969         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
970 #ifdef CONFIG_FS_POSIX_ACL
971         if (!rc && (attr->la_valid & LA_MODE) && needacl)
972                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
973 #endif
974         RETURN(rc);
975 }
976
977 int mdd_attr_check_set_internal(const struct lu_env *env,
978                                 struct mdd_object *obj,
979                                 struct lu_attr *attr,
980                                 struct thandle *handle,
981                                 int needacl)
982 {
983         int rc;
984         ENTRY;
985
986         rc = mdd_attr_check(env, obj, attr);
987         if (rc)
988                 RETURN(rc);
989
990         if (attr->la_valid)
991                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
992         RETURN(rc);
993 }
994
995 static int mdd_attr_set_internal_locked(const struct lu_env *env,
996                                         struct mdd_object *obj,
997                                         struct lu_attr *attr,
998                                         struct thandle *handle,
999                                         int needacl)
1000 {
1001         int rc;
1002         ENTRY;
1003
1004         needacl = needacl && (attr->la_valid & LA_MODE);
1005         if (needacl)
1006                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1007         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1008         if (needacl)
1009                 mdd_write_unlock(env, obj);
1010         RETURN(rc);
1011 }
1012
1013 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1014                                        struct mdd_object *obj,
1015                                        struct lu_attr *attr,
1016                                        struct thandle *handle,
1017                                        int needacl)
1018 {
1019         int rc;
1020         ENTRY;
1021
1022         needacl = needacl && (attr->la_valid & LA_MODE);
1023         if (needacl)
1024                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1025         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1026         if (needacl)
1027                 mdd_write_unlock(env, obj);
1028         RETURN(rc);
1029 }
1030
1031 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1032                     const struct lu_buf *buf, const char *name,
1033                     int fl, struct thandle *handle)
1034 {
1035         struct lustre_capa *capa = mdd_object_capa(env, obj);
1036         int rc = -EINVAL;
1037         ENTRY;
1038
1039         if (buf->lb_buf && buf->lb_len > 0)
1040                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1041         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1042                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1043
1044         RETURN(rc);
1045 }
1046
1047 /*
1048  * This gives the same functionality as the code between
1049  * sys_chmod and inode_setattr
1050  * chown_common and inode_setattr
1051  * utimes and inode_setattr
1052  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1053  */
1054 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1055                         struct lu_attr *la, const struct md_attr *ma)
1056 {
1057         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1058         struct md_ucred  *uc;
1059         int               rc;
1060         ENTRY;
1061
1062         if (!la->la_valid)
1063                 RETURN(0);
1064
1065         /* Do not permit change file type */
1066         if (la->la_valid & LA_TYPE)
1067                 RETURN(-EPERM);
1068
1069         /* They should not be processed by setattr */
1070         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1071                 RETURN(-EPERM);
1072
1073         /* export destroy does not have ->le_ses, but we may want
1074          * to drop LUSTRE_SOM_FL. */
1075         if (!env->le_ses)
1076                 RETURN(0);
1077
1078         uc = md_ucred(env);
1079
1080         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1081         if (rc)
1082                 RETURN(rc);
1083
1084         if (la->la_valid == LA_CTIME) {
1085                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1086                         /* This is only for set ctime when rename's source is
1087                          * on remote MDS. */
1088                         rc = mdd_may_delete(env, NULL, obj,
1089                                             (struct md_attr *)ma, 1, 0);
1090                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1091                         la->la_valid &= ~LA_CTIME;
1092                 RETURN(rc);
1093         }
1094
1095         if (la->la_valid == LA_ATIME) {
1096                 /* This is atime only set for read atime update on close. */
1097                 if (la->la_atime >= tmp_la->la_atime &&
1098                     la->la_atime < (tmp_la->la_atime +
1099                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1100                         la->la_valid &= ~LA_ATIME;
1101                 RETURN(0);
1102         }
1103
1104         /* Check if flags change. */
1105         if (la->la_valid & LA_FLAGS) {
1106                 unsigned int oldflags = 0;
1107                 unsigned int newflags = la->la_flags &
1108                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1109
1110                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1111                     !mdd_capable(uc, CFS_CAP_FOWNER))
1112                         RETURN(-EPERM);
1113
1114                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1115                  * only be changed by the relevant capability. */
1116                 if (mdd_is_immutable(obj))
1117                         oldflags |= LUSTRE_IMMUTABLE_FL;
1118                 if (mdd_is_append(obj))
1119                         oldflags |= LUSTRE_APPEND_FL;
1120                 if ((oldflags ^ newflags) &&
1121                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1122                         RETURN(-EPERM);
1123
1124                 if (!S_ISDIR(tmp_la->la_mode))
1125                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1126         }
1127
1128         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1129             (la->la_valid & ~LA_FLAGS) &&
1130             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1131                 RETURN(-EPERM);
1132
1133         /* Check for setting the obj time. */
1134         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1135             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1136                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1137                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1138                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1139                                                             MAY_WRITE,
1140                                                             MOR_TGT_CHILD);
1141                         if (rc)
1142                                 RETURN(rc);
1143                 }
1144         }
1145
1146         if (la->la_valid & LA_KILL_SUID) {
1147                 la->la_valid &= ~LA_KILL_SUID;
1148                 if ((tmp_la->la_mode & S_ISUID) &&
1149                     !(la->la_valid & LA_MODE)) {
1150                         la->la_mode = tmp_la->la_mode;
1151                         la->la_valid |= LA_MODE;
1152                 }
1153                 la->la_mode &= ~S_ISUID;
1154         }
1155
1156         if (la->la_valid & LA_KILL_SGID) {
1157                 la->la_valid &= ~LA_KILL_SGID;
1158                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1159                                         (S_ISGID | S_IXGRP)) &&
1160                     !(la->la_valid & LA_MODE)) {
1161                         la->la_mode = tmp_la->la_mode;
1162                         la->la_valid |= LA_MODE;
1163                 }
1164                 la->la_mode &= ~S_ISGID;
1165         }
1166
1167         /* Make sure a caller can chmod. */
1168         if (la->la_valid & LA_MODE) {
1169                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1170                     (uc->mu_fsuid != tmp_la->la_uid) &&
1171                     !mdd_capable(uc, CFS_CAP_FOWNER))
1172                         RETURN(-EPERM);
1173
1174                 if (la->la_mode == (cfs_umode_t) -1)
1175                         la->la_mode = tmp_la->la_mode;
1176                 else
1177                         la->la_mode = (la->la_mode & S_IALLUGO) |
1178                                       (tmp_la->la_mode & ~S_IALLUGO);
1179
1180                 /* Also check the setgid bit! */
1181                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1182                                        la->la_gid : tmp_la->la_gid) &&
1183                     !mdd_capable(uc, CFS_CAP_FSETID))
1184                         la->la_mode &= ~S_ISGID;
1185         } else {
1186                la->la_mode = tmp_la->la_mode;
1187         }
1188
1189         /* Make sure a caller can chown. */
1190         if (la->la_valid & LA_UID) {
1191                 if (la->la_uid == (uid_t) -1)
1192                         la->la_uid = tmp_la->la_uid;
1193                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1194                     (la->la_uid != tmp_la->la_uid)) &&
1195                     !mdd_capable(uc, CFS_CAP_CHOWN))
1196                         RETURN(-EPERM);
1197
1198                 /* If the user or group of a non-directory has been
1199                  * changed by a non-root user, remove the setuid bit.
1200                  * 19981026 David C Niemi <niemi@tux.org>
1201                  *
1202                  * Changed this to apply to all users, including root,
1203                  * to avoid some races. This is the behavior we had in
1204                  * 2.0. The check for non-root was definitely wrong
1205                  * for 2.2 anyway, as it should have been using
1206                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1207                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1208                     !S_ISDIR(tmp_la->la_mode)) {
1209                         la->la_mode &= ~S_ISUID;
1210                         la->la_valid |= LA_MODE;
1211                 }
1212         }
1213
1214         /* Make sure caller can chgrp. */
1215         if (la->la_valid & LA_GID) {
1216                 if (la->la_gid == (gid_t) -1)
1217                         la->la_gid = tmp_la->la_gid;
1218                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1219                     ((la->la_gid != tmp_la->la_gid) &&
1220                     !lustre_in_group_p(uc, la->la_gid))) &&
1221                     !mdd_capable(uc, CFS_CAP_CHOWN))
1222                         RETURN(-EPERM);
1223
1224                 /* Likewise, if the user or group of a non-directory
1225                  * has been changed by a non-root user, remove the
1226                  * setgid bit UNLESS there is no group execute bit
1227                  * (this would be a file marked for mandatory
1228                  * locking).  19981026 David C Niemi <niemi@tux.org>
1229                  *
1230                  * Removed the fsuid check (see the comment above) --
1231                  * 19990830 SD. */
1232                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1233                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1234                         la->la_mode &= ~S_ISGID;
1235                         la->la_valid |= LA_MODE;
1236                 }
1237         }
1238
1239         /* For both Size-on-MDS case and truncate case,
1240          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1241          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1242          * For SOM case, it is true, the MAY_WRITE perm has been checked
1243          * when open, no need check again. For truncate case, it is false,
1244          * the MAY_WRITE perm should be checked here. */
1245         if (ma->ma_attr_flags & MDS_SOM) {
1246                 /* For the "Size-on-MDS" setattr update, merge coming
1247                  * attributes with the set in the inode. BUG 10641 */
1248                 if ((la->la_valid & LA_ATIME) &&
1249                     (la->la_atime <= tmp_la->la_atime))
1250                         la->la_valid &= ~LA_ATIME;
1251
1252                 /* OST attributes do not have a priority over MDS attributes,
1253                  * so drop times if ctime is equal. */
1254                 if ((la->la_valid & LA_CTIME) &&
1255                     (la->la_ctime <= tmp_la->la_ctime))
1256                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1257         } else {
1258                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1259                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1260                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1261                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1262                                 rc = mdd_permission_internal_locked(env, obj,
1263                                                             tmp_la, MAY_WRITE,
1264                                                             MOR_TGT_CHILD);
1265                                 if (rc)
1266                                         RETURN(rc);
1267                         }
1268                 }
1269                 if (la->la_valid & LA_CTIME) {
1270                         /* The pure setattr, it has the priority over what is
1271                          * already set, do not drop it if ctime is equal. */
1272                         if (la->la_ctime < tmp_la->la_ctime)
1273                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1274                                                   LA_CTIME);
1275                 }
1276         }
1277
1278         RETURN(0);
1279 }
1280
1281 /** Store a data change changelog record
1282  * If this fails, we must fail the whole transaction; we don't
1283  * want the change to commit without the log entry.
1284  * \param mdd_obj - mdd_object of change
1285  * \param handle - transacion handle
1286  */
1287 static int mdd_changelog_data_store(const struct lu_env     *env,
1288                                     struct mdd_device       *mdd,
1289                                     enum changelog_rec_type type,
1290                                     int                     flags,
1291                                     struct mdd_object       *mdd_obj,
1292                                     struct thandle          *handle)
1293 {
1294         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1295         struct llog_changelog_rec *rec;
1296         struct thandle *th = NULL;
1297         struct lu_buf *buf;
1298         int reclen;
1299         int rc;
1300
1301         /* Not recording */
1302         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1303                 RETURN(0);
1304         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1305                 RETURN(0);
1306
1307         LASSERT(mdd_obj != NULL);
1308         LASSERT(handle != NULL);
1309
1310         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1311             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1312                 /* Don't need multiple updates in this log */
1313                 /* Don't check under lock - no big deal if we get an extra
1314                    entry */
1315                 RETURN(0);
1316         }
1317
1318         reclen = llog_data_len(sizeof(*rec));
1319         buf = mdd_buf_alloc(env, reclen);
1320         if (buf->lb_buf == NULL)
1321                 RETURN(-ENOMEM);
1322         rec = (struct llog_changelog_rec *)buf->lb_buf;
1323
1324         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1325         rec->cr.cr_type = (__u32)type;
1326         rec->cr.cr_tfid = *tfid;
1327         rec->cr.cr_namelen = 0;
1328         mdd_obj->mod_cltime = cfs_time_current_64();
1329
1330         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1331
1332         if (th)
1333                 mdd_trans_stop(env, mdd, rc, th);
1334
1335         if (rc < 0) {
1336                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1337                        rc, type, PFID(tfid));
1338                 return -EFAULT;
1339         }
1340
1341         return 0;
1342 }
1343
1344 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1345                   int flags, struct md_object *obj)
1346 {
1347         struct thandle *handle;
1348         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1349         struct mdd_device *mdd = mdo2mdd(obj);
1350         int rc;
1351         ENTRY;
1352
1353         handle = mdd_trans_create(env, mdd);
1354         if (IS_ERR(handle))
1355                 return(PTR_ERR(handle));
1356
1357         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1358         if (rc)
1359                 GOTO(stop, rc);
1360
1361         rc = mdd_trans_start(env, mdd, handle);
1362         if (rc)
1363                 GOTO(stop, rc);
1364
1365         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1366                                       handle);
1367
1368 stop:
1369         mdd_trans_stop(env, mdd, rc, handle);
1370
1371         RETURN(rc);
1372 }
1373
1374 /**
1375  * Should be called with write lock held.
1376  *
1377  * \see mdd_lma_set_locked().
1378  */
1379 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1380                        const struct md_attr *ma, struct thandle *handle)
1381 {
1382         struct mdd_thread_info *info = mdd_env_info(env);
1383         struct lu_buf *buf;
1384         struct lustre_mdt_attrs *lma =
1385                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1386         int lmasize = sizeof(struct lustre_mdt_attrs);
1387         int rc = 0;
1388
1389         ENTRY;
1390
1391         /* Either HSM or SOM part is not valid, we need to read it before */
1392         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1393                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1394                 if (rc <= 0)
1395                         RETURN(rc);
1396
1397                 lustre_lma_swab(lma);
1398         } else {
1399                 memset(lma, 0, lmasize);
1400         }
1401
1402         /* Copy HSM data */
1403         if (ma->ma_valid & MA_HSM) {
1404                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1405                 lma->lma_compat |= LMAC_HSM;
1406         }
1407
1408         /* Copy SOM data */
1409         if (ma->ma_valid & MA_SOM) {
1410                 LASSERT(ma->ma_som != NULL);
1411                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1412                         lma->lma_compat     &= ~LMAC_SOM;
1413                 } else {
1414                         lma->lma_compat     |= LMAC_SOM;
1415                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1416                         lma->lma_som_size    = ma->ma_som->msd_size;
1417                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1418                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1419                 }
1420         }
1421
1422         /* Copy FID */
1423         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1424
1425         lustre_lma_swab(lma);
1426         buf = mdd_buf_get(env, lma, lmasize);
1427         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1428
1429         RETURN(rc);
1430 }
1431
1432 /**
1433  * Save LMA extended attributes with data from \a ma.
1434  *
1435  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1436  * not, LMA EA will be first read from disk, modified and write back.
1437  *
1438  */
1439 static int mdd_lma_set_locked(const struct lu_env *env,
1440                               struct mdd_object *mdd_obj,
1441                               const struct md_attr *ma, struct thandle *handle)
1442 {
1443         int rc;
1444
1445         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1446         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1447         mdd_write_unlock(env, mdd_obj);
1448         return rc;
1449 }
1450
1451 /* Precedence for choosing record type when multiple
1452  * attributes change: setattr > mtime > ctime > atime
1453  * (ctime changes when mtime does, plus chmod/chown.
1454  * atime and ctime are independent.) */
1455 static int mdd_attr_set_changelog(const struct lu_env *env,
1456                                   struct md_object *obj, struct thandle *handle,
1457                                   __u64 valid)
1458 {
1459         struct mdd_device *mdd = mdo2mdd(obj);
1460         int bits, type = 0;
1461
1462         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1463         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1464         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1465         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1466         bits = bits & mdd->mdd_cl.mc_mask;
1467         if (bits == 0)
1468                 return 0;
1469
1470         /* The record type is the lowest non-masked set bit */
1471         while (bits && ((bits & 1) == 0)) {
1472                 bits = bits >> 1;
1473                 type++;
1474         }
1475
1476         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1477         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1478                                         md2mdd_obj(obj), handle);
1479 }
1480
1481 static int mdd_declare_attr_set(const struct lu_env *env,
1482                                 struct mdd_device *mdd,
1483                                 struct mdd_object *obj,
1484                                 const struct md_attr *ma,
1485                                 struct lov_mds_md *lmm,
1486                                 struct thandle *handle)
1487 {
1488         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1489         int             rc, i;
1490
1491         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1492         if (rc)
1493                 return rc;
1494
1495         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1496         if (rc)
1497                 return rc;
1498
1499         if (ma->ma_valid & MA_LOV) {
1500                 buf->lb_buf = NULL;
1501                 buf->lb_len = ma->ma_lmm_size;
1502                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1503                                            0, handle);
1504                 if (rc)
1505                         return rc;
1506         }
1507
1508         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1509                 buf->lb_buf = NULL;
1510                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1511                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1512                                            0, handle);
1513                 if (rc)
1514                         return rc;
1515         }
1516
1517 #ifdef CONFIG_FS_POSIX_ACL
1518         if (ma->ma_attr.la_valid & LA_MODE) {
1519                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1520                 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1521                                    BYPASS_CAPA);
1522                 mdd_read_unlock(env, obj);
1523                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1524                         rc = 0;
1525                 else if (rc < 0)
1526                         return rc;
1527
1528                 if (rc != 0) {
1529                         buf->lb_buf = NULL;
1530                         buf->lb_len = rc;
1531                         rc = mdo_declare_xattr_set(env, obj, buf,
1532                                                    XATTR_NAME_ACL_ACCESS, 0,
1533                                                    handle);
1534                         if (rc)
1535                                 return rc;
1536                 }
1537         }
1538 #endif
1539
1540         /* basically the log is the same as in unlink case */
1541         if (lmm) {
1542                 __u16 stripe;
1543
1544                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1545                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1546                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1547                                mdd->mdd_obd_dev->obd_name,
1548                                le32_to_cpu(lmm->lmm_magic),
1549                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1550                         return -EINVAL;
1551                 }
1552
1553                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1554                 if (stripe == LOV_ALL_STRIPES) {
1555                         struct lov_desc *ldesc;
1556
1557                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1558                         LASSERT(ldesc != NULL);
1559                         stripe = ldesc->ld_tgt_count;
1560                 }
1561
1562                 for (i = 0; i < stripe; i++) {
1563                         rc = mdd_declare_llog_record(env, mdd,
1564                                         sizeof(struct llog_unlink_rec),
1565                                         handle);
1566                         if (rc)
1567                                 return rc;
1568                 }
1569         }
1570
1571         return rc;
1572 }
1573
1574 /* set attr and LOV EA at once, return updated attr */
1575 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1576                         const struct md_attr *ma)
1577 {
1578         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1579         struct mdd_device *mdd = mdo2mdd(obj);
1580         struct thandle *handle;
1581         struct lov_mds_md *lmm = NULL;
1582         struct llog_cookie *logcookies = NULL;
1583         int  rc, lmm_size = 0, cookie_size = 0;
1584         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1585         struct obd_device *obd = mdd->mdd_obd_dev;
1586         struct mds_obd *mds = &obd->u.mds;
1587 #ifdef HAVE_QUOTA_SUPPORT
1588         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1589         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1590         int quota_opc = 0, block_count = 0;
1591         int inode_pending[MAXQUOTAS] = { 0, 0 };
1592         int block_pending[MAXQUOTAS] = { 0, 0 };
1593 #endif
1594         ENTRY;
1595
1596         *la_copy = ma->ma_attr;
1597         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1598         if (rc != 0)
1599                 RETURN(rc);
1600
1601         /* setattr on "close" only change atime, or do nothing */
1602         if (ma->ma_valid == MA_INODE &&
1603             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1604                 RETURN(0);
1605
1606         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1607             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1608                 lmm_size = mdd_lov_mdsize(env, mdd);
1609                 lmm = mdd_max_lmm_get(env, mdd);
1610                 if (lmm == NULL)
1611                         RETURN(-ENOMEM);
1612
1613                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1614                                 XATTR_NAME_LOV);
1615
1616                 if (rc < 0)
1617                         RETURN(rc);
1618         }
1619
1620         handle = mdd_trans_create(env, mdd);
1621         if (IS_ERR(handle))
1622                 RETURN(PTR_ERR(handle));
1623
1624         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1625                                   lmm_size > 0 ? lmm : NULL, handle);
1626         if (rc)
1627                 GOTO(stop, rc);
1628
1629         /* permission changes may require sync operation */
1630         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1631                 handle->th_sync = !!mdd->mdd_sync_permission;
1632
1633         rc = mdd_trans_start(env, mdd, handle);
1634         if (rc)
1635                 GOTO(stop, rc);
1636
1637         /* permission changes may require sync operation */
1638         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1639                 handle->th_sync |= mdd->mdd_sync_permission;
1640
1641         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1642                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1643                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1644
1645 #ifdef HAVE_QUOTA_SUPPORT
1646         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1647                 struct obd_export *exp = md_quota(env)->mq_exp;
1648                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1649
1650                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1651                 if (!rc) {
1652                         quota_opc = FSFILT_OP_SETATTR;
1653                         mdd_quota_wrapper(la_copy, qnids);
1654                         mdd_quota_wrapper(la_tmp, qoids);
1655                         /* get file quota for new owner */
1656                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1657                                         qnids, inode_pending, 1, NULL, 0,
1658                                         NULL, 0);
1659                         block_count = (la_tmp->la_blocks + 7) >> 3;
1660                         if (block_count) {
1661                                 void *data = NULL;
1662                                 mdd_data_get(env, mdd_obj, &data);
1663                                 /* get block quota for new owner */
1664                                 lquota_chkquota(mds_quota_interface_ref, obd,
1665                                                 exp, qnids, block_pending,
1666                                                 block_count, NULL,
1667                                                 LQUOTA_FLAGS_BLK, data, 1);
1668                         }
1669                 }
1670         }
1671 #endif
1672
1673         if (la_copy->la_valid & LA_FLAGS) {
1674                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1675                                                   handle, 1);
1676                 if (rc == 0)
1677                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1678         } else if (la_copy->la_valid) {            /* setattr */
1679                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1680                                                   handle, 1);
1681                 /* journal chown/chgrp in llog, just like unlink */
1682                 if (rc == 0 && lmm_size){
1683                         cookie_size = mdd_lov_cookiesize(env, mdd);
1684                         logcookies = mdd_max_cookie_get(env, mdd);
1685                         if (logcookies == NULL)
1686                                 GOTO(cleanup, rc = -ENOMEM);
1687
1688                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1689                                             logcookies, cookie_size) <= 0)
1690                                 logcookies = NULL;
1691                 }
1692         }
1693
1694         if (rc == 0 && ma->ma_valid & MA_LOV) {
1695                 cfs_umode_t mode;
1696
1697                 mode = mdd_object_type(mdd_obj);
1698                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1699                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1700                         if (rc)
1701                                 GOTO(cleanup, rc);
1702
1703                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1704                                             ma->ma_lmm_size, handle, 1);
1705                 }
1706
1707         }
1708         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1709                 cfs_umode_t mode;
1710
1711                 mode = mdd_object_type(mdd_obj);
1712                 if (S_ISREG(mode))
1713                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1714
1715         }
1716 cleanup:
1717         if (rc == 0)
1718                 rc = mdd_attr_set_changelog(env, obj, handle,
1719                                             ma->ma_attr.la_valid);
1720 stop:
1721         mdd_trans_stop(env, mdd, rc, handle);
1722         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1723                 /*set obd attr, if needed*/
1724                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1725                                            logcookies);
1726         }
1727 #ifdef HAVE_QUOTA_SUPPORT
1728         if (quota_opc) {
1729                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1730                                       inode_pending, 0);
1731                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1732                                       block_pending, 1);
1733                 /* Trigger dqrel/dqacq for original owner and new owner.
1734                  * If failed, the next call for lquota_chkquota will
1735                  * process it. */
1736                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1737                               quota_opc);
1738         }
1739 #endif
1740         RETURN(rc);
1741 }
1742
1743 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1744                       const struct lu_buf *buf, const char *name, int fl,
1745                       struct thandle *handle)
1746 {
1747         int  rc;
1748         ENTRY;
1749
1750         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1751         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1752         mdd_write_unlock(env, obj);
1753
1754         RETURN(rc);
1755 }
1756
1757 static int mdd_xattr_sanity_check(const struct lu_env *env,
1758                                   struct mdd_object *obj)
1759 {
1760         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1761         struct md_ucred *uc     = md_ucred(env);
1762         int rc;
1763         ENTRY;
1764
1765         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1766                 RETURN(-EPERM);
1767
1768         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1769         if (rc)
1770                 RETURN(rc);
1771
1772         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1773             !mdd_capable(uc, CFS_CAP_FOWNER))
1774                 RETURN(-EPERM);
1775
1776         RETURN(rc);
1777 }
1778
1779 static int mdd_declare_xattr_set(const struct lu_env *env,
1780                                  struct mdd_device *mdd,
1781                                  struct mdd_object *obj,
1782                                  const struct lu_buf *buf,
1783                                  const char *name,
1784                                  struct thandle *handle)
1785
1786 {
1787         int rc;
1788
1789         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1790         if (rc)
1791                 return rc;
1792
1793         /* Only record user xattr changes */
1794         if ((strncmp("user.", name, 5) == 0))
1795                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1796
1797         return rc;
1798 }
1799
1800 /**
1801  * The caller should guarantee to update the object ctime
1802  * after xattr_set if needed.
1803  */
1804 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1805                          const struct lu_buf *buf, const char *name,
1806                          int fl)
1807 {
1808         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1809         struct mdd_device *mdd = mdo2mdd(obj);
1810         struct thandle *handle;
1811         int  rc;
1812         ENTRY;
1813
1814         rc = mdd_xattr_sanity_check(env, mdd_obj);
1815         if (rc)
1816                 RETURN(rc);
1817
1818         handle = mdd_trans_create(env, mdd);
1819         if (IS_ERR(handle))
1820                 RETURN(PTR_ERR(handle));
1821
1822         /* security-replated changes may require sync */
1823         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1824             mdd->mdd_sync_permission == 1)
1825                 handle->th_sync = 1;
1826
1827         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1828         if (rc)
1829                 GOTO(stop, rc);
1830
1831         rc = mdd_trans_start(env, mdd, handle);
1832         if (rc)
1833                 GOTO(stop, rc);
1834
1835         /* security-replated changes may require sync */
1836         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1837                 handle->th_sync |= mdd->mdd_sync_permission;
1838
1839         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1840
1841         /* Only record system & user xattr changes */
1842         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1843                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1844                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1845                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1846                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1847                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1848                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1849                                               handle);
1850
1851 stop:
1852         mdd_trans_stop(env, mdd, rc, handle);
1853
1854         RETURN(rc);
1855 }
1856
1857 static int mdd_declare_xattr_del(const struct lu_env *env,
1858                                  struct mdd_device *mdd,
1859                                  struct mdd_object *obj,
1860                                  const char *name,
1861                                  struct thandle *handle)
1862 {
1863         int rc;
1864
1865         rc = mdo_declare_xattr_del(env, obj, name, handle);
1866         if (rc)
1867                 return rc;
1868
1869         /* Only record user xattr changes */
1870         if ((strncmp("user.", name, 5) == 0))
1871                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1872
1873         return rc;
1874 }
1875
1876 /**
1877  * The caller should guarantee to update the object ctime
1878  * after xattr_set if needed.
1879  */
1880 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1881                   const char *name)
1882 {
1883         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1884         struct mdd_device *mdd = mdo2mdd(obj);
1885         struct thandle *handle;
1886         int  rc;
1887         ENTRY;
1888
1889         rc = mdd_xattr_sanity_check(env, mdd_obj);
1890         if (rc)
1891                 RETURN(rc);
1892
1893         handle = mdd_trans_create(env, mdd);
1894         if (IS_ERR(handle))
1895                 RETURN(PTR_ERR(handle));
1896
1897         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1898         if (rc)
1899                 GOTO(stop, rc);
1900
1901         rc = mdd_trans_start(env, mdd, handle);
1902         if (rc)
1903                 GOTO(stop, rc);
1904
1905         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1906         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1907                            mdd_object_capa(env, mdd_obj));
1908         mdd_write_unlock(env, mdd_obj);
1909
1910         /* Only record system & user xattr changes */
1911         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1912                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1913                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1914                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1915                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1916                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1917                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1918                                               handle);
1919
1920 stop:
1921         mdd_trans_stop(env, mdd, rc, handle);
1922
1923         RETURN(rc);
1924 }
1925
1926 /* partial unlink */
1927 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1928                        struct md_attr *ma)
1929 {
1930         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1931         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1932         struct mdd_device *mdd = mdo2mdd(obj);
1933         struct thandle *handle;
1934 #ifdef HAVE_QUOTA_SUPPORT
1935         struct obd_device *obd = mdd->mdd_obd_dev;
1936         struct mds_obd *mds = &obd->u.mds;
1937         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1938         int quota_opc = 0;
1939 #endif
1940         int rc;
1941         ENTRY;
1942
1943         /* XXX: this code won't be used ever:
1944          * DNE uses slightly different approach */
1945         LBUG();
1946
1947         /*
1948          * Check -ENOENT early here because we need to get object type
1949          * to calculate credits before transaction start
1950          */
1951         if (!mdd_object_exists(mdd_obj))
1952                 RETURN(-ENOENT);
1953
1954         LASSERT(mdd_object_exists(mdd_obj) > 0);
1955
1956         handle = mdd_trans_create(env, mdd);
1957         if (IS_ERR(handle))
1958                 RETURN(-ENOMEM);
1959
1960         rc = mdd_trans_start(env, mdd, handle);
1961
1962         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1963
1964         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1965         if (rc)
1966                 GOTO(cleanup, rc);
1967
1968         __mdd_ref_del(env, mdd_obj, handle, 0);
1969
1970         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1971                 /* unlink dot */
1972                 __mdd_ref_del(env, mdd_obj, handle, 1);
1973         }
1974
1975         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1976         la_copy->la_ctime = ma->ma_attr.la_ctime;
1977
1978         la_copy->la_valid = LA_CTIME;
1979         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1980         if (rc)
1981                 GOTO(cleanup, rc);
1982
1983         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1984 #ifdef HAVE_QUOTA_SUPPORT
1985         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1986             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1987                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1988                 mdd_quota_wrapper(&ma->ma_attr, qids);
1989         }
1990 #endif
1991
1992
1993         EXIT;
1994 cleanup:
1995         mdd_write_unlock(env, mdd_obj);
1996         mdd_trans_stop(env, mdd, rc, handle);
1997 #ifdef HAVE_QUOTA_SUPPORT
1998         if (quota_opc)
1999                 /* Trigger dqrel on the owner of child. If failed,
2000                  * the next call for lquota_chkquota will process it */
2001                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2002                               quota_opc);
2003 #endif
2004         return rc;
2005 }
2006
2007 /* partial operation */
2008 static int mdd_oc_sanity_check(const struct lu_env *env,
2009                                struct mdd_object *obj,
2010                                struct md_attr *ma)
2011 {
2012         int rc;
2013         ENTRY;
2014
2015         switch (ma->ma_attr.la_mode & S_IFMT) {
2016         case S_IFREG:
2017         case S_IFDIR:
2018         case S_IFLNK:
2019         case S_IFCHR:
2020         case S_IFBLK:
2021         case S_IFIFO:
2022         case S_IFSOCK:
2023                 rc = 0;
2024                 break;
2025         default:
2026                 rc = -EINVAL;
2027                 break;
2028         }
2029         RETURN(rc);
2030 }
2031
2032 static int mdd_object_create(const struct lu_env *env,
2033                              struct md_object *obj,
2034                              const struct md_op_spec *spec,
2035                              struct md_attr *ma)
2036 {
2037
2038         struct mdd_device *mdd = mdo2mdd(obj);
2039         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2040         const struct lu_fid *pfid = spec->u.sp_pfid;
2041         struct thandle *handle;
2042 #ifdef HAVE_QUOTA_SUPPORT
2043         struct obd_device *obd = mdd->mdd_obd_dev;
2044         struct obd_export *exp = md_quota(env)->mq_exp;
2045         struct mds_obd *mds = &obd->u.mds;
2046         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2047         int quota_opc = 0, block_count = 0;
2048         int inode_pending[MAXQUOTAS] = { 0, 0 };
2049         int block_pending[MAXQUOTAS] = { 0, 0 };
2050 #endif
2051         int rc = 0;
2052         ENTRY;
2053
2054         /* XXX: this code won't be used ever:
2055          * DNE uses slightly different approach */
2056         LBUG();
2057
2058 #ifdef HAVE_QUOTA_SUPPORT
2059         if (mds->mds_quota) {
2060                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2061                 mdd_quota_wrapper(&ma->ma_attr, qids);
2062                 /* get file quota for child */
2063                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2064                                 qids, inode_pending, 1, NULL, 0,
2065                                 NULL, 0);
2066                 switch (ma->ma_attr.la_mode & S_IFMT) {
2067                 case S_IFLNK:
2068                 case S_IFDIR:
2069                         block_count = 2;
2070                         break;
2071                 case S_IFREG:
2072                         block_count = 1;
2073                         break;
2074                 }
2075                 /* get block quota for child */
2076                 if (block_count)
2077                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2078                                         qids, block_pending, block_count,
2079                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2080         }
2081 #endif
2082
2083         handle = mdd_trans_create(env, mdd);
2084         if (IS_ERR(handle))
2085                 GOTO(out_pending, rc = PTR_ERR(handle));
2086
2087         rc = mdd_trans_start(env, mdd, handle);
2088
2089         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2090         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2091         if (rc)
2092                 GOTO(unlock, rc);
2093
2094         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2095         if (rc)
2096                 GOTO(unlock, rc);
2097
2098         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2099                 /* If creating the slave object, set slave EA here. */
2100                 int lmv_size = spec->u.sp_ea.eadatalen;
2101                 struct lmv_stripe_md *lmv;
2102
2103                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2104                 LASSERT(lmv != NULL && lmv_size > 0);
2105
2106                 rc = __mdd_xattr_set(env, mdd_obj,
2107                                      mdd_buf_get_const(env, lmv, lmv_size),
2108                                      XATTR_NAME_LMV, 0, handle);
2109                 if (rc)
2110                         GOTO(unlock, rc);
2111
2112                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2113                                            handle, 0);
2114         } else {
2115 #ifdef CONFIG_FS_POSIX_ACL
2116                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2117                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2118
2119                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2120                         buf->lb_len = spec->u.sp_ea.eadatalen;
2121                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2122                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2123                                                     &ma->ma_attr.la_mode,
2124                                                     handle);
2125                                 if (rc)
2126                                         GOTO(unlock, rc);
2127                                 else
2128                                         ma->ma_attr.la_valid |= LA_MODE;
2129                         }
2130
2131                         pfid = spec->u.sp_ea.fid;
2132                 }
2133 #endif
2134                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2135                                            spec);
2136         }
2137         EXIT;
2138 unlock:
2139         if (rc == 0)
2140                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2141         mdd_write_unlock(env, mdd_obj);
2142
2143         mdd_trans_stop(env, mdd, rc, handle);
2144 out_pending:
2145 #ifdef HAVE_QUOTA_SUPPORT
2146         if (quota_opc) {
2147                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2148                                       inode_pending, 0);
2149                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2150                                       block_pending, 1);
2151                 /* Trigger dqacq on the owner of child. If failed,
2152                  * the next call for lquota_chkquota will process it. */
2153                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2154                               quota_opc);
2155         }
2156 #endif
2157         return rc;
2158 }
2159
2160 /* partial link */
2161 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2162                        const struct md_attr *ma)
2163 {
2164         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2165         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2166         struct mdd_device *mdd = mdo2mdd(obj);
2167         struct thandle *handle;
2168         int rc;
2169         ENTRY;
2170
2171         /* XXX: this code won't be used ever:
2172          * DNE uses slightly different approach */
2173         LBUG();
2174
2175         handle = mdd_trans_create(env, mdd);
2176         if (IS_ERR(handle))
2177                 RETURN(-ENOMEM);
2178
2179         rc = mdd_trans_start(env, mdd, handle);
2180
2181         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2182         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2183         if (rc == 0)
2184                 __mdd_ref_add(env, mdd_obj, handle);
2185         mdd_write_unlock(env, mdd_obj);
2186         if (rc == 0) {
2187                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2188                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2189
2190                 la_copy->la_valid = LA_CTIME;
2191                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2192                                                         handle, 0);
2193         }
2194         mdd_trans_stop(env, mdd, 0, handle);
2195
2196         RETURN(rc);
2197 }
2198
2199 /*
2200  * do NOT or the MAY_*'s, you'll get the weakest
2201  */
2202 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2203 {
2204         int res = 0;
2205
2206         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2207          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2208          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2209          * owner can write to a file even if it is marked readonly to hide
2210          * its brokenness. (bug 5781) */
2211         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2212                 struct md_ucred *uc = md_ucred(env);
2213
2214                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2215                     (la->la_uid == uc->mu_fsuid))
2216                         return 0;
2217         }
2218
2219         if (flags & FMODE_READ)
2220                 res |= MAY_READ;
2221         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2222                 res |= MAY_WRITE;
2223         if (flags & MDS_FMODE_EXEC)
2224                 res = MAY_EXEC;
2225         return res;
2226 }
2227
2228 static int mdd_open_sanity_check(const struct lu_env *env,
2229                                  struct mdd_object *obj, int flag)
2230 {
2231         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2232         int mode, rc;
2233         ENTRY;
2234
2235         /* EEXIST check */
2236         if (mdd_is_dead_obj(obj))
2237                 RETURN(-ENOENT);
2238
2239         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2240         if (rc)
2241                RETURN(rc);
2242
2243         if (S_ISLNK(tmp_la->la_mode))
2244                 RETURN(-ELOOP);
2245
2246         mode = accmode(env, tmp_la, flag);
2247
2248         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2249                 RETURN(-EISDIR);
2250
2251         if (!(flag & MDS_OPEN_CREATED)) {
2252                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2253                 if (rc)
2254                         RETURN(rc);
2255         }
2256
2257         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2258             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2259                 flag &= ~MDS_OPEN_TRUNC;
2260
2261         /* For writing append-only file must open it with append mode. */
2262         if (mdd_is_append(obj)) {
2263                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2264                         RETURN(-EPERM);
2265                 if (flag & MDS_OPEN_TRUNC)
2266                         RETURN(-EPERM);
2267         }
2268
2269 #if 0
2270         /*
2271          * Now, flag -- O_NOATIME does not be packed by client.
2272          */
2273         if (flag & O_NOATIME) {
2274                 struct md_ucred *uc = md_ucred(env);
2275
2276                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2277                     (uc->mu_valid == UCRED_NEW)) &&
2278                     (uc->mu_fsuid != tmp_la->la_uid) &&
2279                     !mdd_capable(uc, CFS_CAP_FOWNER))
2280                         RETURN(-EPERM);
2281         }
2282 #endif
2283
2284         RETURN(0);
2285 }
2286
2287 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2288                     int flags)
2289 {
2290         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2291         int rc = 0;
2292
2293         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2294
2295         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2296         if (rc == 0)
2297                 mdd_obj->mod_count++;
2298
2299         mdd_write_unlock(env, mdd_obj);
2300         return rc;
2301 }
2302
2303 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2304                             struct md_attr *ma, struct thandle *handle)
2305 {
2306         int rc;
2307
2308         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2309         if (rc)
2310                 return rc;
2311
2312         return mdo_declare_destroy(env, obj, handle);
2313 }
2314
2315 /* return md_attr back,
2316  * if it is last unlink then return lov ea + llog cookie*/
2317 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2318                     struct md_attr *ma, struct thandle *handle)
2319 {
2320         int rc = 0;
2321         ENTRY;
2322
2323         if (S_ISREG(mdd_object_type(obj))) {
2324                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2325                  * Caller must be ready for that. */
2326
2327                 rc = __mdd_lmm_get(env, obj, ma);
2328                 if ((ma->ma_valid & MA_LOV))
2329                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2330                                             obj, ma);
2331         }
2332
2333         if (rc == 0)
2334                 rc = mdo_destroy(env, obj, handle);
2335
2336         RETURN(rc);
2337 }
2338
2339 static int mdd_declare_close(const struct lu_env *env,
2340                              struct mdd_object *obj,
2341                              struct md_attr *ma,
2342                              struct thandle *handle)
2343 {
2344         int rc;
2345
2346         rc = orph_declare_index_delete(env, obj, handle);
2347         if (rc)
2348                 return rc;
2349
2350         return mdd_declare_object_kill(env, obj, ma, handle);
2351 }
2352
2353 /*
2354  * No permission check is needed.
2355  */
2356 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2357                      struct md_attr *ma, int mode)
2358 {
2359         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2360         struct mdd_device *mdd = mdo2mdd(obj);
2361         struct thandle    *handle = NULL;
2362         int rc;
2363         int reset = 1;
2364
2365 #ifdef HAVE_QUOTA_SUPPORT
2366         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2367         struct mds_obd *mds = &obd->u.mds;
2368         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2369         int quota_opc = 0;
2370 #endif
2371         ENTRY;
2372
2373         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2374                 mdd_obj->mod_count--;
2375
2376                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2377                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2378                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2379                 RETURN(0);
2380         }
2381
2382         /* check without any lock */
2383         if (mdd_obj->mod_count == 1 &&
2384             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2385  again:
2386                 handle = mdd_trans_create(env, mdo2mdd(obj));
2387                 if (IS_ERR(handle))
2388                         RETURN(PTR_ERR(handle));
2389
2390                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2391                 if (rc)
2392                         GOTO(stop, rc);
2393
2394                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2395                 if (rc)
2396                         GOTO(stop, rc);
2397
2398                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2399                 if (rc)
2400                         GOTO(stop, rc);
2401         }
2402
2403         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2404         if (handle == NULL && mdd_obj->mod_count == 1 &&
2405             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2406                 mdd_write_unlock(env, mdd_obj);
2407                 goto again;
2408         }
2409
2410         /* release open count */
2411         mdd_obj->mod_count --;
2412
2413         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2414                 /* remove link to object from orphan index */
2415                 LASSERT(handle != NULL);
2416                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2417                 if (rc == 0) {
2418                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2419                                "list, OSS objects to be destroyed.\n",
2420                                PFID(mdd_object_fid(mdd_obj)));
2421                 } else {
2422                         CERROR("Object "DFID" can not be deleted from orphan "
2423                                 "list, maybe cause OST objects can not be "
2424                                 "destroyed (err: %d).\n",
2425                                 PFID(mdd_object_fid(mdd_obj)), rc);
2426                         /* If object was not deleted from orphan list, do not
2427                          * destroy OSS objects, which will be done when next
2428                          * recovery. */
2429                         GOTO(out, rc);
2430                 }
2431         }
2432
2433         rc = mdd_iattr_get(env, mdd_obj, ma);
2434         /* Object maybe not in orphan list originally, it is rare case for
2435          * mdd_finish_unlink() failure. */
2436         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2437 #ifdef HAVE_QUOTA_SUPPORT
2438                 if (mds->mds_quota) {
2439                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2440                         mdd_quota_wrapper(&ma->ma_attr, qids);
2441                 }
2442 #endif
2443                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2444                 if (ma->ma_valid & MA_FLAGS &&
2445                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2446                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2447                 } else {
2448                         if (handle == NULL) {
2449                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2450                                 if (IS_ERR(handle))
2451                                         GOTO(out, rc = PTR_ERR(handle));
2452
2453                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2454                                                              handle);
2455                                 if (rc)
2456                                         GOTO(out, rc);
2457
2458                                 rc = mdd_declare_changelog_store(env, mdd,
2459                                                                  NULL, handle);
2460                                 if (rc)
2461                                         GOTO(stop, rc);
2462
2463                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2464                                 if (rc)
2465                                         GOTO(out, rc);
2466                         }
2467
2468                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2469                         if (rc == 0)
2470                                 reset = 0;
2471                 }
2472
2473                 if (rc != 0)
2474                         CERROR("Error when prepare to delete Object "DFID" , "
2475                                "which will cause OST objects can not be "
2476                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2477         }
2478         EXIT;
2479
2480 out:
2481         if (reset)
2482                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2483
2484         mdd_write_unlock(env, mdd_obj);
2485
2486         if (rc == 0 &&
2487             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2488             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2489                 if (handle == NULL) {
2490                         handle = mdd_trans_create(env, mdo2mdd(obj));
2491                         if (IS_ERR(handle))
2492                                 GOTO(stop, rc = IS_ERR(handle));
2493
2494                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2495                                                          handle);
2496                         if (rc)
2497                                 GOTO(stop, rc);
2498
2499                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2500                         if (rc)
2501                                 GOTO(stop, rc);
2502                 }
2503
2504                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2505                                          mdd_obj, handle);
2506         }
2507
2508 stop:
2509         if (handle != NULL)
2510                 mdd_trans_stop(env, mdd, rc, handle);
2511 #ifdef HAVE_QUOTA_SUPPORT
2512         if (quota_opc)
2513                 /* Trigger dqrel on the owner of child. If failed,
2514                  * the next call for lquota_chkquota will process it */
2515                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2516                               quota_opc);
2517 #endif
2518         return rc;
2519 }
2520
2521 /*
2522  * Permission check is done when open,
2523  * no need check again.
2524  */
2525 static int mdd_readpage_sanity_check(const struct lu_env *env,
2526                                      struct mdd_object *obj)
2527 {
2528         struct dt_object *next = mdd_object_child(obj);
2529         int rc;
2530         ENTRY;
2531
2532         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2533                 rc = 0;
2534         else
2535                 rc = -ENOTDIR;
2536
2537         RETURN(rc);
2538 }
2539
2540 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2541                               struct lu_dirpage *dp, int nob,
2542                               const struct dt_it_ops *iops, struct dt_it *it,
2543                               __u32 attr)
2544 {
2545         void                   *area = dp;
2546         int                     result;
2547         __u64                   hash = 0;
2548         struct lu_dirent       *ent;
2549         struct lu_dirent       *last = NULL;
2550         int                     first = 1;
2551
2552         memset(area, 0, sizeof (*dp));
2553         area += sizeof (*dp);
2554         nob  -= sizeof (*dp);
2555
2556         ent  = area;
2557         do {
2558                 int    len;
2559                 int    recsize;
2560
2561                 len = iops->key_size(env, it);
2562
2563                 /* IAM iterator can return record with zero len. */
2564                 if (len == 0)
2565                         goto next;
2566
2567                 hash = iops->store(env, it);
2568                 if (unlikely(first)) {
2569                         first = 0;
2570                         dp->ldp_hash_start = cpu_to_le64(hash);
2571                 }
2572
2573                 /* calculate max space required for lu_dirent */
2574                 recsize = lu_dirent_calc_size(len, attr);
2575
2576                 if (nob >= recsize) {
2577                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2578                         if (result == -ESTALE)
2579                                 goto next;
2580                         if (result != 0)
2581                                 goto out;
2582
2583                         /* osd might not able to pack all attributes,
2584                          * so recheck rec length */
2585                         recsize = le16_to_cpu(ent->lde_reclen);
2586                 } else {
2587                         result = (last != NULL) ? 0 :-EINVAL;
2588                         goto out;
2589                 }
2590                 last = ent;
2591                 ent = (void *)ent + recsize;
2592                 nob -= recsize;
2593
2594 next:
2595                 result = iops->next(env, it);
2596                 if (result == -ESTALE)
2597                         goto next;
2598         } while (result == 0);
2599
2600 out:
2601         dp->ldp_hash_end = cpu_to_le64(hash);
2602         if (last != NULL) {
2603                 if (last->lde_hash == dp->ldp_hash_end)
2604                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2605                 last->lde_reclen = 0; /* end mark */
2606         }
2607         return result;
2608 }
2609
2610 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2611                           const struct lu_rdpg *rdpg)
2612 {
2613         struct dt_it      *it;
2614         struct dt_object  *next = mdd_object_child(obj);
2615         const struct dt_it_ops  *iops;
2616         struct page       *pg;
2617         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2618         int i;
2619         int nlupgs = 0;
2620         int rc;
2621         int nob;
2622
2623         LASSERT(rdpg->rp_pages != NULL);
2624         LASSERT(next->do_index_ops != NULL);
2625
2626         if (rdpg->rp_count <= 0)
2627                 return -EFAULT;
2628
2629         /*
2630          * iterate through directory and fill pages from @rdpg
2631          */
2632         iops = &next->do_index_ops->dio_it;
2633         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2634         if (IS_ERR(it))
2635                 return PTR_ERR(it);
2636
2637         rc = iops->load(env, it, rdpg->rp_hash);
2638
2639         if (rc == 0) {
2640                 /*
2641                  * Iterator didn't find record with exactly the key requested.
2642                  *
2643                  * It is currently either
2644                  *
2645                  *     - positioned above record with key less than
2646                  *     requested---skip it.
2647                  *
2648                  *     - or not positioned at all (is in IAM_IT_SKEWED
2649                  *     state)---position it on the next item.
2650                  */
2651                 rc = iops->next(env, it);
2652         } else if (rc > 0)
2653                 rc = 0;
2654
2655         /*
2656          * At this point and across for-loop:
2657          *
2658          *  rc == 0 -> ok, proceed.
2659          *  rc >  0 -> end of directory.
2660          *  rc <  0 -> error.
2661          */
2662         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2663              i++, nob -= CFS_PAGE_SIZE) {
2664                 struct lu_dirpage *dp;
2665
2666                 LASSERT(i < rdpg->rp_npages);
2667                 pg = rdpg->rp_pages[i];
2668                 dp = cfs_kmap(pg);
2669 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2670 repeat:
2671 #endif
2672                 rc = mdd_dir_page_build(env, mdd, dp,
2673                                         min_t(int, nob, LU_PAGE_SIZE),
2674                                         iops, it, rdpg->rp_attrs);
2675                 if (rc > 0) {
2676                         /*
2677                          * end of directory.
2678                          */
2679                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2680                         nlupgs++;
2681                 } else if (rc < 0) {
2682                         CWARN("build page failed: %d!\n", rc);
2683                 } else {
2684                         nlupgs++;
2685 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2686                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2687                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2688                                 goto repeat;
2689 #endif
2690                 }
2691                 cfs_kunmap(pg);
2692         }
2693         if (rc >= 0) {
2694                 struct lu_dirpage *dp;
2695
2696                 dp = cfs_kmap(rdpg->rp_pages[0]);
2697                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2698                 if (nlupgs == 0) {
2699                         /*
2700                          * No pages were processed, mark this for first page
2701                          * and send back.
2702                          */
2703                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2704                         nlupgs = 1;
2705                 }
2706                 cfs_kunmap(rdpg->rp_pages[0]);
2707
2708                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2709         }
2710         iops->put(env, it);
2711         iops->fini(env, it);
2712
2713         return rc;
2714 }
2715
2716 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2717                  const struct lu_rdpg *rdpg)
2718 {
2719         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2720         int rc;
2721         ENTRY;
2722
2723         LASSERT(mdd_object_exists(mdd_obj));
2724
2725         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2726         rc = mdd_readpage_sanity_check(env, mdd_obj);
2727         if (rc)
2728                 GOTO(out_unlock, rc);
2729
2730         if (mdd_is_dead_obj(mdd_obj)) {
2731                 struct page *pg;
2732                 struct lu_dirpage *dp;
2733
2734                 /*
2735                  * According to POSIX, please do not return any entry to client:
2736                  * even dot and dotdot should not be returned.
2737                  */
2738                 CWARN("readdir from dead object: "DFID"\n",
2739                         PFID(mdd_object_fid(mdd_obj)));
2740
2741                 if (rdpg->rp_count <= 0)
2742                         GOTO(out_unlock, rc = -EFAULT);
2743                 LASSERT(rdpg->rp_pages != NULL);
2744
2745                 pg = rdpg->rp_pages[0];
2746                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2747                 memset(dp, 0 , sizeof(struct lu_dirpage));
2748                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2749                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2750                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2751                 cfs_kunmap(pg);
2752                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2753         }
2754
2755         rc = __mdd_readpage(env, mdd_obj, rdpg);
2756
2757         EXIT;
2758 out_unlock:
2759         mdd_read_unlock(env, mdd_obj);
2760         return rc;
2761 }
2762
2763 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2764 {
2765         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2766         struct dt_object *next;
2767
2768         LASSERT(mdd_object_exists(mdd_obj));
2769         next = mdd_object_child(mdd_obj);
2770         return next->do_ops->do_object_sync(env, next);
2771 }
2772
2773 const struct md_object_operations mdd_obj_ops = {
2774         .moo_permission    = mdd_permission,
2775         .moo_attr_get      = mdd_attr_get,
2776         .moo_attr_set      = mdd_attr_set,
2777         .moo_xattr_get     = mdd_xattr_get,
2778         .moo_xattr_set     = mdd_xattr_set,
2779         .moo_xattr_list    = mdd_xattr_list,
2780         .moo_xattr_del     = mdd_xattr_del,
2781         .moo_object_create = mdd_object_create,
2782         .moo_ref_add       = mdd_ref_add,
2783         .moo_ref_del       = mdd_ref_del,
2784         .moo_open          = mdd_open,
2785         .moo_close         = mdd_close,
2786         .moo_readpage      = mdd_readpage,
2787         .moo_readlink      = mdd_readlink,
2788         .moo_changelog     = mdd_changelog,
2789         .moo_capa_get      = mdd_capa_get,
2790         .moo_object_sync   = mdd_object_sync,
2791         .moo_path          = mdd_path,
2792         .moo_file_lock     = mdd_file_lock,
2793         .moo_file_unlock   = mdd_file_unlock,
2794 };