Whamcloud - gitweb
20c4385f97ee22ec02d32dc911b9ee720f616491
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/mdd/mdd_object.c
40  *
41  * Lustre Metadata Server (mdd) routines
42  *
43  * Author: Wang Di <wangdi@clusterfs.com>
44  */
45
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <obd_support.h>
55 #include <lprocfs_status.h>
56 /* fid_be_cpu(), fid_cpu_to_be(). */
57 #include <lustre_fid.h>
58 #include <obd_lov.h>
59
60 #include <lustre_param.h>
61 #include <lustre_mds.h>
62 #include <lustre/lustre_idl.h>
63
64 #include "mdd_internal.h"
65
66 static const struct lu_object_operations mdd_lu_obj_ops;
67
68 static int mdd_xattr_get(const struct lu_env *env,
69                          struct md_object *obj, struct lu_buf *buf,
70                          const char *name);
71
72 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
73                  void **data)
74 {
75         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
76                  PFID(mdd_object_fid(obj)));
77         mdo_data_get(env, obj, data);
78         return 0;
79 }
80
81 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
82                struct lu_attr *la, struct lustre_capa *capa)
83 {
84         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
85                  PFID(mdd_object_fid(obj)));
86         return mdo_attr_get(env, obj, la, capa);
87 }
88
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
90 {
91         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
92
93         if (flags & LUSTRE_APPEND_FL)
94                 obj->mod_flags |= APPEND_OBJ;
95
96         if (flags & LUSTRE_IMMUTABLE_FL)
97                 obj->mod_flags |= IMMUTE_OBJ;
98 }
99
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
101 {
102         struct mdd_thread_info *info;
103
104         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105         LASSERT(info != NULL);
106         return info;
107 }
108
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
110 {
111         struct lu_buf *buf;
112
113         buf = &mdd_env_info(env)->mti_buf;
114         buf->lb_buf = area;
115         buf->lb_len = len;
116         return buf;
117 }
118
119 void mdd_buf_put(struct lu_buf *buf)
120 {
121         if (buf == NULL || buf->lb_buf == NULL)
122                 return;
123         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
124         buf->lb_buf = NULL;
125         buf->lb_len = 0;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 buf->lb_buf = NULL;
146         }
147         if (buf->lb_buf == NULL) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         buf->lb_len = 0;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183                                        struct mdd_device *mdd)
184 {
185         struct mdd_thread_info *mti = mdd_env_info(env);
186         int                     max_cookie_size;
187
188         max_cookie_size = mdd_lov_cookiesize(env, mdd);
189         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190                 if (mti->mti_max_cookie)
191                         OBD_FREE_LARGE(mti->mti_max_cookie,
192                                        mti->mti_max_cookie_size);
193                 mti->mti_max_cookie = NULL;
194                 mti->mti_max_cookie_size = 0;
195         }
196         if (unlikely(mti->mti_max_cookie == NULL)) {
197                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198                 if (likely(mti->mti_max_cookie != NULL))
199                         mti->mti_max_cookie_size = max_cookie_size;
200         }
201         if (likely(mti->mti_max_cookie != NULL))
202                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203         return mti->mti_max_cookie;
204 }
205
206 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
207                                    struct mdd_device *mdd)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210         int                     max_lmm_size;
211
212         max_lmm_size = mdd_lov_mdsize(env, mdd);
213         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
214                 if (mti->mti_max_lmm)
215                         OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
216                 mti->mti_max_lmm = NULL;
217                 mti->mti_max_lmm_size = 0;
218         }
219         if (unlikely(mti->mti_max_lmm == NULL)) {
220                 OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
221                 if (likely(mti->mti_max_lmm != NULL))
222                         mti->mti_max_lmm_size = max_lmm_size;
223         }
224         return mti->mti_max_lmm;
225 }
226
227 struct lu_object *mdd_object_alloc(const struct lu_env *env,
228                                    const struct lu_object_header *hdr,
229                                    struct lu_device *d)
230 {
231         struct mdd_object *mdd_obj;
232
233         OBD_ALLOC_PTR(mdd_obj);
234         if (mdd_obj != NULL) {
235                 struct lu_object *o;
236
237                 o = mdd2lu_obj(mdd_obj);
238                 lu_object_init(o, NULL, d);
239                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
240                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
241                 mdd_obj->mod_count = 0;
242                 o->lo_ops = &mdd_lu_obj_ops;
243                 return o;
244         } else {
245                 return NULL;
246         }
247 }
248
249 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
250                            const struct lu_object_conf *unused)
251 {
252         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
253         struct mdd_object *mdd_obj = lu2mdd_obj(o);
254         struct lu_object  *below;
255         struct lu_device  *under;
256         ENTRY;
257
258         mdd_obj->mod_cltime = 0;
259         under = &d->mdd_child->dd_lu_dev;
260         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
261         mdd_pdlock_init(mdd_obj);
262         if (below == NULL)
263                 RETURN(-ENOMEM);
264
265         lu_object_add(o, below);
266
267         RETURN(0);
268 }
269
270 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
271 {
272         if (lu_object_exists(o))
273                 return mdd_get_flags(env, lu2mdd_obj(o));
274         else
275                 return 0;
276 }
277
278 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
279 {
280         struct mdd_object *mdd = lu2mdd_obj(o);
281
282         lu_object_fini(o);
283         OBD_FREE_PTR(mdd);
284 }
285
286 static int mdd_object_print(const struct lu_env *env, void *cookie,
287                             lu_printer_t p, const struct lu_object *o)
288 {
289         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
290         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
291                     "valid=%x, cltime="LPU64", flags=%lx)",
292                     mdd, mdd->mod_count, mdd->mod_valid,
293                     mdd->mod_cltime, mdd->mod_flags);
294 }
295
296 static const struct lu_object_operations mdd_lu_obj_ops = {
297         .loo_object_init    = mdd_object_init,
298         .loo_object_start   = mdd_object_start,
299         .loo_object_free    = mdd_object_free,
300         .loo_object_print   = mdd_object_print,
301 };
302
303 struct mdd_object *mdd_object_find(const struct lu_env *env,
304                                    struct mdd_device *d,
305                                    const struct lu_fid *f)
306 {
307         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
308 }
309
310 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
311                         const char *path, struct lu_fid *fid)
312 {
313         struct lu_buf *buf;
314         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
315         struct mdd_object *obj;
316         struct lu_name *lname = &mdd_env_info(env)->mti_name;
317         char *name;
318         int rc = 0;
319         ENTRY;
320
321         /* temp buffer for path element */
322         buf = mdd_buf_alloc(env, PATH_MAX);
323         if (buf->lb_buf == NULL)
324                 RETURN(-ENOMEM);
325
326         lname->ln_name = name = buf->lb_buf;
327         lname->ln_namelen = 0;
328         *f = mdd->mdd_root_fid;
329
330         while(1) {
331                 while (*path == '/')
332                         path++;
333                 if (*path == '\0')
334                         break;
335                 while (*path != '/' && *path != '\0') {
336                         *name = *path;
337                         path++;
338                         name++;
339                         lname->ln_namelen++;
340                 }
341
342                 *name = '\0';
343                 /* find obj corresponding to fid */
344                 obj = mdd_object_find(env, mdd, f);
345                 if (obj == NULL)
346                         GOTO(out, rc = -EREMOTE);
347                 if (IS_ERR(obj))
348                         GOTO(out, rc = PTR_ERR(obj));
349                 /* get child fid from parent and name */
350                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
351                 mdd_object_put(env, obj);
352                 if (rc)
353                         break;
354
355                 name = buf->lb_buf;
356                 lname->ln_namelen = 0;
357         }
358
359         if (!rc)
360                 *fid = *f;
361 out:
362         RETURN(rc);
363 }
364
365 /** The maximum depth that fid2path() will search.
366  * This is limited only because we want to store the fids for
367  * historical path lookup purposes.
368  */
369 #define MAX_PATH_DEPTH 100
370
371 /** mdd_path() lookup structure. */
372 struct path_lookup_info {
373         __u64                pli_recno;        /**< history point */
374         __u64                pli_currec;       /**< current record */
375         struct lu_fid        pli_fid;
376         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
377         struct mdd_object   *pli_mdd_obj;
378         char                *pli_path;         /**< full path */
379         int                  pli_pathlen;
380         int                  pli_linkno;       /**< which hardlink to follow */
381         int                  pli_fidcount;     /**< number of \a pli_fids */
382 };
383
384 static int mdd_path_current(const struct lu_env *env,
385                             struct path_lookup_info *pli)
386 {
387         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
388         struct mdd_object *mdd_obj;
389         struct lu_buf     *buf = NULL;
390         struct link_ea_header *leh;
391         struct link_ea_entry  *lee;
392         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
393         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
394         char *ptr;
395         int reclen;
396         int rc;
397         ENTRY;
398
399         ptr = pli->pli_path + pli->pli_pathlen - 1;
400         *ptr = 0;
401         --ptr;
402         pli->pli_fidcount = 0;
403         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
404
405         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
406                 mdd_obj = mdd_object_find(env, mdd,
407                                           &pli->pli_fids[pli->pli_fidcount]);
408                 if (mdd_obj == NULL)
409                         GOTO(out, rc = -EREMOTE);
410                 if (IS_ERR(mdd_obj))
411                         GOTO(out, rc = PTR_ERR(mdd_obj));
412                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
413                 if (rc <= 0) {
414                         mdd_object_put(env, mdd_obj);
415                         if (rc == -1)
416                                 rc = -EREMOTE;
417                         else if (rc == 0)
418                                 /* Do I need to error out here? */
419                                 rc = -ENOENT;
420                         GOTO(out, rc);
421                 }
422
423                 /* Get parent fid and object name */
424                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
425                 buf = mdd_links_get(env, mdd_obj);
426                 mdd_read_unlock(env, mdd_obj);
427                 mdd_object_put(env, mdd_obj);
428                 if (IS_ERR(buf))
429                         GOTO(out, rc = PTR_ERR(buf));
430
431                 leh = buf->lb_buf;
432                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
433                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
434
435                 /* If set, use link #linkno for path lookup, otherwise use
436                    link #0.  Only do this for the final path element. */
437                 if ((pli->pli_fidcount == 0) &&
438                     (pli->pli_linkno < leh->leh_reccount)) {
439                         int count;
440                         for (count = 0; count < pli->pli_linkno; count++) {
441                                 lee = (struct link_ea_entry *)
442                                      ((char *)lee + reclen);
443                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444                         }
445                         if (pli->pli_linkno < leh->leh_reccount - 1)
446                                 /* indicate to user there are more links */
447                                 pli->pli_linkno++;
448                 }
449
450                 /* Pack the name in the end of the buffer */
451                 ptr -= tmpname->ln_namelen;
452                 if (ptr - 1 <= pli->pli_path)
453                         GOTO(out, rc = -EOVERFLOW);
454                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
455                 *(--ptr) = '/';
456
457                 /* Store the parent fid for historic lookup */
458                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
459                         GOTO(out, rc = -EOVERFLOW);
460                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
461         }
462
463         /* Verify that our path hasn't changed since we started the lookup.
464            Record the current index, and verify the path resolves to the
465            same fid. If it does, then the path is correct as of this index. */
466         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
467         pli->pli_currec = mdd->mdd_cl.mc_index;
468         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
469         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
470         if (rc) {
471                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
472                 GOTO (out, rc = -EAGAIN);
473         }
474         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
475                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
476                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
477                        PFID(&pli->pli_fid));
478                 GOTO(out, rc = -EAGAIN);
479         }
480         ptr++; /* skip leading / */
481         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
482
483         EXIT;
484 out:
485         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
486                 /* if we vmalloced a large buffer drop it */
487                 mdd_buf_put(buf);
488
489         return rc;
490 }
491
492 static int mdd_path_historic(const struct lu_env *env,
493                              struct path_lookup_info *pli)
494 {
495         return 0;
496 }
497
498 /* Returns the full path to this fid, as of changelog record recno. */
499 static int mdd_path(const struct lu_env *env, struct md_object *obj,
500                     char *path, int pathlen, __u64 *recno, int *linkno)
501 {
502         struct path_lookup_info *pli;
503         int tries = 3;
504         int rc = -EAGAIN;
505         ENTRY;
506
507         if (pathlen < 3)
508                 RETURN(-EOVERFLOW);
509
510         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
511                 path[0] = '\0';
512                 RETURN(0);
513         }
514
515         OBD_ALLOC_PTR(pli);
516         if (pli == NULL)
517                 RETURN(-ENOMEM);
518
519         pli->pli_mdd_obj = md2mdd_obj(obj);
520         pli->pli_recno = *recno;
521         pli->pli_path = path;
522         pli->pli_pathlen = pathlen;
523         pli->pli_linkno = *linkno;
524
525         /* Retry multiple times in case file is being moved */
526         while (tries-- && rc == -EAGAIN)
527                 rc = mdd_path_current(env, pli);
528
529         /* For historical path lookup, the current links may not have existed
530          * at "recno" time.  We must switch over to earlier links/parents
531          * by using the changelog records.  If the earlier parent doesn't
532          * exist, we must search back through the changelog to reconstruct
533          * its parents, then check if it exists, etc.
534          * We may ignore this problem for the initial implementation and
535          * state that an "original" hardlink must still exist for us to find
536          * historic path name. */
537         if (pli->pli_recno != -1) {
538                 rc = mdd_path_historic(env, pli);
539         } else {
540                 *recno = pli->pli_currec;
541                 /* Return next link index to caller */
542                 *linkno = pli->pli_linkno;
543         }
544
545         OBD_FREE_PTR(pli);
546
547         RETURN (rc);
548 }
549
550 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
551 {
552         struct lu_attr *la = &mdd_env_info(env)->mti_la;
553         int rc;
554
555         ENTRY;
556         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
557         if (rc == 0) {
558                 mdd_flags_xlate(obj, la->la_flags);
559         }
560         RETURN(rc);
561 }
562
563 /* get only inode attributes */
564 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
565                   struct md_attr *ma)
566 {
567         int rc = 0;
568         ENTRY;
569
570         if (ma->ma_valid & MA_INODE)
571                 RETURN(0);
572
573         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
574                           mdd_object_capa(env, mdd_obj));
575         if (rc == 0)
576                 ma->ma_valid |= MA_INODE;
577         RETURN(rc);
578 }
579
580 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
581 {
582         struct lov_desc *ldesc;
583         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
584         struct lov_user_md *lum = (struct lov_user_md*)lmm;
585         ENTRY;
586
587         if (!lum)
588                 RETURN(0);
589
590         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
591         LASSERT(ldesc != NULL);
592
593         lum->lmm_magic = LOV_MAGIC_V1;
594         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
595         lum->lmm_pattern = ldesc->ld_pattern;
596         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
597         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
598         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
599
600         RETURN(sizeof(*lum));
601 }
602
603 static int is_rootdir(struct mdd_object *mdd_obj)
604 {
605         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
606         const struct lu_fid *fid = mdo2fid(mdd_obj);
607
608         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
609 }
610
611 /* get lov EA only */
612 static int __mdd_lmm_get(const struct lu_env *env,
613                          struct mdd_object *mdd_obj, struct md_attr *ma)
614 {
615         int rc;
616         ENTRY;
617
618         if (ma->ma_valid & MA_LOV)
619                 RETURN(0);
620
621         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
622                         XATTR_NAME_LOV);
623         if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
624                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
625         if (rc > 0) {
626                 ma->ma_lmm_size = rc;
627                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
628                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
629                 rc = 0;
630         }
631         RETURN(rc);
632 }
633
634 /* get the first parent fid from link EA */
635 static int mdd_pfid_get(const struct lu_env *env,
636                         struct mdd_object *mdd_obj, struct md_attr *ma)
637 {
638         struct lu_buf *buf;
639         struct link_ea_header *leh;
640         struct link_ea_entry *lee;
641         struct lu_fid *pfid = &ma->ma_pfid;
642         ENTRY;
643
644         if (ma->ma_valid & MA_PFID)
645                 RETURN(0);
646
647         buf = mdd_links_get(env, mdd_obj);
648         if (IS_ERR(buf))
649                 RETURN(PTR_ERR(buf));
650
651         leh = buf->lb_buf;
652         lee = (struct link_ea_entry *)(leh + 1);
653         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
654         fid_be_to_cpu(pfid, pfid);
655         ma->ma_valid |= MA_PFID;
656         if (buf->lb_len > OBD_ALLOC_BIG)
657                 /* if we vmalloced a large buffer drop it */
658                 mdd_buf_put(buf);
659         RETURN(0);
660 }
661
662 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
663                        struct md_attr *ma)
664 {
665         int rc;
666         ENTRY;
667
668         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
669         rc = __mdd_lmm_get(env, mdd_obj, ma);
670         mdd_read_unlock(env, mdd_obj);
671         RETURN(rc);
672 }
673
674 /* get lmv EA only*/
675 static int __mdd_lmv_get(const struct lu_env *env,
676                          struct mdd_object *mdd_obj, struct md_attr *ma)
677 {
678         int rc;
679         ENTRY;
680
681         if (ma->ma_valid & MA_LMV)
682                 RETURN(0);
683
684         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
685                         XATTR_NAME_LMV);
686         if (rc > 0) {
687                 ma->ma_valid |= MA_LMV;
688                 rc = 0;
689         }
690         RETURN(rc);
691 }
692
693 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
694                          struct md_attr *ma)
695 {
696         struct mdd_thread_info *info = mdd_env_info(env);
697         struct lustre_mdt_attrs *lma =
698                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
699         int lma_size;
700         int rc;
701         ENTRY;
702
703         /* If all needed data are already valid, nothing to do */
704         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
705             (ma->ma_need & (MA_HSM | MA_SOM)))
706                 RETURN(0);
707
708         /* Read LMA from disk EA */
709         lma_size = sizeof(info->mti_xattr_buf);
710         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
711         if (rc <= 0)
712                 RETURN(rc);
713
714         /* Useless to check LMA incompatibility because this is already done in
715          * osd_ea_fid_get(), and this will fail long before this code is
716          * called.
717          * So, if we are here, LMA is compatible.
718          */
719
720         lustre_lma_swab(lma);
721
722         /* Swab and copy LMA */
723         if (ma->ma_need & MA_HSM) {
724                 if (lma->lma_compat & LMAC_HSM)
725                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
726                 else
727                         ma->ma_hsm.mh_flags = 0;
728                 ma->ma_valid |= MA_HSM;
729         }
730
731         /* Copy SOM */
732         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
733                 LASSERT(ma->ma_som != NULL);
734                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
735                 ma->ma_som->msd_size    = lma->lma_som_size;
736                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
737                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
738                 ma->ma_valid |= MA_SOM;
739         }
740
741         RETURN(0);
742 }
743
744 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
745                                  struct md_attr *ma)
746 {
747         int rc = 0;
748         ENTRY;
749
750         if (ma->ma_need & MA_INODE)
751                 rc = mdd_iattr_get(env, mdd_obj, ma);
752
753         if (rc == 0 && ma->ma_need & MA_LOV) {
754                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
755                     S_ISDIR(mdd_object_type(mdd_obj)))
756                         rc = __mdd_lmm_get(env, mdd_obj, ma);
757         }
758         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
759                 if (S_ISREG(mdd_object_type(mdd_obj)))
760                         rc = mdd_pfid_get(env, mdd_obj, ma);
761         }
762         if (rc == 0 && ma->ma_need & MA_LMV) {
763                 if (S_ISDIR(mdd_object_type(mdd_obj)))
764                         rc = __mdd_lmv_get(env, mdd_obj, ma);
765         }
766         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
767                 if (S_ISREG(mdd_object_type(mdd_obj)))
768                         rc = __mdd_lma_get(env, mdd_obj, ma);
769         }
770 #ifdef CONFIG_FS_POSIX_ACL
771         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
772                 if (S_ISDIR(mdd_object_type(mdd_obj)))
773                         rc = mdd_def_acl_get(env, mdd_obj, ma);
774         }
775 #endif
776         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
777                rc, ma->ma_valid, ma->ma_lmm);
778         RETURN(rc);
779 }
780
781 int mdd_attr_get_internal_locked(const struct lu_env *env,
782                                  struct mdd_object *mdd_obj, struct md_attr *ma)
783 {
784         int rc;
785         int needlock = ma->ma_need &
786                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
787
788         if (needlock)
789                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
790         rc = mdd_attr_get_internal(env, mdd_obj, ma);
791         if (needlock)
792                 mdd_read_unlock(env, mdd_obj);
793         return rc;
794 }
795
796 /*
797  * No permission check is needed.
798  */
799 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
800                         struct md_attr *ma)
801 {
802         struct mdd_object *mdd_obj = md2mdd_obj(obj);
803         int                rc;
804
805         ENTRY;
806         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
807         RETURN(rc);
808 }
809
810 /*
811  * No permission check is needed.
812  */
813 static int mdd_xattr_get(const struct lu_env *env,
814                          struct md_object *obj, struct lu_buf *buf,
815                          const char *name)
816 {
817         struct mdd_object *mdd_obj = md2mdd_obj(obj);
818         int rc;
819
820         ENTRY;
821
822         LASSERT(mdd_object_exists(mdd_obj));
823
824         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
825         rc = mdo_xattr_get(env, mdd_obj, buf, name,
826                            mdd_object_capa(env, mdd_obj));
827         mdd_read_unlock(env, mdd_obj);
828
829         RETURN(rc);
830 }
831
832 /*
833  * Permission check is done when open,
834  * no need check again.
835  */
836 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
837                         struct lu_buf *buf)
838 {
839         struct mdd_object *mdd_obj = md2mdd_obj(obj);
840         struct dt_object  *next;
841         loff_t             pos = 0;
842         int                rc;
843         ENTRY;
844
845         LASSERT(mdd_object_exists(mdd_obj));
846
847         next = mdd_object_child(mdd_obj);
848         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
849         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
850                                          mdd_object_capa(env, mdd_obj));
851         mdd_read_unlock(env, mdd_obj);
852         RETURN(rc);
853 }
854
855 /*
856  * No permission check is needed.
857  */
858 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
859                           struct lu_buf *buf)
860 {
861         struct mdd_object *mdd_obj = md2mdd_obj(obj);
862         int rc;
863
864         ENTRY;
865
866         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
867         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
868         mdd_read_unlock(env, mdd_obj);
869
870         RETURN(rc);
871 }
872
873 int mdd_declare_object_create_internal(const struct lu_env *env,
874                                        struct mdd_object *p,
875                                        struct mdd_object *c,
876                                        struct md_attr *ma,
877                                        struct thandle *handle,
878                                        const struct md_op_spec *spec)
879 {
880         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
881         const struct dt_index_features *feat = spec->sp_feat;
882         int rc;
883         ENTRY;
884
885         if (feat != &dt_directory_features && feat != NULL)
886                 dof->dof_type = DFT_INDEX;
887         else
888                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
889
890         dof->u.dof_idx.di_feat = feat;
891
892         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
893
894         RETURN(rc);
895 }
896
897 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
898                                struct mdd_object *c, struct md_attr *ma,
899                                struct thandle *handle,
900                                const struct md_op_spec *spec)
901 {
902         struct lu_attr *attr = &ma->ma_attr;
903         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
904         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
905         const struct dt_index_features *feat = spec->sp_feat;
906         int rc;
907         ENTRY;
908
909         if (!mdd_object_exists(c)) {
910                 struct dt_object *next = mdd_object_child(c);
911                 LASSERT(next);
912
913                 if (feat != &dt_directory_features && feat != NULL)
914                         dof->dof_type = DFT_INDEX;
915                 else
916                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
917
918                 dof->u.dof_idx.di_feat = feat;
919
920                 /* @hint will be initialized by underlying device. */
921                 next->do_ops->do_ah_init(env, hint,
922                                          p ? mdd_object_child(p) : NULL,
923                                          attr->la_mode & S_IFMT);
924
925                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
926                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
927         } else
928                 rc = -EEXIST;
929
930         RETURN(rc);
931 }
932
933 /**
934  * Make sure the ctime is increased only.
935  */
936 static inline int mdd_attr_check(const struct lu_env *env,
937                                  struct mdd_object *obj,
938                                  struct lu_attr *attr)
939 {
940         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
941         int rc;
942         ENTRY;
943
944         if (attr->la_valid & LA_CTIME) {
945                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
946                 if (rc)
947                         RETURN(rc);
948
949                 if (attr->la_ctime < tmp_la->la_ctime)
950                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
951                 else if (attr->la_valid == LA_CTIME &&
952                          attr->la_ctime == tmp_la->la_ctime)
953                         attr->la_valid &= ~LA_CTIME;
954         }
955         RETURN(0);
956 }
957
958 int mdd_attr_set_internal(const struct lu_env *env,
959                           struct mdd_object *obj,
960                           struct lu_attr *attr,
961                           struct thandle *handle,
962                           int needacl)
963 {
964         int rc;
965         ENTRY;
966
967         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
968 #ifdef CONFIG_FS_POSIX_ACL
969         if (!rc && (attr->la_valid & LA_MODE) && needacl)
970                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
971 #endif
972         RETURN(rc);
973 }
974
975 int mdd_attr_check_set_internal(const struct lu_env *env,
976                                 struct mdd_object *obj,
977                                 struct lu_attr *attr,
978                                 struct thandle *handle,
979                                 int needacl)
980 {
981         int rc;
982         ENTRY;
983
984         rc = mdd_attr_check(env, obj, attr);
985         if (rc)
986                 RETURN(rc);
987
988         if (attr->la_valid)
989                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
990         RETURN(rc);
991 }
992
993 static int mdd_attr_set_internal_locked(const struct lu_env *env,
994                                         struct mdd_object *obj,
995                                         struct lu_attr *attr,
996                                         struct thandle *handle,
997                                         int needacl)
998 {
999         int rc;
1000         ENTRY;
1001
1002         needacl = needacl && (attr->la_valid & LA_MODE);
1003         if (needacl)
1004                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1005         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1006         if (needacl)
1007                 mdd_write_unlock(env, obj);
1008         RETURN(rc);
1009 }
1010
1011 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1012                                        struct mdd_object *obj,
1013                                        struct lu_attr *attr,
1014                                        struct thandle *handle,
1015                                        int needacl)
1016 {
1017         int rc;
1018         ENTRY;
1019
1020         needacl = needacl && (attr->la_valid & LA_MODE);
1021         if (needacl)
1022                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1023         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1024         if (needacl)
1025                 mdd_write_unlock(env, obj);
1026         RETURN(rc);
1027 }
1028
1029 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1030                     const struct lu_buf *buf, const char *name,
1031                     int fl, struct thandle *handle)
1032 {
1033         struct lustre_capa *capa = mdd_object_capa(env, obj);
1034         int rc = -EINVAL;
1035         ENTRY;
1036
1037         if (buf->lb_buf && buf->lb_len > 0)
1038                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1039         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1040                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1041
1042         RETURN(rc);
1043 }
1044
1045 /*
1046  * This gives the same functionality as the code between
1047  * sys_chmod and inode_setattr
1048  * chown_common and inode_setattr
1049  * utimes and inode_setattr
1050  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1051  */
1052 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1053                         struct lu_attr *la, const struct md_attr *ma)
1054 {
1055         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1056         struct md_ucred  *uc;
1057         int               rc;
1058         ENTRY;
1059
1060         if (!la->la_valid)
1061                 RETURN(0);
1062
1063         /* Do not permit change file type */
1064         if (la->la_valid & LA_TYPE)
1065                 RETURN(-EPERM);
1066
1067         /* They should not be processed by setattr */
1068         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1069                 RETURN(-EPERM);
1070
1071         /* export destroy does not have ->le_ses, but we may want
1072          * to drop LUSTRE_SOM_FL. */
1073         if (!env->le_ses)
1074                 RETURN(0);
1075
1076         uc = md_ucred(env);
1077
1078         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1079         if (rc)
1080                 RETURN(rc);
1081
1082         if (la->la_valid == LA_CTIME) {
1083                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1084                         /* This is only for set ctime when rename's source is
1085                          * on remote MDS. */
1086                         rc = mdd_may_delete(env, NULL, obj,
1087                                             (struct md_attr *)ma, 1, 0);
1088                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1089                         la->la_valid &= ~LA_CTIME;
1090                 RETURN(rc);
1091         }
1092
1093         if (la->la_valid == LA_ATIME) {
1094                 /* This is atime only set for read atime update on close. */
1095                 if (la->la_atime >= tmp_la->la_atime &&
1096                     la->la_atime < (tmp_la->la_atime +
1097                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1098                         la->la_valid &= ~LA_ATIME;
1099                 RETURN(0);
1100         }
1101
1102         /* Check if flags change. */
1103         if (la->la_valid & LA_FLAGS) {
1104                 unsigned int oldflags = 0;
1105                 unsigned int newflags = la->la_flags &
1106                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1107
1108                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1109                     !mdd_capable(uc, CFS_CAP_FOWNER))
1110                         RETURN(-EPERM);
1111
1112                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1113                  * only be changed by the relevant capability. */
1114                 if (mdd_is_immutable(obj))
1115                         oldflags |= LUSTRE_IMMUTABLE_FL;
1116                 if (mdd_is_append(obj))
1117                         oldflags |= LUSTRE_APPEND_FL;
1118                 if ((oldflags ^ newflags) &&
1119                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1120                         RETURN(-EPERM);
1121
1122                 if (!S_ISDIR(tmp_la->la_mode))
1123                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1124         }
1125
1126         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1127             (la->la_valid & ~LA_FLAGS) &&
1128             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1129                 RETURN(-EPERM);
1130
1131         /* Check for setting the obj time. */
1132         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1133             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1134                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1135                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1136                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1137                                                             MAY_WRITE,
1138                                                             MOR_TGT_CHILD);
1139                         if (rc)
1140                                 RETURN(rc);
1141                 }
1142         }
1143
1144         if (la->la_valid & LA_KILL_SUID) {
1145                 la->la_valid &= ~LA_KILL_SUID;
1146                 if ((tmp_la->la_mode & S_ISUID) &&
1147                     !(la->la_valid & LA_MODE)) {
1148                         la->la_mode = tmp_la->la_mode;
1149                         la->la_valid |= LA_MODE;
1150                 }
1151                 la->la_mode &= ~S_ISUID;
1152         }
1153
1154         if (la->la_valid & LA_KILL_SGID) {
1155                 la->la_valid &= ~LA_KILL_SGID;
1156                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1157                                         (S_ISGID | S_IXGRP)) &&
1158                     !(la->la_valid & LA_MODE)) {
1159                         la->la_mode = tmp_la->la_mode;
1160                         la->la_valid |= LA_MODE;
1161                 }
1162                 la->la_mode &= ~S_ISGID;
1163         }
1164
1165         /* Make sure a caller can chmod. */
1166         if (la->la_valid & LA_MODE) {
1167                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1168                     (uc->mu_fsuid != tmp_la->la_uid) &&
1169                     !mdd_capable(uc, CFS_CAP_FOWNER))
1170                         RETURN(-EPERM);
1171
1172                 if (la->la_mode == (cfs_umode_t) -1)
1173                         la->la_mode = tmp_la->la_mode;
1174                 else
1175                         la->la_mode = (la->la_mode & S_IALLUGO) |
1176                                       (tmp_la->la_mode & ~S_IALLUGO);
1177
1178                 /* Also check the setgid bit! */
1179                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1180                                        la->la_gid : tmp_la->la_gid) &&
1181                     !mdd_capable(uc, CFS_CAP_FSETID))
1182                         la->la_mode &= ~S_ISGID;
1183         } else {
1184                la->la_mode = tmp_la->la_mode;
1185         }
1186
1187         /* Make sure a caller can chown. */
1188         if (la->la_valid & LA_UID) {
1189                 if (la->la_uid == (uid_t) -1)
1190                         la->la_uid = tmp_la->la_uid;
1191                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1192                     (la->la_uid != tmp_la->la_uid)) &&
1193                     !mdd_capable(uc, CFS_CAP_CHOWN))
1194                         RETURN(-EPERM);
1195
1196                 /* If the user or group of a non-directory has been
1197                  * changed by a non-root user, remove the setuid bit.
1198                  * 19981026 David C Niemi <niemi@tux.org>
1199                  *
1200                  * Changed this to apply to all users, including root,
1201                  * to avoid some races. This is the behavior we had in
1202                  * 2.0. The check for non-root was definitely wrong
1203                  * for 2.2 anyway, as it should have been using
1204                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1205                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1206                     !S_ISDIR(tmp_la->la_mode)) {
1207                         la->la_mode &= ~S_ISUID;
1208                         la->la_valid |= LA_MODE;
1209                 }
1210         }
1211
1212         /* Make sure caller can chgrp. */
1213         if (la->la_valid & LA_GID) {
1214                 if (la->la_gid == (gid_t) -1)
1215                         la->la_gid = tmp_la->la_gid;
1216                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1217                     ((la->la_gid != tmp_la->la_gid) &&
1218                     !lustre_in_group_p(uc, la->la_gid))) &&
1219                     !mdd_capable(uc, CFS_CAP_CHOWN))
1220                         RETURN(-EPERM);
1221
1222                 /* Likewise, if the user or group of a non-directory
1223                  * has been changed by a non-root user, remove the
1224                  * setgid bit UNLESS there is no group execute bit
1225                  * (this would be a file marked for mandatory
1226                  * locking).  19981026 David C Niemi <niemi@tux.org>
1227                  *
1228                  * Removed the fsuid check (see the comment above) --
1229                  * 19990830 SD. */
1230                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1231                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1232                         la->la_mode &= ~S_ISGID;
1233                         la->la_valid |= LA_MODE;
1234                 }
1235         }
1236
1237         /* For both Size-on-MDS case and truncate case,
1238          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1239          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1240          * For SOM case, it is true, the MAY_WRITE perm has been checked
1241          * when open, no need check again. For truncate case, it is false,
1242          * the MAY_WRITE perm should be checked here. */
1243         if (ma->ma_attr_flags & MDS_SOM) {
1244                 /* For the "Size-on-MDS" setattr update, merge coming
1245                  * attributes with the set in the inode. BUG 10641 */
1246                 if ((la->la_valid & LA_ATIME) &&
1247                     (la->la_atime <= tmp_la->la_atime))
1248                         la->la_valid &= ~LA_ATIME;
1249
1250                 /* OST attributes do not have a priority over MDS attributes,
1251                  * so drop times if ctime is equal. */
1252                 if ((la->la_valid & LA_CTIME) &&
1253                     (la->la_ctime <= tmp_la->la_ctime))
1254                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1255         } else {
1256                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1257                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1258                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1259                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1260                                 rc = mdd_permission_internal_locked(env, obj,
1261                                                             tmp_la, MAY_WRITE,
1262                                                             MOR_TGT_CHILD);
1263                                 if (rc)
1264                                         RETURN(rc);
1265                         }
1266                 }
1267                 if (la->la_valid & LA_CTIME) {
1268                         /* The pure setattr, it has the priority over what is
1269                          * already set, do not drop it if ctime is equal. */
1270                         if (la->la_ctime < tmp_la->la_ctime)
1271                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1272                                                   LA_CTIME);
1273                 }
1274         }
1275
1276         RETURN(0);
1277 }
1278
1279 /** Store a data change changelog record
1280  * If this fails, we must fail the whole transaction; we don't
1281  * want the change to commit without the log entry.
1282  * \param mdd_obj - mdd_object of change
1283  * \param handle - transacion handle
1284  */
1285 static int mdd_changelog_data_store(const struct lu_env     *env,
1286                                     struct mdd_device       *mdd,
1287                                     enum changelog_rec_type type,
1288                                     int                     flags,
1289                                     struct mdd_object       *mdd_obj,
1290                                     struct thandle          *handle)
1291 {
1292         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1293         struct llog_changelog_rec *rec;
1294         struct thandle *th = NULL;
1295         struct lu_buf *buf;
1296         int reclen;
1297         int rc;
1298
1299         /* Not recording */
1300         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1301                 RETURN(0);
1302         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1303                 RETURN(0);
1304
1305         LASSERT(mdd_obj != NULL);
1306         LASSERT(handle != NULL);
1307
1308         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1309             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1310                 /* Don't need multiple updates in this log */
1311                 /* Don't check under lock - no big deal if we get an extra
1312                    entry */
1313                 RETURN(0);
1314         }
1315
1316         reclen = llog_data_len(sizeof(*rec));
1317         buf = mdd_buf_alloc(env, reclen);
1318         if (buf->lb_buf == NULL)
1319                 RETURN(-ENOMEM);
1320         rec = (struct llog_changelog_rec *)buf->lb_buf;
1321
1322         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1323         rec->cr.cr_type = (__u32)type;
1324         rec->cr.cr_tfid = *tfid;
1325         rec->cr.cr_namelen = 0;
1326         mdd_obj->mod_cltime = cfs_time_current_64();
1327
1328         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1329
1330         if (th)
1331                 mdd_trans_stop(env, mdd, rc, th);
1332
1333         if (rc < 0) {
1334                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1335                        rc, type, PFID(tfid));
1336                 return -EFAULT;
1337         }
1338
1339         return 0;
1340 }
1341
1342 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1343                   int flags, struct md_object *obj)
1344 {
1345         struct thandle *handle;
1346         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1347         struct mdd_device *mdd = mdo2mdd(obj);
1348         int rc;
1349         ENTRY;
1350
1351         handle = mdd_trans_create(env, mdd);
1352         if (IS_ERR(handle))
1353                 return(PTR_ERR(handle));
1354
1355         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1356         if (rc)
1357                 GOTO(stop, rc);
1358
1359         rc = mdd_trans_start(env, mdd, handle);
1360         if (rc)
1361                 GOTO(stop, rc);
1362
1363         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1364                                       handle);
1365
1366 stop:
1367         mdd_trans_stop(env, mdd, rc, handle);
1368
1369         RETURN(rc);
1370 }
1371
1372 /**
1373  * Should be called with write lock held.
1374  *
1375  * \see mdd_lma_set_locked().
1376  */
1377 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1378                        const struct md_attr *ma, struct thandle *handle)
1379 {
1380         struct mdd_thread_info *info = mdd_env_info(env);
1381         struct lu_buf *buf;
1382         struct lustre_mdt_attrs *lma =
1383                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1384         int lmasize = sizeof(struct lustre_mdt_attrs);
1385         int rc = 0;
1386
1387         ENTRY;
1388
1389         /* Either HSM or SOM part is not valid, we need to read it before */
1390         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1391                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1392                 if (rc <= 0)
1393                         RETURN(rc);
1394
1395                 lustre_lma_swab(lma);
1396         } else {
1397                 memset(lma, 0, lmasize);
1398         }
1399
1400         /* Copy HSM data */
1401         if (ma->ma_valid & MA_HSM) {
1402                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1403                 lma->lma_compat |= LMAC_HSM;
1404         }
1405
1406         /* Copy SOM data */
1407         if (ma->ma_valid & MA_SOM) {
1408                 LASSERT(ma->ma_som != NULL);
1409                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1410                         lma->lma_compat     &= ~LMAC_SOM;
1411                 } else {
1412                         lma->lma_compat     |= LMAC_SOM;
1413                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1414                         lma->lma_som_size    = ma->ma_som->msd_size;
1415                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1416                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1417                 }
1418         }
1419
1420         /* Copy FID */
1421         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1422
1423         lustre_lma_swab(lma);
1424         buf = mdd_buf_get(env, lma, lmasize);
1425         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1426
1427         RETURN(rc);
1428 }
1429
1430 /**
1431  * Save LMA extended attributes with data from \a ma.
1432  *
1433  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1434  * not, LMA EA will be first read from disk, modified and write back.
1435  *
1436  */
1437 static int mdd_lma_set_locked(const struct lu_env *env,
1438                               struct mdd_object *mdd_obj,
1439                               const struct md_attr *ma, struct thandle *handle)
1440 {
1441         int rc;
1442
1443         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1444         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1445         mdd_write_unlock(env, mdd_obj);
1446         return rc;
1447 }
1448
1449 /* Precedence for choosing record type when multiple
1450  * attributes change: setattr > mtime > ctime > atime
1451  * (ctime changes when mtime does, plus chmod/chown.
1452  * atime and ctime are independent.) */
1453 static int mdd_attr_set_changelog(const struct lu_env *env,
1454                                   struct md_object *obj, struct thandle *handle,
1455                                   __u64 valid)
1456 {
1457         struct mdd_device *mdd = mdo2mdd(obj);
1458         int bits, type = 0;
1459
1460         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1461         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1462         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1463         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1464         bits = bits & mdd->mdd_cl.mc_mask;
1465         if (bits == 0)
1466                 return 0;
1467
1468         /* The record type is the lowest non-masked set bit */
1469         while (bits && ((bits & 1) == 0)) {
1470                 bits = bits >> 1;
1471                 type++;
1472         }
1473
1474         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1475         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1476                                         md2mdd_obj(obj), handle);
1477 }
1478
1479 static int mdd_declare_attr_set(const struct lu_env *env,
1480                                 struct mdd_device *mdd,
1481                                 struct mdd_object *obj,
1482                                 const struct md_attr *ma,
1483                                 struct lov_mds_md *lmm,
1484                                 struct thandle *handle)
1485 {
1486         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1487         int             rc, i;
1488
1489         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1490         if (rc)
1491                 return rc;
1492
1493         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1494         if (rc)
1495                 return rc;
1496
1497         if (ma->ma_valid & MA_LOV) {
1498                 buf->lb_buf = NULL;
1499                 buf->lb_len = ma->ma_lmm_size;
1500                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1501                                            0, handle);
1502                 if (rc)
1503                         return rc;
1504         }
1505
1506         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1507                 buf->lb_buf = NULL;
1508                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1509                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1510                                            0, handle);
1511                 if (rc)
1512                         return rc;
1513         }
1514
1515 #ifdef CONFIG_FS_POSIX_ACL
1516         if (ma->ma_attr.la_valid & LA_MODE) {
1517                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1518                 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1519                                    BYPASS_CAPA);
1520                 mdd_read_unlock(env, obj);
1521                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1522                         rc = 0;
1523                 else if (rc < 0)
1524                         return rc;
1525
1526                 if (rc != 0) {
1527                         buf->lb_buf = NULL;
1528                         buf->lb_len = rc;
1529                         rc = mdo_declare_xattr_set(env, obj, buf,
1530                                                    XATTR_NAME_ACL_ACCESS, 0,
1531                                                    handle);
1532                         if (rc)
1533                                 return rc;
1534                 }
1535         }
1536 #endif
1537
1538         /* basically the log is the same as in unlink case */
1539         if (lmm) {
1540                 __u16 stripe;
1541
1542                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1543                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1544                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1545                                mdd->mdd_obd_dev->obd_name,
1546                                le32_to_cpu(lmm->lmm_magic),
1547                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1548                         return -EINVAL;
1549                 }
1550
1551                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1552                 if (stripe == LOV_ALL_STRIPES) {
1553                         struct lov_desc *ldesc;
1554
1555                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1556                         LASSERT(ldesc != NULL);
1557                         stripe = ldesc->ld_tgt_count;
1558                 }
1559
1560                 for (i = 0; i < stripe; i++) {
1561                         rc = mdd_declare_llog_record(env, mdd,
1562                                         sizeof(struct llog_unlink_rec),
1563                                         handle);
1564                         if (rc)
1565                                 return rc;
1566                 }
1567         }
1568
1569         return rc;
1570 }
1571
1572 /* set attr and LOV EA at once, return updated attr */
1573 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1574                         const struct md_attr *ma)
1575 {
1576         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1577         struct mdd_device *mdd = mdo2mdd(obj);
1578         struct thandle *handle;
1579         struct lov_mds_md *lmm = NULL;
1580         struct llog_cookie *logcookies = NULL;
1581         int  rc, lmm_size = 0, cookie_size = 0;
1582         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1583         struct obd_device *obd = mdd->mdd_obd_dev;
1584         struct mds_obd *mds = &obd->u.mds;
1585 #ifdef HAVE_QUOTA_SUPPORT
1586         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1587         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1588         int quota_opc = 0, block_count = 0;
1589         int inode_pending[MAXQUOTAS] = { 0, 0 };
1590         int block_pending[MAXQUOTAS] = { 0, 0 };
1591 #endif
1592         ENTRY;
1593
1594         *la_copy = ma->ma_attr;
1595         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1596         if (rc != 0)
1597                 RETURN(rc);
1598
1599         /* setattr on "close" only change atime, or do nothing */
1600         if (ma->ma_valid == MA_INODE &&
1601             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1602                 RETURN(0);
1603
1604         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1605             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1606                 lmm_size = mdd_lov_mdsize(env, mdd);
1607                 lmm = mdd_max_lmm_get(env, mdd);
1608                 if (lmm == NULL)
1609                         RETURN(-ENOMEM);
1610
1611                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1612                                 XATTR_NAME_LOV);
1613
1614                 if (rc < 0)
1615                         RETURN(rc);
1616         }
1617
1618         handle = mdd_trans_create(env, mdd);
1619         if (IS_ERR(handle))
1620                 RETURN(PTR_ERR(handle));
1621
1622         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1623                                   lmm_size > 0 ? lmm : NULL, handle);
1624         if (rc)
1625                 GOTO(stop, rc);
1626
1627         /* permission changes may require sync operation */
1628         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1629                 handle->th_sync = !!mdd->mdd_sync_permission;
1630
1631         rc = mdd_trans_start(env, mdd, handle);
1632         if (rc)
1633                 GOTO(stop, rc);
1634
1635         /* permission changes may require sync operation */
1636         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1637                 handle->th_sync |= mdd->mdd_sync_permission;
1638
1639         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1640                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1641                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1642
1643 #ifdef HAVE_QUOTA_SUPPORT
1644         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1645                 struct obd_export *exp = md_quota(env)->mq_exp;
1646                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1647
1648                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1649                 if (!rc) {
1650                         quota_opc = FSFILT_OP_SETATTR;
1651                         mdd_quota_wrapper(la_copy, qnids);
1652                         mdd_quota_wrapper(la_tmp, qoids);
1653                         /* get file quota for new owner */
1654                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1655                                         qnids, inode_pending, 1, NULL, 0,
1656                                         NULL, 0);
1657                         block_count = (la_tmp->la_blocks + 7) >> 3;
1658                         if (block_count) {
1659                                 void *data = NULL;
1660                                 mdd_data_get(env, mdd_obj, &data);
1661                                 /* get block quota for new owner */
1662                                 lquota_chkquota(mds_quota_interface_ref, obd,
1663                                                 exp, qnids, block_pending,
1664                                                 block_count, NULL,
1665                                                 LQUOTA_FLAGS_BLK, data, 1);
1666                         }
1667                 }
1668         }
1669 #endif
1670
1671         if (la_copy->la_valid & LA_FLAGS) {
1672                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1673                                                   handle, 1);
1674                 if (rc == 0)
1675                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1676         } else if (la_copy->la_valid) {            /* setattr */
1677                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1678                                                   handle, 1);
1679                 /* journal chown/chgrp in llog, just like unlink */
1680                 if (rc == 0 && lmm_size){
1681                         cookie_size = mdd_lov_cookiesize(env, mdd);
1682                         logcookies = mdd_max_cookie_get(env, mdd);
1683                         if (logcookies == NULL)
1684                                 GOTO(cleanup, rc = -ENOMEM);
1685
1686                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1687                                             logcookies, cookie_size) <= 0)
1688                                 logcookies = NULL;
1689                 }
1690         }
1691
1692         if (rc == 0 && ma->ma_valid & MA_LOV) {
1693                 cfs_umode_t mode;
1694
1695                 mode = mdd_object_type(mdd_obj);
1696                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1697                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1698                         if (rc)
1699                                 GOTO(cleanup, rc);
1700
1701                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1702                                             ma->ma_lmm_size, handle, 1);
1703                 }
1704
1705         }
1706         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1707                 cfs_umode_t mode;
1708
1709                 mode = mdd_object_type(mdd_obj);
1710                 if (S_ISREG(mode))
1711                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1712
1713         }
1714 cleanup:
1715         if (rc == 0)
1716                 rc = mdd_attr_set_changelog(env, obj, handle,
1717                                             ma->ma_attr.la_valid);
1718 stop:
1719         mdd_trans_stop(env, mdd, rc, handle);
1720         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1721                 /*set obd attr, if needed*/
1722                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1723                                            logcookies);
1724         }
1725 #ifdef HAVE_QUOTA_SUPPORT
1726         if (quota_opc) {
1727                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1728                                       inode_pending, 0);
1729                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1730                                       block_pending, 1);
1731                 /* Trigger dqrel/dqacq for original owner and new owner.
1732                  * If failed, the next call for lquota_chkquota will
1733                  * process it. */
1734                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1735                               quota_opc);
1736         }
1737 #endif
1738         RETURN(rc);
1739 }
1740
1741 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1742                       const struct lu_buf *buf, const char *name, int fl,
1743                       struct thandle *handle)
1744 {
1745         int  rc;
1746         ENTRY;
1747
1748         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1749         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1750         mdd_write_unlock(env, obj);
1751
1752         RETURN(rc);
1753 }
1754
1755 static int mdd_xattr_sanity_check(const struct lu_env *env,
1756                                   struct mdd_object *obj)
1757 {
1758         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1759         struct md_ucred *uc     = md_ucred(env);
1760         int rc;
1761         ENTRY;
1762
1763         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1764                 RETURN(-EPERM);
1765
1766         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1767         if (rc)
1768                 RETURN(rc);
1769
1770         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1771             !mdd_capable(uc, CFS_CAP_FOWNER))
1772                 RETURN(-EPERM);
1773
1774         RETURN(rc);
1775 }
1776
1777 static int mdd_declare_xattr_set(const struct lu_env *env,
1778                                  struct mdd_device *mdd,
1779                                  struct mdd_object *obj,
1780                                  const struct lu_buf *buf,
1781                                  const char *name,
1782                                  struct thandle *handle)
1783
1784 {
1785         int rc;
1786
1787         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1788         if (rc)
1789                 return rc;
1790
1791         /* Only record user xattr changes */
1792         if ((strncmp("user.", name, 5) == 0))
1793                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1794
1795         return rc;
1796 }
1797
1798 /**
1799  * The caller should guarantee to update the object ctime
1800  * after xattr_set if needed.
1801  */
1802 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1803                          const struct lu_buf *buf, const char *name,
1804                          int fl)
1805 {
1806         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1807         struct mdd_device *mdd = mdo2mdd(obj);
1808         struct thandle *handle;
1809         int  rc;
1810         ENTRY;
1811
1812         rc = mdd_xattr_sanity_check(env, mdd_obj);
1813         if (rc)
1814                 RETURN(rc);
1815
1816         handle = mdd_trans_create(env, mdd);
1817         if (IS_ERR(handle))
1818                 RETURN(PTR_ERR(handle));
1819
1820         /* security-replated changes may require sync */
1821         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1822             mdd->mdd_sync_permission == 1)
1823                 handle->th_sync = 1;
1824
1825         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1826         if (rc)
1827                 GOTO(stop, rc);
1828
1829         rc = mdd_trans_start(env, mdd, handle);
1830         if (rc)
1831                 GOTO(stop, rc);
1832
1833         /* security-replated changes may require sync */
1834         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1835                 handle->th_sync |= mdd->mdd_sync_permission;
1836
1837         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1838
1839         /* Only record system & user xattr changes */
1840         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1841                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1842                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1843                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1844                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1845                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1846                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1847                                               handle);
1848
1849 stop:
1850         mdd_trans_stop(env, mdd, rc, handle);
1851
1852         RETURN(rc);
1853 }
1854
1855 static int mdd_declare_xattr_del(const struct lu_env *env,
1856                                  struct mdd_device *mdd,
1857                                  struct mdd_object *obj,
1858                                  const char *name,
1859                                  struct thandle *handle)
1860 {
1861         int rc;
1862
1863         rc = mdo_declare_xattr_del(env, obj, name, handle);
1864         if (rc)
1865                 return rc;
1866
1867         /* Only record user xattr changes */
1868         if ((strncmp("user.", name, 5) == 0))
1869                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1870
1871         return rc;
1872 }
1873
1874 /**
1875  * The caller should guarantee to update the object ctime
1876  * after xattr_set if needed.
1877  */
1878 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1879                   const char *name)
1880 {
1881         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1882         struct mdd_device *mdd = mdo2mdd(obj);
1883         struct thandle *handle;
1884         int  rc;
1885         ENTRY;
1886
1887         rc = mdd_xattr_sanity_check(env, mdd_obj);
1888         if (rc)
1889                 RETURN(rc);
1890
1891         handle = mdd_trans_create(env, mdd);
1892         if (IS_ERR(handle))
1893                 RETURN(PTR_ERR(handle));
1894
1895         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1896         if (rc)
1897                 GOTO(stop, rc);
1898
1899         rc = mdd_trans_start(env, mdd, handle);
1900         if (rc)
1901                 GOTO(stop, rc);
1902
1903         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1904         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1905                            mdd_object_capa(env, mdd_obj));
1906         mdd_write_unlock(env, mdd_obj);
1907
1908         /* Only record system & user xattr changes */
1909         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1910                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1911                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1912                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1913                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1914                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1915                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1916                                               handle);
1917
1918 stop:
1919         mdd_trans_stop(env, mdd, rc, handle);
1920
1921         RETURN(rc);
1922 }
1923
1924 /* partial unlink */
1925 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1926                        struct md_attr *ma)
1927 {
1928         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1929         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1930         struct mdd_device *mdd = mdo2mdd(obj);
1931         struct thandle *handle;
1932 #ifdef HAVE_QUOTA_SUPPORT
1933         struct obd_device *obd = mdd->mdd_obd_dev;
1934         struct mds_obd *mds = &obd->u.mds;
1935         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1936         int quota_opc = 0;
1937 #endif
1938         int rc;
1939         ENTRY;
1940
1941         /* XXX: this code won't be used ever:
1942          * DNE uses slightly different approach */
1943         LBUG();
1944
1945         /*
1946          * Check -ENOENT early here because we need to get object type
1947          * to calculate credits before transaction start
1948          */
1949         if (!mdd_object_exists(mdd_obj))
1950                 RETURN(-ENOENT);
1951
1952         LASSERT(mdd_object_exists(mdd_obj) > 0);
1953
1954         handle = mdd_trans_create(env, mdd);
1955         if (IS_ERR(handle))
1956                 RETURN(-ENOMEM);
1957
1958         rc = mdd_trans_start(env, mdd, handle);
1959
1960         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1961
1962         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1963         if (rc)
1964                 GOTO(cleanup, rc);
1965
1966         mdo_ref_del(env, mdd_obj, handle);
1967
1968         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1969                 /* unlink dot */
1970                 mdo_ref_del(env, mdd_obj, handle);
1971         }
1972
1973         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1974         la_copy->la_ctime = ma->ma_attr.la_ctime;
1975
1976         la_copy->la_valid = LA_CTIME;
1977         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1978         if (rc)
1979                 GOTO(cleanup, rc);
1980
1981         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1982 #ifdef HAVE_QUOTA_SUPPORT
1983         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1984             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1985                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1986                 mdd_quota_wrapper(&ma->ma_attr, qids);
1987         }
1988 #endif
1989
1990
1991         EXIT;
1992 cleanup:
1993         mdd_write_unlock(env, mdd_obj);
1994         mdd_trans_stop(env, mdd, rc, handle);
1995 #ifdef HAVE_QUOTA_SUPPORT
1996         if (quota_opc)
1997                 /* Trigger dqrel on the owner of child. If failed,
1998                  * the next call for lquota_chkquota will process it */
1999                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2000                               quota_opc);
2001 #endif
2002         return rc;
2003 }
2004
2005 /* partial operation */
2006 static int mdd_oc_sanity_check(const struct lu_env *env,
2007                                struct mdd_object *obj,
2008                                struct md_attr *ma)
2009 {
2010         int rc;
2011         ENTRY;
2012
2013         switch (ma->ma_attr.la_mode & S_IFMT) {
2014         case S_IFREG:
2015         case S_IFDIR:
2016         case S_IFLNK:
2017         case S_IFCHR:
2018         case S_IFBLK:
2019         case S_IFIFO:
2020         case S_IFSOCK:
2021                 rc = 0;
2022                 break;
2023         default:
2024                 rc = -EINVAL;
2025                 break;
2026         }
2027         RETURN(rc);
2028 }
2029
2030 static int mdd_object_create(const struct lu_env *env,
2031                              struct md_object *obj,
2032                              const struct md_op_spec *spec,
2033                              struct md_attr *ma)
2034 {
2035
2036         struct mdd_device *mdd = mdo2mdd(obj);
2037         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2038         const struct lu_fid *pfid = spec->u.sp_pfid;
2039         struct thandle *handle;
2040 #ifdef HAVE_QUOTA_SUPPORT
2041         struct obd_device *obd = mdd->mdd_obd_dev;
2042         struct obd_export *exp = md_quota(env)->mq_exp;
2043         struct mds_obd *mds = &obd->u.mds;
2044         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2045         int quota_opc = 0, block_count = 0;
2046         int inode_pending[MAXQUOTAS] = { 0, 0 };
2047         int block_pending[MAXQUOTAS] = { 0, 0 };
2048 #endif
2049         int rc = 0;
2050         ENTRY;
2051
2052         /* XXX: this code won't be used ever:
2053          * DNE uses slightly different approach */
2054         LBUG();
2055
2056 #ifdef HAVE_QUOTA_SUPPORT
2057         if (mds->mds_quota) {
2058                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2059                 mdd_quota_wrapper(&ma->ma_attr, qids);
2060                 /* get file quota for child */
2061                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2062                                 qids, inode_pending, 1, NULL, 0,
2063                                 NULL, 0);
2064                 switch (ma->ma_attr.la_mode & S_IFMT) {
2065                 case S_IFLNK:
2066                 case S_IFDIR:
2067                         block_count = 2;
2068                         break;
2069                 case S_IFREG:
2070                         block_count = 1;
2071                         break;
2072                 }
2073                 /* get block quota for child */
2074                 if (block_count)
2075                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2076                                         qids, block_pending, block_count,
2077                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2078         }
2079 #endif
2080
2081         handle = mdd_trans_create(env, mdd);
2082         if (IS_ERR(handle))
2083                 GOTO(out_pending, rc = PTR_ERR(handle));
2084
2085         rc = mdd_trans_start(env, mdd, handle);
2086
2087         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2088         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2089         if (rc)
2090                 GOTO(unlock, rc);
2091
2092         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2093         if (rc)
2094                 GOTO(unlock, rc);
2095
2096         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2097                 /* If creating the slave object, set slave EA here. */
2098                 int lmv_size = spec->u.sp_ea.eadatalen;
2099                 struct lmv_stripe_md *lmv;
2100
2101                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2102                 LASSERT(lmv != NULL && lmv_size > 0);
2103
2104                 rc = __mdd_xattr_set(env, mdd_obj,
2105                                      mdd_buf_get_const(env, lmv, lmv_size),
2106                                      XATTR_NAME_LMV, 0, handle);
2107                 if (rc)
2108                         GOTO(unlock, rc);
2109
2110                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2111                                            handle, 0);
2112         } else {
2113 #ifdef CONFIG_FS_POSIX_ACL
2114                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2115                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2116
2117                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2118                         buf->lb_len = spec->u.sp_ea.eadatalen;
2119                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2120                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2121                                                     &ma->ma_attr.la_mode,
2122                                                     handle);
2123                                 if (rc)
2124                                         GOTO(unlock, rc);
2125                                 else
2126                                         ma->ma_attr.la_valid |= LA_MODE;
2127                         }
2128
2129                         pfid = spec->u.sp_ea.fid;
2130                 }
2131 #endif
2132                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2133                                            spec);
2134         }
2135         EXIT;
2136 unlock:
2137         if (rc == 0)
2138                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2139         mdd_write_unlock(env, mdd_obj);
2140
2141         mdd_trans_stop(env, mdd, rc, handle);
2142 out_pending:
2143 #ifdef HAVE_QUOTA_SUPPORT
2144         if (quota_opc) {
2145                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2146                                       inode_pending, 0);
2147                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2148                                       block_pending, 1);
2149                 /* Trigger dqacq on the owner of child. If failed,
2150                  * the next call for lquota_chkquota will process it. */
2151                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2152                               quota_opc);
2153         }
2154 #endif
2155         return rc;
2156 }
2157
2158 /* partial link */
2159 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2160                        const struct md_attr *ma)
2161 {
2162         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2163         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2164         struct mdd_device *mdd = mdo2mdd(obj);
2165         struct thandle *handle;
2166         int rc;
2167         ENTRY;
2168
2169         /* XXX: this code won't be used ever:
2170          * DNE uses slightly different approach */
2171         LBUG();
2172
2173         handle = mdd_trans_create(env, mdd);
2174         if (IS_ERR(handle))
2175                 RETURN(-ENOMEM);
2176
2177         rc = mdd_trans_start(env, mdd, handle);
2178
2179         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2180         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2181         if (rc == 0)
2182                 mdo_ref_add(env, mdd_obj, handle);
2183         mdd_write_unlock(env, mdd_obj);
2184         if (rc == 0) {
2185                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2186                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2187
2188                 la_copy->la_valid = LA_CTIME;
2189                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2190                                                         handle, 0);
2191         }
2192         mdd_trans_stop(env, mdd, 0, handle);
2193
2194         RETURN(rc);
2195 }
2196
2197 /*
2198  * do NOT or the MAY_*'s, you'll get the weakest
2199  */
2200 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2201 {
2202         int res = 0;
2203
2204         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2205          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2206          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2207          * owner can write to a file even if it is marked readonly to hide
2208          * its brokenness. (bug 5781) */
2209         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2210                 struct md_ucred *uc = md_ucred(env);
2211
2212                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2213                     (la->la_uid == uc->mu_fsuid))
2214                         return 0;
2215         }
2216
2217         if (flags & FMODE_READ)
2218                 res |= MAY_READ;
2219         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2220                 res |= MAY_WRITE;
2221         if (flags & MDS_FMODE_EXEC)
2222                 res = MAY_EXEC;
2223         return res;
2224 }
2225
2226 static int mdd_open_sanity_check(const struct lu_env *env,
2227                                  struct mdd_object *obj, int flag)
2228 {
2229         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2230         int mode, rc;
2231         ENTRY;
2232
2233         /* EEXIST check */
2234         if (mdd_is_dead_obj(obj))
2235                 RETURN(-ENOENT);
2236
2237         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2238         if (rc)
2239                RETURN(rc);
2240
2241         if (S_ISLNK(tmp_la->la_mode))
2242                 RETURN(-ELOOP);
2243
2244         mode = accmode(env, tmp_la, flag);
2245
2246         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2247                 RETURN(-EISDIR);
2248
2249         if (!(flag & MDS_OPEN_CREATED)) {
2250                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2251                 if (rc)
2252                         RETURN(rc);
2253         }
2254
2255         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2256             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2257                 flag &= ~MDS_OPEN_TRUNC;
2258
2259         /* For writing append-only file must open it with append mode. */
2260         if (mdd_is_append(obj)) {
2261                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2262                         RETURN(-EPERM);
2263                 if (flag & MDS_OPEN_TRUNC)
2264                         RETURN(-EPERM);
2265         }
2266
2267 #if 0
2268         /*
2269          * Now, flag -- O_NOATIME does not be packed by client.
2270          */
2271         if (flag & O_NOATIME) {
2272                 struct md_ucred *uc = md_ucred(env);
2273
2274                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2275                     (uc->mu_valid == UCRED_NEW)) &&
2276                     (uc->mu_fsuid != tmp_la->la_uid) &&
2277                     !mdd_capable(uc, CFS_CAP_FOWNER))
2278                         RETURN(-EPERM);
2279         }
2280 #endif
2281
2282         RETURN(0);
2283 }
2284
2285 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2286                     int flags)
2287 {
2288         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2289         int rc = 0;
2290
2291         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2292
2293         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2294         if (rc == 0)
2295                 mdd_obj->mod_count++;
2296
2297         mdd_write_unlock(env, mdd_obj);
2298         return rc;
2299 }
2300
2301 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2302                             struct md_attr *ma, struct thandle *handle)
2303 {
2304         int rc;
2305
2306         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2307         if (rc)
2308                 return rc;
2309
2310         return mdo_declare_destroy(env, obj, handle);
2311 }
2312
2313 /* return md_attr back,
2314  * if it is last unlink then return lov ea + llog cookie*/
2315 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2316                     struct md_attr *ma, struct thandle *handle)
2317 {
2318         int rc = 0;
2319         ENTRY;
2320
2321         if (S_ISREG(mdd_object_type(obj))) {
2322                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2323                  * Caller must be ready for that. */
2324
2325                 rc = __mdd_lmm_get(env, obj, ma);
2326                 if ((ma->ma_valid & MA_LOV))
2327                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2328                                             obj, ma);
2329         }
2330
2331         if (rc == 0)
2332                 rc = mdo_destroy(env, obj, handle);
2333
2334         RETURN(rc);
2335 }
2336
2337 static int mdd_declare_close(const struct lu_env *env,
2338                              struct mdd_object *obj,
2339                              struct md_attr *ma,
2340                              struct thandle *handle)
2341 {
2342         int rc;
2343
2344         rc = orph_declare_index_delete(env, obj, handle);
2345         if (rc)
2346                 return rc;
2347
2348         return mdd_declare_object_kill(env, obj, ma, handle);
2349 }
2350
2351 /*
2352  * No permission check is needed.
2353  */
2354 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2355                      struct md_attr *ma, int mode)
2356 {
2357         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2358         struct mdd_device *mdd = mdo2mdd(obj);
2359         struct thandle    *handle = NULL;
2360         int rc;
2361         int is_orphan = 0, reset = 1;
2362
2363 #ifdef HAVE_QUOTA_SUPPORT
2364         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2365         struct mds_obd *mds = &obd->u.mds;
2366         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2367         int quota_opc = 0;
2368 #endif
2369         ENTRY;
2370
2371         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2372                 mdd_obj->mod_count--;
2373
2374                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2375                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2376                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2377                 RETURN(0);
2378         }
2379
2380         /* check without any lock */
2381         if (mdd_obj->mod_count == 1 &&
2382             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2383  again:
2384                 handle = mdd_trans_create(env, mdo2mdd(obj));
2385                 if (IS_ERR(handle))
2386                         RETURN(PTR_ERR(handle));
2387
2388                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2389                 if (rc)
2390                         GOTO(stop, rc);
2391
2392                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2393                 if (rc)
2394                         GOTO(stop, rc);
2395
2396                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2397                 if (rc)
2398                         GOTO(stop, rc);
2399         }
2400
2401         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2402         if (handle == NULL && mdd_obj->mod_count == 1 &&
2403             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2404                 mdd_write_unlock(env, mdd_obj);
2405                 goto again;
2406         }
2407
2408         /* release open count */
2409         mdd_obj->mod_count --;
2410
2411         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2412                 /* remove link to object from orphan index */
2413                 LASSERT(handle != NULL);
2414                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2415                 if (rc == 0) {
2416                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2417                                "list, OSS objects to be destroyed.\n",
2418                                PFID(mdd_object_fid(mdd_obj)));
2419                         is_orphan = 1;
2420                 } else {
2421                         CERROR("Object "DFID" can not be deleted from orphan "
2422                                 "list, maybe cause OST objects can not be "
2423                                 "destroyed (err: %d).\n",
2424                                 PFID(mdd_object_fid(mdd_obj)), rc);
2425                         /* If object was not deleted from orphan list, do not
2426                          * destroy OSS objects, which will be done when next
2427                          * recovery. */
2428                         GOTO(out, rc);
2429                 }
2430         }
2431
2432         rc = mdd_iattr_get(env, mdd_obj, ma);
2433         /* Object maybe not in orphan list originally, it is rare case for
2434          * mdd_finish_unlink() failure. */
2435         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2436 #ifdef HAVE_QUOTA_SUPPORT
2437                 if (mds->mds_quota) {
2438                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2439                         mdd_quota_wrapper(&ma->ma_attr, qids);
2440                 }
2441 #endif
2442                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2443                 if (ma->ma_valid & MA_FLAGS &&
2444                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2445                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2446                 } else {
2447                         if (handle == NULL) {
2448                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2449                                 if (IS_ERR(handle))
2450                                         GOTO(out, rc = PTR_ERR(handle));
2451
2452                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2453                                                              handle);
2454                                 if (rc)
2455                                         GOTO(out, rc);
2456
2457                                 rc = mdd_declare_changelog_store(env, mdd,
2458                                                                  NULL, handle);
2459                                 if (rc)
2460                                         GOTO(stop, rc);
2461
2462                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2463                                 if (rc)
2464                                         GOTO(out, rc);
2465                         }
2466
2467                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2468                         if (rc == 0)
2469                                 reset = 0;
2470                 }
2471
2472                 if (rc != 0)
2473                         CERROR("Error when prepare to delete Object "DFID" , "
2474                                "which will cause OST objects can not be "
2475                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2476         }
2477         EXIT;
2478
2479 out:
2480         if (reset)
2481                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2482
2483         mdd_write_unlock(env, mdd_obj);
2484
2485         if (rc == 0 &&
2486             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2487             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2488                 if (handle == NULL) {
2489                         handle = mdd_trans_create(env, mdo2mdd(obj));
2490                         if (IS_ERR(handle))
2491                                 GOTO(stop, rc = IS_ERR(handle));
2492
2493                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2494                                                          handle);
2495                         if (rc)
2496                                 GOTO(stop, rc);
2497
2498                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2499                         if (rc)
2500                                 GOTO(stop, rc);
2501                 }
2502
2503                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2504                                          mdd_obj, handle);
2505         }
2506
2507 stop:
2508         if (handle != NULL)
2509                 mdd_trans_stop(env, mdd, rc, handle);
2510 #ifdef HAVE_QUOTA_SUPPORT
2511         if (quota_opc)
2512                 /* Trigger dqrel on the owner of child. If failed,
2513                  * the next call for lquota_chkquota will process it */
2514                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2515                               quota_opc);
2516 #endif
2517         return rc;
2518 }
2519
2520 /*
2521  * Permission check is done when open,
2522  * no need check again.
2523  */
2524 static int mdd_readpage_sanity_check(const struct lu_env *env,
2525                                      struct mdd_object *obj)
2526 {
2527         struct dt_object *next = mdd_object_child(obj);
2528         int rc;
2529         ENTRY;
2530
2531         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2532                 rc = 0;
2533         else
2534                 rc = -ENOTDIR;
2535
2536         RETURN(rc);
2537 }
2538
2539 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2540                               struct lu_dirpage *dp, int nob,
2541                               const struct dt_it_ops *iops, struct dt_it *it,
2542                               __u32 attr)
2543 {
2544         void                   *area = dp;
2545         int                     result;
2546         __u64                   hash = 0;
2547         struct lu_dirent       *ent;
2548         struct lu_dirent       *last = NULL;
2549         int                     first = 1;
2550
2551         memset(area, 0, sizeof (*dp));
2552         area += sizeof (*dp);
2553         nob  -= sizeof (*dp);
2554
2555         ent  = area;
2556         do {
2557                 int    len;
2558                 int    recsize;
2559
2560                 len = iops->key_size(env, it);
2561
2562                 /* IAM iterator can return record with zero len. */
2563                 if (len == 0)
2564                         goto next;
2565
2566                 hash = iops->store(env, it);
2567                 if (unlikely(first)) {
2568                         first = 0;
2569                         dp->ldp_hash_start = cpu_to_le64(hash);
2570                 }
2571
2572                 /* calculate max space required for lu_dirent */
2573                 recsize = lu_dirent_calc_size(len, attr);
2574
2575                 if (nob >= recsize) {
2576                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2577                         if (result == -ESTALE)
2578                                 goto next;
2579                         if (result != 0)
2580                                 goto out;
2581
2582                         /* osd might not able to pack all attributes,
2583                          * so recheck rec length */
2584                         recsize = le16_to_cpu(ent->lde_reclen);
2585                 } else {
2586                         result = (last != NULL) ? 0 :-EINVAL;
2587                         goto out;
2588                 }
2589                 last = ent;
2590                 ent = (void *)ent + recsize;
2591                 nob -= recsize;
2592
2593 next:
2594                 result = iops->next(env, it);
2595                 if (result == -ESTALE)
2596                         goto next;
2597         } while (result == 0);
2598
2599 out:
2600         dp->ldp_hash_end = cpu_to_le64(hash);
2601         if (last != NULL) {
2602                 if (last->lde_hash == dp->ldp_hash_end)
2603                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2604                 last->lde_reclen = 0; /* end mark */
2605         }
2606         return result;
2607 }
2608
2609 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2610                           const struct lu_rdpg *rdpg)
2611 {
2612         struct dt_it      *it;
2613         struct dt_object  *next = mdd_object_child(obj);
2614         const struct dt_it_ops  *iops;
2615         struct page       *pg;
2616         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2617         int i;
2618         int nlupgs = 0;
2619         int rc;
2620         int nob;
2621
2622         LASSERT(rdpg->rp_pages != NULL);
2623         LASSERT(next->do_index_ops != NULL);
2624
2625         if (rdpg->rp_count <= 0)
2626                 return -EFAULT;
2627
2628         /*
2629          * iterate through directory and fill pages from @rdpg
2630          */
2631         iops = &next->do_index_ops->dio_it;
2632         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2633         if (IS_ERR(it))
2634                 return PTR_ERR(it);
2635
2636         rc = iops->load(env, it, rdpg->rp_hash);
2637
2638         if (rc == 0) {
2639                 /*
2640                  * Iterator didn't find record with exactly the key requested.
2641                  *
2642                  * It is currently either
2643                  *
2644                  *     - positioned above record with key less than
2645                  *     requested---skip it.
2646                  *
2647                  *     - or not positioned at all (is in IAM_IT_SKEWED
2648                  *     state)---position it on the next item.
2649                  */
2650                 rc = iops->next(env, it);
2651         } else if (rc > 0)
2652                 rc = 0;
2653
2654         /*
2655          * At this point and across for-loop:
2656          *
2657          *  rc == 0 -> ok, proceed.
2658          *  rc >  0 -> end of directory.
2659          *  rc <  0 -> error.
2660          */
2661         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2662              i++, nob -= CFS_PAGE_SIZE) {
2663                 struct lu_dirpage *dp;
2664
2665                 LASSERT(i < rdpg->rp_npages);
2666                 pg = rdpg->rp_pages[i];
2667                 dp = cfs_kmap(pg);
2668 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2669 repeat:
2670 #endif
2671                 rc = mdd_dir_page_build(env, mdd, dp,
2672                                         min_t(int, nob, LU_PAGE_SIZE),
2673                                         iops, it, rdpg->rp_attrs);
2674                 if (rc > 0) {
2675                         /*
2676                          * end of directory.
2677                          */
2678                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2679                         nlupgs++;
2680                 } else if (rc < 0) {
2681                         CWARN("build page failed: %d!\n", rc);
2682                 } else {
2683                         nlupgs++;
2684 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2685                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2686                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2687                                 goto repeat;
2688 #endif
2689                 }
2690                 cfs_kunmap(pg);
2691         }
2692         if (rc >= 0) {
2693                 struct lu_dirpage *dp;
2694
2695                 dp = cfs_kmap(rdpg->rp_pages[0]);
2696                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2697                 if (nlupgs == 0) {
2698                         /*
2699                          * No pages were processed, mark this for first page
2700                          * and send back.
2701                          */
2702                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2703                         nlupgs = 1;
2704                 }
2705                 cfs_kunmap(rdpg->rp_pages[0]);
2706
2707                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2708         }
2709         iops->put(env, it);
2710         iops->fini(env, it);
2711
2712         return rc;
2713 }
2714
2715 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2716                  const struct lu_rdpg *rdpg)
2717 {
2718         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2719         int rc;
2720         ENTRY;
2721
2722         LASSERT(mdd_object_exists(mdd_obj));
2723
2724         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2725         rc = mdd_readpage_sanity_check(env, mdd_obj);
2726         if (rc)
2727                 GOTO(out_unlock, rc);
2728
2729         if (mdd_is_dead_obj(mdd_obj)) {
2730                 struct page *pg;
2731                 struct lu_dirpage *dp;
2732
2733                 /*
2734                  * According to POSIX, please do not return any entry to client:
2735                  * even dot and dotdot should not be returned.
2736                  */
2737                 CWARN("readdir from dead object: "DFID"\n",
2738                         PFID(mdd_object_fid(mdd_obj)));
2739
2740                 if (rdpg->rp_count <= 0)
2741                         GOTO(out_unlock, rc = -EFAULT);
2742                 LASSERT(rdpg->rp_pages != NULL);
2743
2744                 pg = rdpg->rp_pages[0];
2745                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2746                 memset(dp, 0 , sizeof(struct lu_dirpage));
2747                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2748                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2749                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2750                 cfs_kunmap(pg);
2751                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2752         }
2753
2754         rc = __mdd_readpage(env, mdd_obj, rdpg);
2755
2756         EXIT;
2757 out_unlock:
2758         mdd_read_unlock(env, mdd_obj);
2759         return rc;
2760 }
2761
2762 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2763 {
2764         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2765         struct dt_object *next;
2766
2767         LASSERT(mdd_object_exists(mdd_obj));
2768         next = mdd_object_child(mdd_obj);
2769         return next->do_ops->do_object_sync(env, next);
2770 }
2771
2772 const struct md_object_operations mdd_obj_ops = {
2773         .moo_permission    = mdd_permission,
2774         .moo_attr_get      = mdd_attr_get,
2775         .moo_attr_set      = mdd_attr_set,
2776         .moo_xattr_get     = mdd_xattr_get,
2777         .moo_xattr_set     = mdd_xattr_set,
2778         .moo_xattr_list    = mdd_xattr_list,
2779         .moo_xattr_del     = mdd_xattr_del,
2780         .moo_object_create = mdd_object_create,
2781         .moo_ref_add       = mdd_ref_add,
2782         .moo_ref_del       = mdd_ref_del,
2783         .moo_open          = mdd_open,
2784         .moo_close         = mdd_close,
2785         .moo_readpage      = mdd_readpage,
2786         .moo_readlink      = mdd_readlink,
2787         .moo_changelog     = mdd_changelog,
2788         .moo_capa_get      = mdd_capa_get,
2789         .moo_object_sync   = mdd_object_sync,
2790         .moo_path          = mdd_path,
2791         .moo_file_lock     = mdd_file_lock,
2792         .moo_file_unlock   = mdd_file_unlock,
2793 };