Whamcloud - gitweb
27c68749011b86b8279269fe419b8bc03c67a591
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61
62 static int mdd_xattr_get(const struct lu_env *env,
63                          struct md_object *obj, struct lu_buf *buf,
64                          const char *name);
65
66 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
67                  void **data)
68 {
69         if (mdd_object_exists(obj) == 0) {
70                 CERROR("%s: object "DFID" not found: rc = -2\n",
71                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
72                 return -ENOENT;
73         }
74         mdo_data_get(env, obj, data);
75         return 0;
76 }
77
78 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
79                struct lu_attr *la, struct lustre_capa *capa)
80 {
81         if (mdd_object_exists(obj) == 0) {
82                 CERROR("%s: object "DFID" not found: rc = -2\n",
83                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
84                 return -ENOENT;
85         }
86         return mdo_attr_get(env, obj, la, capa);
87 }
88
89 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
90 {
91         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
92
93         if (flags & LUSTRE_APPEND_FL)
94                 obj->mod_flags |= APPEND_OBJ;
95
96         if (flags & LUSTRE_IMMUTABLE_FL)
97                 obj->mod_flags |= IMMUTE_OBJ;
98 }
99
100 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
101 {
102         struct mdd_thread_info *info;
103
104         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
105         LASSERT(info != NULL);
106         return info;
107 }
108
109 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
110 {
111         struct lu_buf *buf;
112
113         buf = &mdd_env_info(env)->mti_buf;
114         buf->lb_buf = area;
115         buf->lb_len = len;
116         return buf;
117 }
118
119 void mdd_buf_put(struct lu_buf *buf)
120 {
121         if (buf == NULL || buf->lb_buf == NULL)
122                 return;
123         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
124         buf->lb_buf = NULL;
125         buf->lb_len = 0;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
140 {
141         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
142
143         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
144                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
145                 buf->lb_buf = NULL;
146         }
147         if (buf->lb_buf == NULL) {
148                 buf->lb_len = len;
149                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
150                 if (buf->lb_buf == NULL)
151                         buf->lb_len = 0;
152         }
153         return buf;
154 }
155
156 /** Increase the size of the \a mti_big_buf.
157  * preserves old data in buffer
158  * old buffer remains unchanged on error
159  * \retval 0 or -ENOMEM
160  */
161 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
162 {
163         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
164         struct lu_buf buf;
165
166         LASSERT(len >= oldbuf->lb_len);
167         OBD_ALLOC_LARGE(buf.lb_buf, len);
168
169         if (buf.lb_buf == NULL)
170                 return -ENOMEM;
171
172         buf.lb_len = len;
173         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
174
175         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
176
177         memcpy(oldbuf, &buf, sizeof(buf));
178
179         return 0;
180 }
181
182 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
183                                        struct mdd_device *mdd)
184 {
185         struct mdd_thread_info *mti = mdd_env_info(env);
186         int                     max_cookie_size;
187
188         max_cookie_size = mdd_lov_cookiesize(env, mdd);
189         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
190                 if (mti->mti_max_cookie)
191                         OBD_FREE_LARGE(mti->mti_max_cookie,
192                                        mti->mti_max_cookie_size);
193                 mti->mti_max_cookie = NULL;
194                 mti->mti_max_cookie_size = 0;
195         }
196         if (unlikely(mti->mti_max_cookie == NULL)) {
197                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
198                 if (likely(mti->mti_max_cookie != NULL))
199                         mti->mti_max_cookie_size = max_cookie_size;
200         }
201         if (likely(mti->mti_max_cookie != NULL))
202                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
203         return mti->mti_max_cookie;
204 }
205
206 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
207 {
208         struct mdd_thread_info *mti = mdd_env_info(env);
209
210         if (unlikely(mti->mti_max_lmm_size < size)) {
211                 int rsize = size_roundup_power2(size);
212
213                 if (mti->mti_max_lmm_size > 0) {
214                         LASSERT(mti->mti_max_lmm);
215                         OBD_FREE_LARGE(mti->mti_max_lmm,
216                                        mti->mti_max_lmm_size);
217                         mti->mti_max_lmm = NULL;
218                         mti->mti_max_lmm_size = 0;
219                 }
220
221                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
222                 if (likely(mti->mti_max_lmm != NULL))
223                         mti->mti_max_lmm_size = rsize;
224         }
225         return mti->mti_max_lmm;
226 }
227
228 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
229                                    struct mdd_device *mdd)
230 {
231         int max_lmm_size;
232
233         max_lmm_size = mdd_lov_mdsize(env, mdd);
234         return mdd_max_lmm_buffer(env, max_lmm_size);
235 }
236
237 struct lu_object *mdd_object_alloc(const struct lu_env *env,
238                                    const struct lu_object_header *hdr,
239                                    struct lu_device *d)
240 {
241         struct mdd_object *mdd_obj;
242
243         OBD_ALLOC_PTR(mdd_obj);
244         if (mdd_obj != NULL) {
245                 struct lu_object *o;
246
247                 o = mdd2lu_obj(mdd_obj);
248                 lu_object_init(o, NULL, d);
249                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
250                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
251                 mdd_obj->mod_count = 0;
252                 o->lo_ops = &mdd_lu_obj_ops;
253                 return o;
254         } else {
255                 return NULL;
256         }
257 }
258
259 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
260                            const struct lu_object_conf *unused)
261 {
262         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
263         struct mdd_object *mdd_obj = lu2mdd_obj(o);
264         struct lu_object  *below;
265         struct lu_device  *under;
266         ENTRY;
267
268         mdd_obj->mod_cltime = 0;
269         under = &d->mdd_child->dd_lu_dev;
270         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
271         mdd_pdlock_init(mdd_obj);
272         if (below == NULL)
273                 RETURN(-ENOMEM);
274
275         lu_object_add(o, below);
276
277         RETURN(0);
278 }
279
280 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
281 {
282         if (lu_object_exists(o))
283                 return mdd_get_flags(env, lu2mdd_obj(o));
284         else
285                 return 0;
286 }
287
288 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
289 {
290         struct mdd_object *mdd = lu2mdd_obj(o);
291
292         lu_object_fini(o);
293         OBD_FREE_PTR(mdd);
294 }
295
296 static int mdd_object_print(const struct lu_env *env, void *cookie,
297                             lu_printer_t p, const struct lu_object *o)
298 {
299         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
300         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
301                     "valid=%x, cltime="LPU64", flags=%lx)",
302                     mdd, mdd->mod_count, mdd->mod_valid,
303                     mdd->mod_cltime, mdd->mod_flags);
304 }
305
306 static const struct lu_object_operations mdd_lu_obj_ops = {
307         .loo_object_init    = mdd_object_init,
308         .loo_object_start   = mdd_object_start,
309         .loo_object_free    = mdd_object_free,
310         .loo_object_print   = mdd_object_print,
311 };
312
313 struct mdd_object *mdd_object_find(const struct lu_env *env,
314                                    struct mdd_device *d,
315                                    const struct lu_fid *f)
316 {
317         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
318 }
319
320 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
321                         const char *path, struct lu_fid *fid)
322 {
323         struct lu_buf *buf;
324         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
325         struct mdd_object *obj;
326         struct lu_name *lname = &mdd_env_info(env)->mti_name;
327         char *name;
328         int rc = 0;
329         ENTRY;
330
331         /* temp buffer for path element */
332         buf = mdd_buf_alloc(env, PATH_MAX);
333         if (buf->lb_buf == NULL)
334                 RETURN(-ENOMEM);
335
336         lname->ln_name = name = buf->lb_buf;
337         lname->ln_namelen = 0;
338         *f = mdd->mdd_root_fid;
339
340         while(1) {
341                 while (*path == '/')
342                         path++;
343                 if (*path == '\0')
344                         break;
345                 while (*path != '/' && *path != '\0') {
346                         *name = *path;
347                         path++;
348                         name++;
349                         lname->ln_namelen++;
350                 }
351
352                 *name = '\0';
353                 /* find obj corresponding to fid */
354                 obj = mdd_object_find(env, mdd, f);
355                 if (obj == NULL)
356                         GOTO(out, rc = -EREMOTE);
357                 if (IS_ERR(obj))
358                         GOTO(out, rc = PTR_ERR(obj));
359                 /* get child fid from parent and name */
360                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
361                 mdd_object_put(env, obj);
362                 if (rc)
363                         break;
364
365                 name = buf->lb_buf;
366                 lname->ln_namelen = 0;
367         }
368
369         if (!rc)
370                 *fid = *f;
371 out:
372         RETURN(rc);
373 }
374
375 /** The maximum depth that fid2path() will search.
376  * This is limited only because we want to store the fids for
377  * historical path lookup purposes.
378  */
379 #define MAX_PATH_DEPTH 100
380
381 /** mdd_path() lookup structure. */
382 struct path_lookup_info {
383         __u64                pli_recno;        /**< history point */
384         __u64                pli_currec;       /**< current record */
385         struct lu_fid        pli_fid;
386         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
387         struct mdd_object   *pli_mdd_obj;
388         char                *pli_path;         /**< full path */
389         int                  pli_pathlen;
390         int                  pli_linkno;       /**< which hardlink to follow */
391         int                  pli_fidcount;     /**< number of \a pli_fids */
392 };
393
394 static int mdd_path_current(const struct lu_env *env,
395                             struct path_lookup_info *pli)
396 {
397         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
398         struct mdd_object *mdd_obj;
399         struct lu_buf     *buf = NULL;
400         struct link_ea_header *leh;
401         struct link_ea_entry  *lee;
402         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
403         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
404         char *ptr;
405         int reclen;
406         int rc;
407         ENTRY;
408
409         ptr = pli->pli_path + pli->pli_pathlen - 1;
410         *ptr = 0;
411         --ptr;
412         pli->pli_fidcount = 0;
413         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
414
415         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
416                 mdd_obj = mdd_object_find(env, mdd,
417                                           &pli->pli_fids[pli->pli_fidcount]);
418                 if (mdd_obj == NULL)
419                         GOTO(out, rc = -EREMOTE);
420                 if (IS_ERR(mdd_obj))
421                         GOTO(out, rc = PTR_ERR(mdd_obj));
422                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
423                 if (rc <= 0) {
424                         mdd_object_put(env, mdd_obj);
425                         if (rc == -1)
426                                 rc = -EREMOTE;
427                         else if (rc == 0)
428                                 /* Do I need to error out here? */
429                                 rc = -ENOENT;
430                         GOTO(out, rc);
431                 }
432
433                 /* Get parent fid and object name */
434                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
435                 buf = mdd_links_get(env, mdd_obj);
436                 mdd_read_unlock(env, mdd_obj);
437                 mdd_object_put(env, mdd_obj);
438                 if (IS_ERR(buf))
439                         GOTO(out, rc = PTR_ERR(buf));
440
441                 leh = buf->lb_buf;
442                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
443                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
444
445                 /* If set, use link #linkno for path lookup, otherwise use
446                    link #0.  Only do this for the final path element. */
447                 if ((pli->pli_fidcount == 0) &&
448                     (pli->pli_linkno < leh->leh_reccount)) {
449                         int count;
450                         for (count = 0; count < pli->pli_linkno; count++) {
451                                 lee = (struct link_ea_entry *)
452                                      ((char *)lee + reclen);
453                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
454                         }
455                         if (pli->pli_linkno < leh->leh_reccount - 1)
456                                 /* indicate to user there are more links */
457                                 pli->pli_linkno++;
458                 }
459
460                 /* Pack the name in the end of the buffer */
461                 ptr -= tmpname->ln_namelen;
462                 if (ptr - 1 <= pli->pli_path)
463                         GOTO(out, rc = -EOVERFLOW);
464                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
465                 *(--ptr) = '/';
466
467                 /* Store the parent fid for historic lookup */
468                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
469                         GOTO(out, rc = -EOVERFLOW);
470                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
471         }
472
473         /* Verify that our path hasn't changed since we started the lookup.
474            Record the current index, and verify the path resolves to the
475            same fid. If it does, then the path is correct as of this index. */
476         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
477         pli->pli_currec = mdd->mdd_cl.mc_index;
478         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
479         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
480         if (rc) {
481                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
482                 GOTO (out, rc = -EAGAIN);
483         }
484         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
485                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
486                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
487                        PFID(&pli->pli_fid));
488                 GOTO(out, rc = -EAGAIN);
489         }
490         ptr++; /* skip leading / */
491         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
492
493         EXIT;
494 out:
495         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
496                 /* if we vmalloced a large buffer drop it */
497                 mdd_buf_put(buf);
498
499         return rc;
500 }
501
502 static int mdd_path_historic(const struct lu_env *env,
503                              struct path_lookup_info *pli)
504 {
505         return 0;
506 }
507
508 /* Returns the full path to this fid, as of changelog record recno. */
509 static int mdd_path(const struct lu_env *env, struct md_object *obj,
510                     char *path, int pathlen, __u64 *recno, int *linkno)
511 {
512         struct path_lookup_info *pli;
513         int tries = 3;
514         int rc = -EAGAIN;
515         ENTRY;
516
517         if (pathlen < 3)
518                 RETURN(-EOVERFLOW);
519
520         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
521                 path[0] = '\0';
522                 RETURN(0);
523         }
524
525         OBD_ALLOC_PTR(pli);
526         if (pli == NULL)
527                 RETURN(-ENOMEM);
528
529         pli->pli_mdd_obj = md2mdd_obj(obj);
530         pli->pli_recno = *recno;
531         pli->pli_path = path;
532         pli->pli_pathlen = pathlen;
533         pli->pli_linkno = *linkno;
534
535         /* Retry multiple times in case file is being moved */
536         while (tries-- && rc == -EAGAIN)
537                 rc = mdd_path_current(env, pli);
538
539         /* For historical path lookup, the current links may not have existed
540          * at "recno" time.  We must switch over to earlier links/parents
541          * by using the changelog records.  If the earlier parent doesn't
542          * exist, we must search back through the changelog to reconstruct
543          * its parents, then check if it exists, etc.
544          * We may ignore this problem for the initial implementation and
545          * state that an "original" hardlink must still exist for us to find
546          * historic path name. */
547         if (pli->pli_recno != -1) {
548                 rc = mdd_path_historic(env, pli);
549         } else {
550                 *recno = pli->pli_currec;
551                 /* Return next link index to caller */
552                 *linkno = pli->pli_linkno;
553         }
554
555         OBD_FREE_PTR(pli);
556
557         RETURN (rc);
558 }
559
560 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
561 {
562         struct lu_attr *la = &mdd_env_info(env)->mti_la;
563         int rc;
564
565         ENTRY;
566         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
567         if (rc == 0) {
568                 mdd_flags_xlate(obj, la->la_flags);
569         }
570         RETURN(rc);
571 }
572
573 /* get only inode attributes */
574 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
575                   struct md_attr *ma)
576 {
577         int rc = 0;
578         ENTRY;
579
580         if (ma->ma_valid & MA_INODE)
581                 RETURN(0);
582
583         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
584                           mdd_object_capa(env, mdd_obj));
585         if (rc == 0)
586                 ma->ma_valid |= MA_INODE;
587         RETURN(rc);
588 }
589
590 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
591 {
592         struct lov_desc *ldesc;
593         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
594         struct lov_user_md *lum = (struct lov_user_md*)lmm;
595         ENTRY;
596
597         if (!lum)
598                 RETURN(0);
599
600         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
601         LASSERT(ldesc != NULL);
602
603         lum->lmm_magic = LOV_MAGIC_V1;
604         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
605         lum->lmm_pattern = ldesc->ld_pattern;
606         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
607         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
608         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
609
610         RETURN(sizeof(*lum));
611 }
612
613 static int is_rootdir(struct mdd_object *mdd_obj)
614 {
615         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
616         const struct lu_fid *fid = mdo2fid(mdd_obj);
617
618         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
619 }
620
621 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
622                     struct md_attr *ma)
623 {
624         struct mdd_thread_info *info = mdd_env_info(env);
625         int size;
626         int rc;
627         ENTRY;
628
629         LASSERT(info != NULL);
630         LASSERT(ma->ma_lmm_size > 0);
631         LASSERT(ma->ma_big_lmm_used == 0);
632
633         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
634                            mdd_object_capa(env, obj));
635         if (rc < 0)
636                 RETURN(rc);
637
638         /* big_lmm may need to grow */
639         size = rc;
640         mdd_max_lmm_buffer(env, size);
641         if (info->mti_max_lmm == NULL)
642                 RETURN(-ENOMEM);
643
644         LASSERT(info->mti_max_lmm_size >= size);
645         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
646                         XATTR_NAME_LOV);
647         if (rc < 0)
648                 RETURN(rc);
649
650         ma->ma_big_lmm_used = 1;
651         ma->ma_valid |= MA_LOV;
652         ma->ma_lmm = info->mti_max_lmm;
653         ma->ma_lmm_size = size;
654         LASSERT(size == rc);
655         RETURN(rc);
656 }
657
658 /* get lov EA only */
659 static int __mdd_lmm_get(const struct lu_env *env,
660                          struct mdd_object *mdd_obj, struct md_attr *ma)
661 {
662         int rc;
663         ENTRY;
664
665         if (ma->ma_valid & MA_LOV)
666                 RETURN(0);
667
668         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
669                         XATTR_NAME_LOV);
670         if (rc == -ERANGE)
671                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
672         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
673                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
674
675         if (rc > 0) {
676                 ma->ma_lmm_size = rc;
677                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
678                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
679                 rc = 0;
680         }
681         RETURN(rc);
682 }
683
684 /* get the first parent fid from link EA */
685 static int mdd_pfid_get(const struct lu_env *env,
686                         struct mdd_object *mdd_obj, struct md_attr *ma)
687 {
688         struct lu_buf *buf;
689         struct link_ea_header *leh;
690         struct link_ea_entry *lee;
691         struct lu_fid *pfid = &ma->ma_pfid;
692         ENTRY;
693
694         if (ma->ma_valid & MA_PFID)
695                 RETURN(0);
696
697         buf = mdd_links_get(env, mdd_obj);
698         if (IS_ERR(buf))
699                 RETURN(PTR_ERR(buf));
700
701         leh = buf->lb_buf;
702         lee = (struct link_ea_entry *)(leh + 1);
703         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
704         fid_be_to_cpu(pfid, pfid);
705         ma->ma_valid |= MA_PFID;
706         if (buf->lb_len > OBD_ALLOC_BIG)
707                 /* if we vmalloced a large buffer drop it */
708                 mdd_buf_put(buf);
709         RETURN(0);
710 }
711
712 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
713                        struct md_attr *ma)
714 {
715         int rc;
716         ENTRY;
717
718         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
719         rc = __mdd_lmm_get(env, mdd_obj, ma);
720         mdd_read_unlock(env, mdd_obj);
721         RETURN(rc);
722 }
723
724 /* get lmv EA only*/
725 static int __mdd_lmv_get(const struct lu_env *env,
726                          struct mdd_object *mdd_obj, struct md_attr *ma)
727 {
728         int rc;
729         ENTRY;
730
731         if (ma->ma_valid & MA_LMV)
732                 RETURN(0);
733
734         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
735                         XATTR_NAME_LMV);
736         if (rc > 0) {
737                 ma->ma_valid |= MA_LMV;
738                 rc = 0;
739         }
740         RETURN(rc);
741 }
742
743 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
744                          struct md_attr *ma)
745 {
746         struct mdd_thread_info *info = mdd_env_info(env);
747         struct lustre_mdt_attrs *lma =
748                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
749         int lma_size;
750         int rc;
751         ENTRY;
752
753         /* If all needed data are already valid, nothing to do */
754         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
755             (ma->ma_need & (MA_HSM | MA_SOM)))
756                 RETURN(0);
757
758         /* Read LMA from disk EA */
759         lma_size = sizeof(info->mti_xattr_buf);
760         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
761         if (rc <= 0)
762                 RETURN(rc);
763
764         /* Useless to check LMA incompatibility because this is already done in
765          * osd_ea_fid_get(), and this will fail long before this code is
766          * called.
767          * So, if we are here, LMA is compatible.
768          */
769
770         lustre_lma_swab(lma);
771
772         /* Swab and copy LMA */
773         if (ma->ma_need & MA_HSM) {
774                 if (lma->lma_compat & LMAC_HSM)
775                         ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
776                 else
777                         ma->ma_hsm.mh_flags = 0;
778                 ma->ma_valid |= MA_HSM;
779         }
780
781         /* Copy SOM */
782         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
783                 LASSERT(ma->ma_som != NULL);
784                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
785                 ma->ma_som->msd_size    = lma->lma_som_size;
786                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
787                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
788                 ma->ma_valid |= MA_SOM;
789         }
790
791         RETURN(0);
792 }
793
794 int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
795                                  struct md_attr *ma)
796 {
797         int rc = 0;
798         ENTRY;
799
800         if (ma->ma_need & MA_INODE)
801                 rc = mdd_iattr_get(env, mdd_obj, ma);
802
803         if (rc == 0 && ma->ma_need & MA_LOV) {
804                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
805                     S_ISDIR(mdd_object_type(mdd_obj)))
806                         rc = __mdd_lmm_get(env, mdd_obj, ma);
807         }
808         if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
809                 if (S_ISREG(mdd_object_type(mdd_obj)))
810                         rc = mdd_pfid_get(env, mdd_obj, ma);
811         }
812         if (rc == 0 && ma->ma_need & MA_LMV) {
813                 if (S_ISDIR(mdd_object_type(mdd_obj)))
814                         rc = __mdd_lmv_get(env, mdd_obj, ma);
815         }
816         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
817                 if (S_ISREG(mdd_object_type(mdd_obj)))
818                         rc = __mdd_lma_get(env, mdd_obj, ma);
819         }
820 #ifdef CONFIG_FS_POSIX_ACL
821         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
822                 if (S_ISDIR(mdd_object_type(mdd_obj)))
823                         rc = mdd_def_acl_get(env, mdd_obj, ma);
824         }
825 #endif
826         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
827                rc, ma->ma_valid, ma->ma_lmm);
828         RETURN(rc);
829 }
830
831 int mdd_attr_get_internal_locked(const struct lu_env *env,
832                                  struct mdd_object *mdd_obj, struct md_attr *ma)
833 {
834         int rc;
835         int needlock = ma->ma_need &
836                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
837
838         if (needlock)
839                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
840         rc = mdd_attr_get_internal(env, mdd_obj, ma);
841         if (needlock)
842                 mdd_read_unlock(env, mdd_obj);
843         return rc;
844 }
845
846 /*
847  * No permission check is needed.
848  */
849 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
850                         struct md_attr *ma)
851 {
852         struct mdd_object *mdd_obj = md2mdd_obj(obj);
853         int                rc;
854
855         ENTRY;
856         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
857         RETURN(rc);
858 }
859
860 /*
861  * No permission check is needed.
862  */
863 static int mdd_xattr_get(const struct lu_env *env,
864                          struct md_object *obj, struct lu_buf *buf,
865                          const char *name)
866 {
867         struct mdd_object *mdd_obj = md2mdd_obj(obj);
868         int rc;
869
870         ENTRY;
871
872         if (mdd_object_exists(mdd_obj) == 0) {
873                 CERROR("%s: object "DFID" not found: rc = -2\n",
874                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
875                 return -ENOENT;
876         }
877
878         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
879         rc = mdo_xattr_get(env, mdd_obj, buf, name,
880                            mdd_object_capa(env, mdd_obj));
881         mdd_read_unlock(env, mdd_obj);
882
883         RETURN(rc);
884 }
885
886 /*
887  * Permission check is done when open,
888  * no need check again.
889  */
890 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
891                         struct lu_buf *buf)
892 {
893         struct mdd_object *mdd_obj = md2mdd_obj(obj);
894         struct dt_object  *next;
895         loff_t             pos = 0;
896         int                rc;
897         ENTRY;
898
899         if (mdd_object_exists(mdd_obj) == 0) {
900                 CERROR("%s: object "DFID" not found: rc = -2\n",
901                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
902                 return -ENOENT;
903         }
904
905         next = mdd_object_child(mdd_obj);
906         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
907         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
908                                          mdd_object_capa(env, mdd_obj));
909         mdd_read_unlock(env, mdd_obj);
910         RETURN(rc);
911 }
912
913 /*
914  * No permission check is needed.
915  */
916 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
917                           struct lu_buf *buf)
918 {
919         struct mdd_object *mdd_obj = md2mdd_obj(obj);
920         int rc;
921
922         ENTRY;
923
924         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
925         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
926         mdd_read_unlock(env, mdd_obj);
927
928         RETURN(rc);
929 }
930
931 int mdd_declare_object_create_internal(const struct lu_env *env,
932                                        struct mdd_object *p,
933                                        struct mdd_object *c,
934                                        struct md_attr *ma,
935                                        struct thandle *handle,
936                                        const struct md_op_spec *spec)
937 {
938         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
939         const struct dt_index_features *feat = spec->sp_feat;
940         int rc;
941         ENTRY;
942
943         if (feat != &dt_directory_features && feat != NULL)
944                 dof->dof_type = DFT_INDEX;
945         else
946                 dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
947
948         dof->u.dof_idx.di_feat = feat;
949
950         rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
951
952         RETURN(rc);
953 }
954
955 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
956                                struct mdd_object *c, struct md_attr *ma,
957                                struct thandle *handle,
958                                const struct md_op_spec *spec)
959 {
960         struct lu_attr *attr = &ma->ma_attr;
961         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
962         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
963         const struct dt_index_features *feat = spec->sp_feat;
964         int rc;
965         ENTRY;
966
967         if (!mdd_object_exists(c)) {
968                 struct dt_object *next = mdd_object_child(c);
969                 LASSERT(next);
970
971                 if (feat != &dt_directory_features && feat != NULL)
972                         dof->dof_type = DFT_INDEX;
973                 else
974                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
975
976                 dof->u.dof_idx.di_feat = feat;
977
978                 /* @hint will be initialized by underlying device. */
979                 next->do_ops->do_ah_init(env, hint,
980                                          p ? mdd_object_child(p) : NULL,
981                                          attr->la_mode & S_IFMT);
982
983                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
984                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
985         } else
986                 rc = -EEXIST;
987
988         RETURN(rc);
989 }
990
991 /**
992  * Make sure the ctime is increased only.
993  */
994 static inline int mdd_attr_check(const struct lu_env *env,
995                                  struct mdd_object *obj,
996                                  struct lu_attr *attr)
997 {
998         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
999         int rc;
1000         ENTRY;
1001
1002         if (attr->la_valid & LA_CTIME) {
1003                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1004                 if (rc)
1005                         RETURN(rc);
1006
1007                 if (attr->la_ctime < tmp_la->la_ctime)
1008                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
1009                 else if (attr->la_valid == LA_CTIME &&
1010                          attr->la_ctime == tmp_la->la_ctime)
1011                         attr->la_valid &= ~LA_CTIME;
1012         }
1013         RETURN(0);
1014 }
1015
1016 int mdd_attr_set_internal(const struct lu_env *env,
1017                           struct mdd_object *obj,
1018                           struct lu_attr *attr,
1019                           struct thandle *handle,
1020                           int needacl)
1021 {
1022         int rc;
1023         ENTRY;
1024
1025         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
1026 #ifdef CONFIG_FS_POSIX_ACL
1027         if (!rc && (attr->la_valid & LA_MODE) && needacl)
1028                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
1029 #endif
1030         RETURN(rc);
1031 }
1032
1033 int mdd_attr_check_set_internal(const struct lu_env *env,
1034                                 struct mdd_object *obj,
1035                                 struct lu_attr *attr,
1036                                 struct thandle *handle,
1037                                 int needacl)
1038 {
1039         int rc;
1040         ENTRY;
1041
1042         rc = mdd_attr_check(env, obj, attr);
1043         if (rc)
1044                 RETURN(rc);
1045
1046         if (attr->la_valid)
1047                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1048         RETURN(rc);
1049 }
1050
1051 static int mdd_attr_set_internal_locked(const struct lu_env *env,
1052                                         struct mdd_object *obj,
1053                                         struct lu_attr *attr,
1054                                         struct thandle *handle,
1055                                         int needacl)
1056 {
1057         int rc;
1058         ENTRY;
1059
1060         needacl = needacl && (attr->la_valid & LA_MODE);
1061         if (needacl)
1062                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1063         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
1064         if (needacl)
1065                 mdd_write_unlock(env, obj);
1066         RETURN(rc);
1067 }
1068
1069 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
1070                                        struct mdd_object *obj,
1071                                        struct lu_attr *attr,
1072                                        struct thandle *handle,
1073                                        int needacl)
1074 {
1075         int rc;
1076         ENTRY;
1077
1078         needacl = needacl && (attr->la_valid & LA_MODE);
1079         if (needacl)
1080                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
1081         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
1082         if (needacl)
1083                 mdd_write_unlock(env, obj);
1084         RETURN(rc);
1085 }
1086
1087 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
1088                     const struct lu_buf *buf, const char *name,
1089                     int fl, struct thandle *handle)
1090 {
1091         struct lustre_capa *capa = mdd_object_capa(env, obj);
1092         int rc = -EINVAL;
1093         ENTRY;
1094
1095         if (buf->lb_buf && buf->lb_len > 0)
1096                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1097         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1098                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1099
1100         RETURN(rc);
1101 }
1102
1103 /*
1104  * This gives the same functionality as the code between
1105  * sys_chmod and inode_setattr
1106  * chown_common and inode_setattr
1107  * utimes and inode_setattr
1108  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1109  */
1110 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1111                         struct lu_attr *la, const struct md_attr *ma)
1112 {
1113         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1114         struct md_ucred  *uc;
1115         int               rc;
1116         ENTRY;
1117
1118         if (!la->la_valid)
1119                 RETURN(0);
1120
1121         /* Do not permit change file type */
1122         if (la->la_valid & LA_TYPE)
1123                 RETURN(-EPERM);
1124
1125         /* They should not be processed by setattr */
1126         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1127                 RETURN(-EPERM);
1128
1129         /* export destroy does not have ->le_ses, but we may want
1130          * to drop LUSTRE_SOM_FL. */
1131         if (!env->le_ses)
1132                 RETURN(0);
1133
1134         uc = md_ucred(env);
1135
1136         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1137         if (rc)
1138                 RETURN(rc);
1139
1140         if (la->la_valid == LA_CTIME) {
1141                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1142                         /* This is only for set ctime when rename's source is
1143                          * on remote MDS. */
1144                         rc = mdd_may_delete(env, NULL, obj,
1145                                             (struct md_attr *)ma, 1, 0);
1146                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1147                         la->la_valid &= ~LA_CTIME;
1148                 RETURN(rc);
1149         }
1150
1151         if (la->la_valid == LA_ATIME) {
1152                 /* This is atime only set for read atime update on close. */
1153                 if (la->la_atime >= tmp_la->la_atime &&
1154                     la->la_atime < (tmp_la->la_atime +
1155                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1156                         la->la_valid &= ~LA_ATIME;
1157                 RETURN(0);
1158         }
1159
1160         /* Check if flags change. */
1161         if (la->la_valid & LA_FLAGS) {
1162                 unsigned int oldflags = 0;
1163                 unsigned int newflags = la->la_flags &
1164                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1165
1166                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1167                     !mdd_capable(uc, CFS_CAP_FOWNER))
1168                         RETURN(-EPERM);
1169
1170                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1171                  * only be changed by the relevant capability. */
1172                 if (mdd_is_immutable(obj))
1173                         oldflags |= LUSTRE_IMMUTABLE_FL;
1174                 if (mdd_is_append(obj))
1175                         oldflags |= LUSTRE_APPEND_FL;
1176                 if ((oldflags ^ newflags) &&
1177                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1178                         RETURN(-EPERM);
1179
1180                 if (!S_ISDIR(tmp_la->la_mode))
1181                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1182         }
1183
1184         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1185             (la->la_valid & ~LA_FLAGS) &&
1186             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1187                 RETURN(-EPERM);
1188
1189         /* Check for setting the obj time. */
1190         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1191             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1192                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1193                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1194                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1195                                                             MAY_WRITE,
1196                                                             MOR_TGT_CHILD);
1197                         if (rc)
1198                                 RETURN(rc);
1199                 }
1200         }
1201
1202         if (la->la_valid & LA_KILL_SUID) {
1203                 la->la_valid &= ~LA_KILL_SUID;
1204                 if ((tmp_la->la_mode & S_ISUID) &&
1205                     !(la->la_valid & LA_MODE)) {
1206                         la->la_mode = tmp_la->la_mode;
1207                         la->la_valid |= LA_MODE;
1208                 }
1209                 la->la_mode &= ~S_ISUID;
1210         }
1211
1212         if (la->la_valid & LA_KILL_SGID) {
1213                 la->la_valid &= ~LA_KILL_SGID;
1214                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1215                                         (S_ISGID | S_IXGRP)) &&
1216                     !(la->la_valid & LA_MODE)) {
1217                         la->la_mode = tmp_la->la_mode;
1218                         la->la_valid |= LA_MODE;
1219                 }
1220                 la->la_mode &= ~S_ISGID;
1221         }
1222
1223         /* Make sure a caller can chmod. */
1224         if (la->la_valid & LA_MODE) {
1225                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1226                     (uc->mu_fsuid != tmp_la->la_uid) &&
1227                     !mdd_capable(uc, CFS_CAP_FOWNER))
1228                         RETURN(-EPERM);
1229
1230                 if (la->la_mode == (cfs_umode_t) -1)
1231                         la->la_mode = tmp_la->la_mode;
1232                 else
1233                         la->la_mode = (la->la_mode & S_IALLUGO) |
1234                                       (tmp_la->la_mode & ~S_IALLUGO);
1235
1236                 /* Also check the setgid bit! */
1237                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1238                                        la->la_gid : tmp_la->la_gid) &&
1239                     !mdd_capable(uc, CFS_CAP_FSETID))
1240                         la->la_mode &= ~S_ISGID;
1241         } else {
1242                la->la_mode = tmp_la->la_mode;
1243         }
1244
1245         /* Make sure a caller can chown. */
1246         if (la->la_valid & LA_UID) {
1247                 if (la->la_uid == (uid_t) -1)
1248                         la->la_uid = tmp_la->la_uid;
1249                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1250                     (la->la_uid != tmp_la->la_uid)) &&
1251                     !mdd_capable(uc, CFS_CAP_CHOWN))
1252                         RETURN(-EPERM);
1253
1254                 /* If the user or group of a non-directory has been
1255                  * changed by a non-root user, remove the setuid bit.
1256                  * 19981026 David C Niemi <niemi@tux.org>
1257                  *
1258                  * Changed this to apply to all users, including root,
1259                  * to avoid some races. This is the behavior we had in
1260                  * 2.0. The check for non-root was definitely wrong
1261                  * for 2.2 anyway, as it should have been using
1262                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1263                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1264                     !S_ISDIR(tmp_la->la_mode)) {
1265                         la->la_mode &= ~S_ISUID;
1266                         la->la_valid |= LA_MODE;
1267                 }
1268         }
1269
1270         /* Make sure caller can chgrp. */
1271         if (la->la_valid & LA_GID) {
1272                 if (la->la_gid == (gid_t) -1)
1273                         la->la_gid = tmp_la->la_gid;
1274                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1275                     ((la->la_gid != tmp_la->la_gid) &&
1276                     !lustre_in_group_p(uc, la->la_gid))) &&
1277                     !mdd_capable(uc, CFS_CAP_CHOWN))
1278                         RETURN(-EPERM);
1279
1280                 /* Likewise, if the user or group of a non-directory
1281                  * has been changed by a non-root user, remove the
1282                  * setgid bit UNLESS there is no group execute bit
1283                  * (this would be a file marked for mandatory
1284                  * locking).  19981026 David C Niemi <niemi@tux.org>
1285                  *
1286                  * Removed the fsuid check (see the comment above) --
1287                  * 19990830 SD. */
1288                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1289                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1290                         la->la_mode &= ~S_ISGID;
1291                         la->la_valid |= LA_MODE;
1292                 }
1293         }
1294
1295         /* For both Size-on-MDS case and truncate case,
1296          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1297          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1298          * For SOM case, it is true, the MAY_WRITE perm has been checked
1299          * when open, no need check again. For truncate case, it is false,
1300          * the MAY_WRITE perm should be checked here. */
1301         if (ma->ma_attr_flags & MDS_SOM) {
1302                 /* For the "Size-on-MDS" setattr update, merge coming
1303                  * attributes with the set in the inode. BUG 10641 */
1304                 if ((la->la_valid & LA_ATIME) &&
1305                     (la->la_atime <= tmp_la->la_atime))
1306                         la->la_valid &= ~LA_ATIME;
1307
1308                 /* OST attributes do not have a priority over MDS attributes,
1309                  * so drop times if ctime is equal. */
1310                 if ((la->la_valid & LA_CTIME) &&
1311                     (la->la_ctime <= tmp_la->la_ctime))
1312                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1313         } else {
1314                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1315                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1316                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1317                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1318                                 rc = mdd_permission_internal_locked(env, obj,
1319                                                             tmp_la, MAY_WRITE,
1320                                                             MOR_TGT_CHILD);
1321                                 if (rc)
1322                                         RETURN(rc);
1323                         }
1324                 }
1325                 if (la->la_valid & LA_CTIME) {
1326                         /* The pure setattr, it has the priority over what is
1327                          * already set, do not drop it if ctime is equal. */
1328                         if (la->la_ctime < tmp_la->la_ctime)
1329                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1330                                                   LA_CTIME);
1331                 }
1332         }
1333
1334         RETURN(0);
1335 }
1336
1337 /** Store a data change changelog record
1338  * If this fails, we must fail the whole transaction; we don't
1339  * want the change to commit without the log entry.
1340  * \param mdd_obj - mdd_object of change
1341  * \param handle - transacion handle
1342  */
1343 static int mdd_changelog_data_store(const struct lu_env     *env,
1344                                     struct mdd_device       *mdd,
1345                                     enum changelog_rec_type type,
1346                                     int                     flags,
1347                                     struct mdd_object       *mdd_obj,
1348                                     struct thandle          *handle)
1349 {
1350         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1351         struct llog_changelog_rec *rec;
1352         struct thandle *th = NULL;
1353         struct lu_buf *buf;
1354         int reclen;
1355         int rc;
1356
1357         /* Not recording */
1358         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1359                 RETURN(0);
1360         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1361                 RETURN(0);
1362
1363         LASSERT(mdd_obj != NULL);
1364         LASSERT(handle != NULL);
1365
1366         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1367             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1368                 /* Don't need multiple updates in this log */
1369                 /* Don't check under lock - no big deal if we get an extra
1370                    entry */
1371                 RETURN(0);
1372         }
1373
1374         reclen = llog_data_len(sizeof(*rec));
1375         buf = mdd_buf_alloc(env, reclen);
1376         if (buf->lb_buf == NULL)
1377                 RETURN(-ENOMEM);
1378         rec = (struct llog_changelog_rec *)buf->lb_buf;
1379
1380         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1381         rec->cr.cr_type = (__u32)type;
1382         rec->cr.cr_tfid = *tfid;
1383         rec->cr.cr_namelen = 0;
1384         mdd_obj->mod_cltime = cfs_time_current_64();
1385
1386         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1387
1388         if (th)
1389                 mdd_trans_stop(env, mdd, rc, th);
1390
1391         if (rc < 0) {
1392                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1393                        rc, type, PFID(tfid));
1394                 return -EFAULT;
1395         }
1396
1397         return 0;
1398 }
1399
1400 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1401                   int flags, struct md_object *obj)
1402 {
1403         struct thandle *handle;
1404         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1405         struct mdd_device *mdd = mdo2mdd(obj);
1406         int rc;
1407         ENTRY;
1408
1409         handle = mdd_trans_create(env, mdd);
1410         if (IS_ERR(handle))
1411                 return(PTR_ERR(handle));
1412
1413         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1414         if (rc)
1415                 GOTO(stop, rc);
1416
1417         rc = mdd_trans_start(env, mdd, handle);
1418         if (rc)
1419                 GOTO(stop, rc);
1420
1421         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1422                                       handle);
1423
1424 stop:
1425         mdd_trans_stop(env, mdd, rc, handle);
1426
1427         RETURN(rc);
1428 }
1429
1430 /**
1431  * Should be called with write lock held.
1432  *
1433  * \see mdd_lma_set_locked().
1434  */
1435 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1436                        const struct md_attr *ma, struct thandle *handle)
1437 {
1438         struct mdd_thread_info *info = mdd_env_info(env);
1439         struct lu_buf *buf;
1440         struct lustre_mdt_attrs *lma =
1441                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1442         int lmasize = sizeof(struct lustre_mdt_attrs);
1443         int rc = 0;
1444
1445         ENTRY;
1446
1447         /* Either HSM or SOM part is not valid, we need to read it before */
1448         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1449                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1450                 if (rc <= 0)
1451                         RETURN(rc);
1452
1453                 lustre_lma_swab(lma);
1454         } else {
1455                 memset(lma, 0, lmasize);
1456         }
1457
1458         /* Copy HSM data */
1459         if (ma->ma_valid & MA_HSM) {
1460                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1461                 lma->lma_compat |= LMAC_HSM;
1462         }
1463
1464         /* Copy SOM data */
1465         if (ma->ma_valid & MA_SOM) {
1466                 LASSERT(ma->ma_som != NULL);
1467                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1468                         lma->lma_compat     &= ~LMAC_SOM;
1469                 } else {
1470                         lma->lma_compat     |= LMAC_SOM;
1471                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1472                         lma->lma_som_size    = ma->ma_som->msd_size;
1473                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1474                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1475                 }
1476         }
1477
1478         /* Copy FID */
1479         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1480
1481         lustre_lma_swab(lma);
1482         buf = mdd_buf_get(env, lma, lmasize);
1483         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1484
1485         RETURN(rc);
1486 }
1487
1488 /**
1489  * Save LMA extended attributes with data from \a ma.
1490  *
1491  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1492  * not, LMA EA will be first read from disk, modified and write back.
1493  *
1494  */
1495 static int mdd_lma_set_locked(const struct lu_env *env,
1496                               struct mdd_object *mdd_obj,
1497                               const struct md_attr *ma, struct thandle *handle)
1498 {
1499         int rc;
1500
1501         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1502         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1503         mdd_write_unlock(env, mdd_obj);
1504         return rc;
1505 }
1506
1507 /* Precedence for choosing record type when multiple
1508  * attributes change: setattr > mtime > ctime > atime
1509  * (ctime changes when mtime does, plus chmod/chown.
1510  * atime and ctime are independent.) */
1511 static int mdd_attr_set_changelog(const struct lu_env *env,
1512                                   struct md_object *obj, struct thandle *handle,
1513                                   __u64 valid)
1514 {
1515         struct mdd_device *mdd = mdo2mdd(obj);
1516         int bits, type = 0;
1517
1518         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1519         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1520         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1521         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1522         bits = bits & mdd->mdd_cl.mc_mask;
1523         if (bits == 0)
1524                 return 0;
1525
1526         /* The record type is the lowest non-masked set bit */
1527         while (bits && ((bits & 1) == 0)) {
1528                 bits = bits >> 1;
1529                 type++;
1530         }
1531
1532         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1533         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1534                                         md2mdd_obj(obj), handle);
1535 }
1536
1537 static int mdd_declare_attr_set(const struct lu_env *env,
1538                                 struct mdd_device *mdd,
1539                                 struct mdd_object *obj,
1540                                 const struct md_attr *ma,
1541                                 struct lov_mds_md *lmm,
1542                                 struct thandle *handle)
1543 {
1544         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1545         int             rc, i;
1546
1547         rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
1548         if (rc)
1549                 return rc;
1550
1551         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1552         if (rc)
1553                 return rc;
1554
1555         if (ma->ma_valid & MA_LOV) {
1556                 buf->lb_buf = NULL;
1557                 buf->lb_len = ma->ma_lmm_size;
1558                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1559                                            0, handle);
1560                 if (rc)
1561                         return rc;
1562         }
1563
1564         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1565                 buf->lb_buf = NULL;
1566                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1567                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1568                                            0, handle);
1569                 if (rc)
1570                         return rc;
1571         }
1572
1573 #ifdef CONFIG_FS_POSIX_ACL
1574         if (ma->ma_attr.la_valid & LA_MODE) {
1575                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1576                 rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
1577                                    BYPASS_CAPA);
1578                 mdd_read_unlock(env, obj);
1579                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1580                         rc = 0;
1581                 else if (rc < 0)
1582                         return rc;
1583
1584                 if (rc != 0) {
1585                         buf->lb_buf = NULL;
1586                         buf->lb_len = rc;
1587                         rc = mdo_declare_xattr_set(env, obj, buf,
1588                                                    XATTR_NAME_ACL_ACCESS, 0,
1589                                                    handle);
1590                         if (rc)
1591                                 return rc;
1592                 }
1593         }
1594 #endif
1595
1596         /* basically the log is the same as in unlink case */
1597         if (lmm) {
1598                 __u16 stripe;
1599
1600                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1601                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1602                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1603                                mdd->mdd_obd_dev->obd_name,
1604                                le32_to_cpu(lmm->lmm_magic),
1605                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1606                         return -EINVAL;
1607                 }
1608
1609                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1610                 if (stripe == LOV_ALL_STRIPES) {
1611                         struct lov_desc *ldesc;
1612
1613                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1614                         LASSERT(ldesc != NULL);
1615                         stripe = ldesc->ld_tgt_count;
1616                 }
1617
1618                 for (i = 0; i < stripe; i++) {
1619                         rc = mdd_declare_llog_record(env, mdd,
1620                                         sizeof(struct llog_unlink_rec),
1621                                         handle);
1622                         if (rc)
1623                                 return rc;
1624                 }
1625         }
1626
1627         return rc;
1628 }
1629
1630 /* set attr and LOV EA at once, return updated attr */
1631 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1632                         const struct md_attr *ma)
1633 {
1634         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1635         struct mdd_device *mdd = mdo2mdd(obj);
1636         struct thandle *handle;
1637         struct lov_mds_md *lmm = NULL;
1638         struct llog_cookie *logcookies = NULL;
1639         int  rc, lmm_size = 0, cookie_size = 0;
1640         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1641 #ifdef HAVE_QUOTA_SUPPORT
1642         struct obd_device *obd = mdd->mdd_obd_dev;
1643         struct mds_obd *mds = &obd->u.mds;
1644         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1645         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1646         int quota_opc = 0, block_count = 0;
1647         int inode_pending[MAXQUOTAS] = { 0, 0 };
1648         int block_pending[MAXQUOTAS] = { 0, 0 };
1649 #endif
1650         ENTRY;
1651
1652         *la_copy = ma->ma_attr;
1653         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1654         if (rc != 0)
1655                 RETURN(rc);
1656
1657         /* setattr on "close" only change atime, or do nothing */
1658         if (ma->ma_valid == MA_INODE &&
1659             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
1660                 RETURN(0);
1661
1662         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1663             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1664                 lmm_size = mdd_lov_mdsize(env, mdd);
1665                 lmm = mdd_max_lmm_get(env, mdd);
1666                 if (lmm == NULL)
1667                         RETURN(-ENOMEM);
1668
1669                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1670                                 XATTR_NAME_LOV);
1671
1672                 if (rc < 0)
1673                         RETURN(rc);
1674         }
1675
1676         handle = mdd_trans_create(env, mdd);
1677         if (IS_ERR(handle))
1678                 RETURN(PTR_ERR(handle));
1679
1680         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1681                                   lmm_size > 0 ? lmm : NULL, handle);
1682         if (rc)
1683                 GOTO(stop, rc);
1684
1685         rc = mdd_trans_start(env, mdd, handle);
1686         if (rc)
1687                 GOTO(stop, rc);
1688
1689         /* permission changes may require sync operation */
1690         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1691                 handle->th_sync |= !!mdd->mdd_sync_permission;
1692
1693         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1694                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1695                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1696
1697 #ifdef HAVE_QUOTA_SUPPORT
1698         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1699                 struct obd_export *exp = md_quota(env)->mq_exp;
1700                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1701
1702                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1703                 if (!rc) {
1704                         quota_opc = FSFILT_OP_SETATTR;
1705                         mdd_quota_wrapper(la_copy, qnids);
1706                         mdd_quota_wrapper(la_tmp, qoids);
1707                         /* get file quota for new owner */
1708                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1709                                         qnids, inode_pending, 1, NULL, 0,
1710                                         NULL, 0);
1711                         block_count = (la_tmp->la_blocks + 7) >> 3;
1712                         if (block_count) {
1713                                 void *data = NULL;
1714                                 mdd_data_get(env, mdd_obj, &data);
1715                                 /* get block quota for new owner */
1716                                 lquota_chkquota(mds_quota_interface_ref, obd,
1717                                                 exp, qnids, block_pending,
1718                                                 block_count, NULL,
1719                                                 LQUOTA_FLAGS_BLK, data, 1);
1720                         }
1721                 }
1722         }
1723 #endif
1724
1725         if (la_copy->la_valid & LA_FLAGS) {
1726                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1727                                                   handle, 1);
1728                 if (rc == 0)
1729                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1730         } else if (la_copy->la_valid) {            /* setattr */
1731                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1732                                                   handle, 1);
1733                 /* journal chown/chgrp in llog, just like unlink */
1734                 if (rc == 0 && lmm_size){
1735                         cookie_size = mdd_lov_cookiesize(env, mdd);
1736                         logcookies = mdd_max_cookie_get(env, mdd);
1737                         if (logcookies == NULL)
1738                                 GOTO(cleanup, rc = -ENOMEM);
1739
1740                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1741                                             logcookies, cookie_size) <= 0)
1742                                 logcookies = NULL;
1743                 }
1744         }
1745
1746         if (rc == 0 && ma->ma_valid & MA_LOV) {
1747                 cfs_umode_t mode;
1748
1749                 mode = mdd_object_type(mdd_obj);
1750                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1751                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1752                         if (rc)
1753                                 GOTO(cleanup, rc);
1754
1755                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1756                                             ma->ma_lmm_size, handle, 1);
1757                 }
1758
1759         }
1760         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1761                 cfs_umode_t mode;
1762
1763                 mode = mdd_object_type(mdd_obj);
1764                 if (S_ISREG(mode))
1765                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1766
1767         }
1768 cleanup:
1769         if (rc == 0)
1770                 rc = mdd_attr_set_changelog(env, obj, handle,
1771                                             ma->ma_attr.la_valid);
1772 stop:
1773         mdd_trans_stop(env, mdd, rc, handle);
1774         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1775                 /*set obd attr, if needed*/
1776                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1777                                            logcookies);
1778         }
1779 #ifdef HAVE_QUOTA_SUPPORT
1780         if (quota_opc) {
1781                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1782                                       inode_pending, 0);
1783                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1784                                       block_pending, 1);
1785                 /* Trigger dqrel/dqacq for original owner and new owner.
1786                  * If failed, the next call for lquota_chkquota will
1787                  * process it. */
1788                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1789                               quota_opc);
1790         }
1791 #endif
1792         RETURN(rc);
1793 }
1794
1795 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1796                       const struct lu_buf *buf, const char *name, int fl,
1797                       struct thandle *handle)
1798 {
1799         int  rc;
1800         ENTRY;
1801
1802         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1803         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1804         mdd_write_unlock(env, obj);
1805
1806         RETURN(rc);
1807 }
1808
1809 static int mdd_xattr_sanity_check(const struct lu_env *env,
1810                                   struct mdd_object *obj)
1811 {
1812         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1813         struct md_ucred *uc     = md_ucred(env);
1814         int rc;
1815         ENTRY;
1816
1817         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1818                 RETURN(-EPERM);
1819
1820         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1821         if (rc)
1822                 RETURN(rc);
1823
1824         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1825             !mdd_capable(uc, CFS_CAP_FOWNER))
1826                 RETURN(-EPERM);
1827
1828         RETURN(rc);
1829 }
1830
1831 static int mdd_declare_xattr_set(const struct lu_env *env,
1832                                  struct mdd_device *mdd,
1833                                  struct mdd_object *obj,
1834                                  const struct lu_buf *buf,
1835                                  const char *name,
1836                                  struct thandle *handle)
1837
1838 {
1839         int rc;
1840
1841         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1842         if (rc)
1843                 return rc;
1844
1845         /* Only record user xattr changes */
1846         if ((strncmp("user.", name, 5) == 0))
1847                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1848
1849         return rc;
1850 }
1851
1852 /**
1853  * The caller should guarantee to update the object ctime
1854  * after xattr_set if needed.
1855  */
1856 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1857                          const struct lu_buf *buf, const char *name,
1858                          int fl)
1859 {
1860         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1861         struct mdd_device *mdd = mdo2mdd(obj);
1862         struct thandle *handle;
1863         int  rc;
1864         ENTRY;
1865
1866         rc = mdd_xattr_sanity_check(env, mdd_obj);
1867         if (rc)
1868                 RETURN(rc);
1869
1870         handle = mdd_trans_create(env, mdd);
1871         if (IS_ERR(handle))
1872                 RETURN(PTR_ERR(handle));
1873
1874         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1875         if (rc)
1876                 GOTO(stop, rc);
1877
1878         rc = mdd_trans_start(env, mdd, handle);
1879         if (rc)
1880                 GOTO(stop, rc);
1881
1882         /* security-replated changes may require sync */
1883         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1884                 handle->th_sync |= !!mdd->mdd_sync_permission;
1885
1886         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1887
1888         /* Only record system & user xattr changes */
1889         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1890                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1891                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1892                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1893                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1894                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1895                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1896                                               handle);
1897
1898 stop:
1899         mdd_trans_stop(env, mdd, rc, handle);
1900
1901         RETURN(rc);
1902 }
1903
1904 static int mdd_declare_xattr_del(const struct lu_env *env,
1905                                  struct mdd_device *mdd,
1906                                  struct mdd_object *obj,
1907                                  const char *name,
1908                                  struct thandle *handle)
1909 {
1910         int rc;
1911
1912         rc = mdo_declare_xattr_del(env, obj, name, handle);
1913         if (rc)
1914                 return rc;
1915
1916         /* Only record user xattr changes */
1917         if ((strncmp("user.", name, 5) == 0))
1918                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1919
1920         return rc;
1921 }
1922
1923 /**
1924  * The caller should guarantee to update the object ctime
1925  * after xattr_set if needed.
1926  */
1927 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1928                   const char *name)
1929 {
1930         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1931         struct mdd_device *mdd = mdo2mdd(obj);
1932         struct thandle *handle;
1933         int  rc;
1934         ENTRY;
1935
1936         rc = mdd_xattr_sanity_check(env, mdd_obj);
1937         if (rc)
1938                 RETURN(rc);
1939
1940         handle = mdd_trans_create(env, mdd);
1941         if (IS_ERR(handle))
1942                 RETURN(PTR_ERR(handle));
1943
1944         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1945         if (rc)
1946                 GOTO(stop, rc);
1947
1948         rc = mdd_trans_start(env, mdd, handle);
1949         if (rc)
1950                 GOTO(stop, rc);
1951
1952         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1953         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1954                            mdd_object_capa(env, mdd_obj));
1955         mdd_write_unlock(env, mdd_obj);
1956
1957         /* Only record system & user xattr changes */
1958         if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
1959                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1960                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1961                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1962                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1963                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
1964                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1965                                               handle);
1966
1967 stop:
1968         mdd_trans_stop(env, mdd, rc, handle);
1969
1970         RETURN(rc);
1971 }
1972
1973 /* partial unlink */
1974 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1975                        struct md_attr *ma)
1976 {
1977         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1978         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1979         struct mdd_device *mdd = mdo2mdd(obj);
1980         struct thandle *handle;
1981 #ifdef HAVE_QUOTA_SUPPORT
1982         struct obd_device *obd = mdd->mdd_obd_dev;
1983         struct mds_obd *mds = &obd->u.mds;
1984         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1985         int quota_opc = 0;
1986 #endif
1987         int rc;
1988         ENTRY;
1989
1990         /* XXX: this code won't be used ever:
1991          * DNE uses slightly different approach */
1992         LBUG();
1993
1994         /*
1995          * Check -ENOENT early here because we need to get object type
1996          * to calculate credits before transaction start
1997          */
1998         if (mdd_object_exists(mdd_obj) == 0) {
1999                 CERROR("%s: object "DFID" not found: rc = -2\n",
2000                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2001                 RETURN(-ENOENT);
2002         }
2003
2004         LASSERT(mdd_object_exists(mdd_obj) > 0);
2005
2006         handle = mdd_trans_create(env, mdd);
2007         if (IS_ERR(handle))
2008                 RETURN(-ENOMEM);
2009
2010         rc = mdd_trans_start(env, mdd, handle);
2011
2012         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2013
2014         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
2015         if (rc)
2016                 GOTO(cleanup, rc);
2017
2018         mdo_ref_del(env, mdd_obj, handle);
2019
2020         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
2021                 /* unlink dot */
2022                 mdo_ref_del(env, mdd_obj, handle);
2023         }
2024
2025         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2026         la_copy->la_ctime = ma->ma_attr.la_ctime;
2027
2028         la_copy->la_valid = LA_CTIME;
2029         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
2030         if (rc)
2031                 GOTO(cleanup, rc);
2032
2033         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
2034 #ifdef HAVE_QUOTA_SUPPORT
2035         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
2036             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
2037                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2038                 mdd_quota_wrapper(&ma->ma_attr, qids);
2039         }
2040 #endif
2041
2042
2043         EXIT;
2044 cleanup:
2045         mdd_write_unlock(env, mdd_obj);
2046         mdd_trans_stop(env, mdd, rc, handle);
2047 #ifdef HAVE_QUOTA_SUPPORT
2048         if (quota_opc)
2049                 /* Trigger dqrel on the owner of child. If failed,
2050                  * the next call for lquota_chkquota will process it */
2051                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2052                               quota_opc);
2053 #endif
2054         return rc;
2055 }
2056
2057 /* partial operation */
2058 static int mdd_oc_sanity_check(const struct lu_env *env,
2059                                struct mdd_object *obj,
2060                                struct md_attr *ma)
2061 {
2062         int rc;
2063         ENTRY;
2064
2065         switch (ma->ma_attr.la_mode & S_IFMT) {
2066         case S_IFREG:
2067         case S_IFDIR:
2068         case S_IFLNK:
2069         case S_IFCHR:
2070         case S_IFBLK:
2071         case S_IFIFO:
2072         case S_IFSOCK:
2073                 rc = 0;
2074                 break;
2075         default:
2076                 rc = -EINVAL;
2077                 break;
2078         }
2079         RETURN(rc);
2080 }
2081
2082 static int mdd_object_create(const struct lu_env *env,
2083                              struct md_object *obj,
2084                              const struct md_op_spec *spec,
2085                              struct md_attr *ma)
2086 {
2087
2088         struct mdd_device *mdd = mdo2mdd(obj);
2089         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2090         const struct lu_fid *pfid = spec->u.sp_pfid;
2091         struct thandle *handle;
2092 #ifdef HAVE_QUOTA_SUPPORT
2093         struct obd_device *obd = mdd->mdd_obd_dev;
2094         struct obd_export *exp = md_quota(env)->mq_exp;
2095         struct mds_obd *mds = &obd->u.mds;
2096         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2097         int quota_opc = 0, block_count = 0;
2098         int inode_pending[MAXQUOTAS] = { 0, 0 };
2099         int block_pending[MAXQUOTAS] = { 0, 0 };
2100 #endif
2101         int rc = 0;
2102         ENTRY;
2103
2104         /* XXX: this code won't be used ever:
2105          * DNE uses slightly different approach */
2106         LBUG();
2107
2108 #ifdef HAVE_QUOTA_SUPPORT
2109         if (mds->mds_quota) {
2110                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
2111                 mdd_quota_wrapper(&ma->ma_attr, qids);
2112                 /* get file quota for child */
2113                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
2114                                 qids, inode_pending, 1, NULL, 0,
2115                                 NULL, 0);
2116                 switch (ma->ma_attr.la_mode & S_IFMT) {
2117                 case S_IFLNK:
2118                 case S_IFDIR:
2119                         block_count = 2;
2120                         break;
2121                 case S_IFREG:
2122                         block_count = 1;
2123                         break;
2124                 }
2125                 /* get block quota for child */
2126                 if (block_count)
2127                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
2128                                         qids, block_pending, block_count,
2129                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
2130         }
2131 #endif
2132
2133         handle = mdd_trans_create(env, mdd);
2134         if (IS_ERR(handle))
2135                 GOTO(out_pending, rc = PTR_ERR(handle));
2136
2137         rc = mdd_trans_start(env, mdd, handle);
2138
2139         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2140         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
2141         if (rc)
2142                 GOTO(unlock, rc);
2143
2144         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
2145         if (rc)
2146                 GOTO(unlock, rc);
2147
2148         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
2149                 /* If creating the slave object, set slave EA here. */
2150                 int lmv_size = spec->u.sp_ea.eadatalen;
2151                 struct lmv_stripe_md *lmv;
2152
2153                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
2154                 LASSERT(lmv != NULL && lmv_size > 0);
2155
2156                 rc = __mdd_xattr_set(env, mdd_obj,
2157                                      mdd_buf_get_const(env, lmv, lmv_size),
2158                                      XATTR_NAME_LMV, 0, handle);
2159                 if (rc)
2160                         GOTO(unlock, rc);
2161
2162                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
2163                                            handle, 0);
2164         } else {
2165 #ifdef CONFIG_FS_POSIX_ACL
2166                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
2167                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
2168
2169                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
2170                         buf->lb_len = spec->u.sp_ea.eadatalen;
2171                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
2172                                 rc = __mdd_acl_init(env, mdd_obj, buf,
2173                                                     &ma->ma_attr.la_mode,
2174                                                     handle);
2175                                 if (rc)
2176                                         GOTO(unlock, rc);
2177                                 else
2178                                         ma->ma_attr.la_valid |= LA_MODE;
2179                         }
2180
2181                         pfid = spec->u.sp_ea.fid;
2182                 }
2183 #endif
2184                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
2185                                            spec);
2186         }
2187         EXIT;
2188 unlock:
2189         if (rc == 0)
2190                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
2191         mdd_write_unlock(env, mdd_obj);
2192
2193         mdd_trans_stop(env, mdd, rc, handle);
2194 out_pending:
2195 #ifdef HAVE_QUOTA_SUPPORT
2196         if (quota_opc) {
2197                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2198                                       inode_pending, 0);
2199                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
2200                                       block_pending, 1);
2201                 /* Trigger dqacq on the owner of child. If failed,
2202                  * the next call for lquota_chkquota will process it. */
2203                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2204                               quota_opc);
2205         }
2206 #endif
2207         return rc;
2208 }
2209
2210 /* partial link */
2211 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
2212                        const struct md_attr *ma)
2213 {
2214         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
2215         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2216         struct mdd_device *mdd = mdo2mdd(obj);
2217         struct thandle *handle;
2218         int rc;
2219         ENTRY;
2220
2221         /* XXX: this code won't be used ever:
2222          * DNE uses slightly different approach */
2223         LBUG();
2224
2225         handle = mdd_trans_create(env, mdd);
2226         if (IS_ERR(handle))
2227                 RETURN(-ENOMEM);
2228
2229         rc = mdd_trans_start(env, mdd, handle);
2230
2231         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2232         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
2233         if (rc == 0)
2234                 mdo_ref_add(env, mdd_obj, handle);
2235         mdd_write_unlock(env, mdd_obj);
2236         if (rc == 0) {
2237                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
2238                 la_copy->la_ctime = ma->ma_attr.la_ctime;
2239
2240                 la_copy->la_valid = LA_CTIME;
2241                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
2242                                                         handle, 0);
2243         }
2244         mdd_trans_stop(env, mdd, 0, handle);
2245
2246         RETURN(rc);
2247 }
2248
2249 /*
2250  * do NOT or the MAY_*'s, you'll get the weakest
2251  */
2252 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
2253 {
2254         int res = 0;
2255
2256         /* Sadly, NFSD reopens a file repeatedly during operation, so the
2257          * "acc_mode = 0" allowance for newly-created files isn't honoured.
2258          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
2259          * owner can write to a file even if it is marked readonly to hide
2260          * its brokenness. (bug 5781) */
2261         if (flags & MDS_OPEN_OWNEROVERRIDE) {
2262                 struct md_ucred *uc = md_ucred(env);
2263
2264                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
2265                     (la->la_uid == uc->mu_fsuid))
2266                         return 0;
2267         }
2268
2269         if (flags & FMODE_READ)
2270                 res |= MAY_READ;
2271         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
2272                 res |= MAY_WRITE;
2273         if (flags & MDS_FMODE_EXEC)
2274                 res = MAY_EXEC;
2275         return res;
2276 }
2277
2278 static int mdd_open_sanity_check(const struct lu_env *env,
2279                                  struct mdd_object *obj, int flag)
2280 {
2281         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
2282         int mode, rc;
2283         ENTRY;
2284
2285         /* EEXIST check */
2286         if (mdd_is_dead_obj(obj))
2287                 RETURN(-ENOENT);
2288
2289         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
2290         if (rc)
2291                RETURN(rc);
2292
2293         if (S_ISLNK(tmp_la->la_mode))
2294                 RETURN(-ELOOP);
2295
2296         mode = accmode(env, tmp_la, flag);
2297
2298         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
2299                 RETURN(-EISDIR);
2300
2301         if (!(flag & MDS_OPEN_CREATED)) {
2302                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
2303                 if (rc)
2304                         RETURN(rc);
2305         }
2306
2307         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
2308             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
2309                 flag &= ~MDS_OPEN_TRUNC;
2310
2311         /* For writing append-only file must open it with append mode. */
2312         if (mdd_is_append(obj)) {
2313                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
2314                         RETURN(-EPERM);
2315                 if (flag & MDS_OPEN_TRUNC)
2316                         RETURN(-EPERM);
2317         }
2318
2319 #if 0
2320         /*
2321          * Now, flag -- O_NOATIME does not be packed by client.
2322          */
2323         if (flag & O_NOATIME) {
2324                 struct md_ucred *uc = md_ucred(env);
2325
2326                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
2327                     (uc->mu_valid == UCRED_NEW)) &&
2328                     (uc->mu_fsuid != tmp_la->la_uid) &&
2329                     !mdd_capable(uc, CFS_CAP_FOWNER))
2330                         RETURN(-EPERM);
2331         }
2332 #endif
2333
2334         RETURN(0);
2335 }
2336
2337 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2338                     int flags)
2339 {
2340         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2341         int rc = 0;
2342
2343         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2344
2345         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2346         if (rc == 0)
2347                 mdd_obj->mod_count++;
2348
2349         mdd_write_unlock(env, mdd_obj);
2350         return rc;
2351 }
2352
2353 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
2354                             struct md_attr *ma, struct thandle *handle)
2355 {
2356         int rc;
2357
2358         rc = mdd_declare_unlink_log(env, obj, ma, handle);
2359         if (rc)
2360                 return rc;
2361
2362         return mdo_declare_destroy(env, obj, handle);
2363 }
2364
2365 /* return md_attr back,
2366  * if it is last unlink then return lov ea + llog cookie*/
2367 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2368                     struct md_attr *ma, struct thandle *handle)
2369 {
2370         int rc = 0;
2371         ENTRY;
2372
2373         if (S_ISREG(mdd_object_type(obj))) {
2374                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2375                  * Caller must be ready for that. */
2376                 rc = __mdd_lmm_get(env, obj, ma);
2377                 if ((ma->ma_valid & MA_LOV))
2378                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2379                                             obj, ma);
2380         }
2381
2382         if (rc == 0)
2383                 rc = mdo_destroy(env, obj, handle);
2384
2385         RETURN(rc);
2386 }
2387
2388 static int mdd_declare_close(const struct lu_env *env,
2389                              struct mdd_object *obj,
2390                              struct md_attr *ma,
2391                              struct thandle *handle)
2392 {
2393         int rc;
2394
2395         rc = orph_declare_index_delete(env, obj, handle);
2396         if (rc)
2397                 return rc;
2398
2399         return mdd_declare_object_kill(env, obj, ma, handle);
2400 }
2401
2402 /*
2403  * No permission check is needed.
2404  */
2405 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2406                      struct md_attr *ma, int mode)
2407 {
2408         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2409         struct mdd_device *mdd = mdo2mdd(obj);
2410         struct thandle    *handle = NULL;
2411         int rc;
2412         int is_orphan = 0, reset = 1;
2413
2414 #ifdef HAVE_QUOTA_SUPPORT
2415         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2416         struct mds_obd *mds = &obd->u.mds;
2417         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2418         int quota_opc = 0;
2419 #endif
2420         ENTRY;
2421
2422         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
2423                 mdd_obj->mod_count--;
2424
2425                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
2426                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
2427                                "list\n", PFID(mdd_object_fid(mdd_obj)));
2428                 RETURN(0);
2429         }
2430
2431         /* check without any lock */
2432         if (mdd_obj->mod_count == 1 &&
2433             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
2434  again:
2435                 handle = mdd_trans_create(env, mdo2mdd(obj));
2436                 if (IS_ERR(handle))
2437                         RETURN(PTR_ERR(handle));
2438
2439                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
2440                 if (rc)
2441                         GOTO(stop, rc);
2442
2443                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
2444                 if (rc)
2445                         GOTO(stop, rc);
2446
2447                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2448                 if (rc)
2449                         GOTO(stop, rc);
2450         }
2451
2452         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2453         if (handle == NULL && mdd_obj->mod_count == 1 &&
2454             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
2455                 mdd_write_unlock(env, mdd_obj);
2456                 goto again;
2457         }
2458
2459         /* release open count */
2460         mdd_obj->mod_count --;
2461
2462         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2463                 /* remove link to object from orphan index */
2464                 LASSERT(handle != NULL);
2465                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2466                 if (rc == 0) {
2467                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2468                                "list, OSS objects to be destroyed.\n",
2469                                PFID(mdd_object_fid(mdd_obj)));
2470                         is_orphan = 1;
2471                 } else {
2472                         CERROR("Object "DFID" can not be deleted from orphan "
2473                                 "list, maybe cause OST objects can not be "
2474                                 "destroyed (err: %d).\n",
2475                                 PFID(mdd_object_fid(mdd_obj)), rc);
2476                         /* If object was not deleted from orphan list, do not
2477                          * destroy OSS objects, which will be done when next
2478                          * recovery. */
2479                         GOTO(out, rc);
2480                 }
2481         }
2482
2483         rc = mdd_iattr_get(env, mdd_obj, ma);
2484         /* Object maybe not in orphan list originally, it is rare case for
2485          * mdd_finish_unlink() failure. */
2486         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2487 #ifdef HAVE_QUOTA_SUPPORT
2488                 if (mds->mds_quota) {
2489                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2490                         mdd_quota_wrapper(&ma->ma_attr, qids);
2491                 }
2492 #endif
2493                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2494                 if (ma->ma_valid & MA_FLAGS &&
2495                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2496                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2497                 } else {
2498                         if (handle == NULL) {
2499                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2500                                 if (IS_ERR(handle))
2501                                         GOTO(out, rc = PTR_ERR(handle));
2502
2503                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2504                                                              handle);
2505                                 if (rc)
2506                                         GOTO(out, rc);
2507
2508                                 rc = mdd_declare_changelog_store(env, mdd,
2509                                                                  NULL, handle);
2510                                 if (rc)
2511                                         GOTO(stop, rc);
2512
2513                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2514                                 if (rc)
2515                                         GOTO(out, rc);
2516                         }
2517
2518                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2519                         if (rc == 0)
2520                                 reset = 0;
2521                 }
2522
2523                 if (rc != 0)
2524                         CERROR("Error when prepare to delete Object "DFID" , "
2525                                "which will cause OST objects can not be "
2526                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2527         }
2528         EXIT;
2529
2530 out:
2531         if (reset)
2532                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2533
2534         mdd_write_unlock(env, mdd_obj);
2535
2536         if (rc == 0 &&
2537             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2538             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2539                 if (handle == NULL) {
2540                         handle = mdd_trans_create(env, mdo2mdd(obj));
2541                         if (IS_ERR(handle))
2542                                 GOTO(stop, rc = IS_ERR(handle));
2543
2544                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2545                                                          handle);
2546                         if (rc)
2547                                 GOTO(stop, rc);
2548
2549                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2550                         if (rc)
2551                                 GOTO(stop, rc);
2552                 }
2553
2554                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2555                                          mdd_obj, handle);
2556         }
2557
2558 stop:
2559         if (handle != NULL)
2560                 mdd_trans_stop(env, mdd, rc, handle);
2561 #ifdef HAVE_QUOTA_SUPPORT
2562         if (quota_opc)
2563                 /* Trigger dqrel on the owner of child. If failed,
2564                  * the next call for lquota_chkquota will process it */
2565                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2566                               quota_opc);
2567 #endif
2568         return rc;
2569 }
2570
2571 /*
2572  * Permission check is done when open,
2573  * no need check again.
2574  */
2575 static int mdd_readpage_sanity_check(const struct lu_env *env,
2576                                      struct mdd_object *obj)
2577 {
2578         struct dt_object *next = mdd_object_child(obj);
2579         int rc;
2580         ENTRY;
2581
2582         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2583                 rc = 0;
2584         else
2585                 rc = -ENOTDIR;
2586
2587         RETURN(rc);
2588 }
2589
2590 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2591                               struct lu_dirpage *dp, int nob,
2592                               const struct dt_it_ops *iops, struct dt_it *it,
2593                               __u32 attr)
2594 {
2595         void                   *area = dp;
2596         int                     result;
2597         __u64                   hash = 0;
2598         struct lu_dirent       *ent;
2599         struct lu_dirent       *last = NULL;
2600         int                     first = 1;
2601
2602         memset(area, 0, sizeof (*dp));
2603         area += sizeof (*dp);
2604         nob  -= sizeof (*dp);
2605
2606         ent  = area;
2607         do {
2608                 int    len;
2609                 int    recsize;
2610
2611                 len = iops->key_size(env, it);
2612
2613                 /* IAM iterator can return record with zero len. */
2614                 if (len == 0)
2615                         goto next;
2616
2617                 hash = iops->store(env, it);
2618                 if (unlikely(first)) {
2619                         first = 0;
2620                         dp->ldp_hash_start = cpu_to_le64(hash);
2621                 }
2622
2623                 /* calculate max space required for lu_dirent */
2624                 recsize = lu_dirent_calc_size(len, attr);
2625
2626                 if (nob >= recsize) {
2627                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2628                         if (result == -ESTALE)
2629                                 goto next;
2630                         if (result != 0)
2631                                 goto out;
2632
2633                         /* osd might not able to pack all attributes,
2634                          * so recheck rec length */
2635                         recsize = le16_to_cpu(ent->lde_reclen);
2636                 } else {
2637                         result = (last != NULL) ? 0 :-EINVAL;
2638                         goto out;
2639                 }
2640                 last = ent;
2641                 ent = (void *)ent + recsize;
2642                 nob -= recsize;
2643
2644 next:
2645                 result = iops->next(env, it);
2646                 if (result == -ESTALE)
2647                         goto next;
2648         } while (result == 0);
2649
2650 out:
2651         dp->ldp_hash_end = cpu_to_le64(hash);
2652         if (last != NULL) {
2653                 if (last->lde_hash == dp->ldp_hash_end)
2654                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2655                 last->lde_reclen = 0; /* end mark */
2656         }
2657         return result;
2658 }
2659
2660 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2661                           const struct lu_rdpg *rdpg)
2662 {
2663         struct dt_it      *it;
2664         struct dt_object  *next = mdd_object_child(obj);
2665         const struct dt_it_ops  *iops;
2666         struct page       *pg;
2667         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2668         int i;
2669         int nlupgs = 0;
2670         int rc;
2671         int nob;
2672
2673         LASSERT(rdpg->rp_pages != NULL);
2674         LASSERT(next->do_index_ops != NULL);
2675
2676         if (rdpg->rp_count <= 0)
2677                 return -EFAULT;
2678
2679         /*
2680          * iterate through directory and fill pages from @rdpg
2681          */
2682         iops = &next->do_index_ops->dio_it;
2683         it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
2684         if (IS_ERR(it))
2685                 return PTR_ERR(it);
2686
2687         rc = iops->load(env, it, rdpg->rp_hash);
2688
2689         if (rc == 0) {
2690                 /*
2691                  * Iterator didn't find record with exactly the key requested.
2692                  *
2693                  * It is currently either
2694                  *
2695                  *     - positioned above record with key less than
2696                  *     requested---skip it.
2697                  *
2698                  *     - or not positioned at all (is in IAM_IT_SKEWED
2699                  *     state)---position it on the next item.
2700                  */
2701                 rc = iops->next(env, it);
2702         } else if (rc > 0)
2703                 rc = 0;
2704
2705         /*
2706          * At this point and across for-loop:
2707          *
2708          *  rc == 0 -> ok, proceed.
2709          *  rc >  0 -> end of directory.
2710          *  rc <  0 -> error.
2711          */
2712         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2713              i++, nob -= CFS_PAGE_SIZE) {
2714                 struct lu_dirpage *dp;
2715
2716                 LASSERT(i < rdpg->rp_npages);
2717                 pg = rdpg->rp_pages[i];
2718                 dp = cfs_kmap(pg);
2719 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2720 repeat:
2721 #endif
2722                 rc = mdd_dir_page_build(env, mdd, dp,
2723                                         min_t(int, nob, LU_PAGE_SIZE),
2724                                         iops, it, rdpg->rp_attrs);
2725                 if (rc > 0) {
2726                         /*
2727                          * end of directory.
2728                          */
2729                         dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2730                         nlupgs++;
2731                 } else if (rc < 0) {
2732                         CWARN("build page failed: %d!\n", rc);
2733                 } else {
2734                         nlupgs++;
2735 #if CFS_PAGE_SIZE > LU_PAGE_SIZE
2736                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2737                         if ((unsigned long)dp & ~CFS_PAGE_MASK)
2738                                 goto repeat;
2739 #endif
2740                 }
2741                 cfs_kunmap(pg);
2742         }
2743         if (rc >= 0) {
2744                 struct lu_dirpage *dp;
2745
2746                 dp = cfs_kmap(rdpg->rp_pages[0]);
2747                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2748                 if (nlupgs == 0) {
2749                         /*
2750                          * No pages were processed, mark this for first page
2751                          * and send back.
2752                          */
2753                         dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
2754                         nlupgs = 1;
2755                 }
2756                 cfs_kunmap(rdpg->rp_pages[0]);
2757
2758                 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
2759         }
2760         iops->put(env, it);
2761         iops->fini(env, it);
2762
2763         return rc;
2764 }
2765
2766 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2767                  const struct lu_rdpg *rdpg)
2768 {
2769         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2770         int rc;
2771         ENTRY;
2772
2773         if (mdd_object_exists(mdd_obj) == 0) {
2774                 CERROR("%s: object "DFID" not found: rc = -2\n",
2775                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2776                 return -ENOENT;
2777         }
2778
2779         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2780         rc = mdd_readpage_sanity_check(env, mdd_obj);
2781         if (rc)
2782                 GOTO(out_unlock, rc);
2783
2784         if (mdd_is_dead_obj(mdd_obj)) {
2785                 struct page *pg;
2786                 struct lu_dirpage *dp;
2787
2788                 /*
2789                  * According to POSIX, please do not return any entry to client:
2790                  * even dot and dotdot should not be returned.
2791                  */
2792                 CWARN("readdir from dead object: "DFID"\n",
2793                         PFID(mdd_object_fid(mdd_obj)));
2794
2795                 if (rdpg->rp_count <= 0)
2796                         GOTO(out_unlock, rc = -EFAULT);
2797                 LASSERT(rdpg->rp_pages != NULL);
2798
2799                 pg = rdpg->rp_pages[0];
2800                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2801                 memset(dp, 0 , sizeof(struct lu_dirpage));
2802                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2803                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2804                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2805                 cfs_kunmap(pg);
2806                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2807         }
2808
2809         rc = __mdd_readpage(env, mdd_obj, rdpg);
2810
2811         EXIT;
2812 out_unlock:
2813         mdd_read_unlock(env, mdd_obj);
2814         return rc;
2815 }
2816
2817 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2818 {
2819         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2820
2821         if (mdd_object_exists(mdd_obj) == 0) {
2822                 CERROR("%s: object "DFID" not found: rc = -2\n",
2823                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2824                 return -ENOENT;
2825         }
2826         return dt_object_sync(env, mdd_object_child(mdd_obj));
2827 }
2828
2829 const struct md_object_operations mdd_obj_ops = {
2830         .moo_permission    = mdd_permission,
2831         .moo_attr_get      = mdd_attr_get,
2832         .moo_attr_set      = mdd_attr_set,
2833         .moo_xattr_get     = mdd_xattr_get,
2834         .moo_xattr_set     = mdd_xattr_set,
2835         .moo_xattr_list    = mdd_xattr_list,
2836         .moo_xattr_del     = mdd_xattr_del,
2837         .moo_object_create = mdd_object_create,
2838         .moo_ref_add       = mdd_ref_add,
2839         .moo_ref_del       = mdd_ref_del,
2840         .moo_open          = mdd_open,
2841         .moo_close         = mdd_close,
2842         .moo_readpage      = mdd_readpage,
2843         .moo_readlink      = mdd_readlink,
2844         .moo_changelog     = mdd_changelog,
2845         .moo_capa_get      = mdd_capa_get,
2846         .moo_object_sync   = mdd_object_sync,
2847         .moo_path          = mdd_path,
2848         .moo_file_lock     = mdd_file_lock,
2849         .moo_file_unlock   = mdd_file_unlock,
2850 };