Whamcloud - gitweb
- land b_hd_ver_recov
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <obd_support.h>
53 #include <lprocfs_status.h>
54 /* fid_be_cpu(), fid_cpu_to_be(). */
55 #include <lustre_fid.h>
56
57 #include <lustre_param.h>
58 #include <linux/ldiskfs_fs.h>
59 #include <lustre_mds.h>
60 #include <lustre/lustre_idl.h>
61
62 #include "mdd_internal.h"
63
64 static const struct lu_object_operations mdd_lu_obj_ops;
65
66 static int mdd_xattr_get(const struct lu_env *env,
67                          struct md_object *obj, struct lu_buf *buf,
68                          const char *name);
69
70 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
71                  void **data)
72 {
73         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
74                  PFID(mdd_object_fid(obj)));
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
83                  PFID(mdd_object_fid(obj)));
84         return mdo_attr_get(env, obj, la, capa);
85 }
86
87 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
88 {
89         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
90
91         if (flags & LUSTRE_APPEND_FL)
92                 obj->mod_flags |= APPEND_OBJ;
93
94         if (flags & LUSTRE_IMMUTABLE_FL)
95                 obj->mod_flags |= IMMUTE_OBJ;
96 }
97
98 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
99 {
100         struct mdd_thread_info *info;
101
102         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
103         LASSERT(info != NULL);
104         return info;
105 }
106
107 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
108 {
109         struct lu_buf *buf;
110
111         buf = &mdd_env_info(env)->mti_buf;
112         buf->lb_buf = area;
113         buf->lb_len = len;
114         return buf;
115 }
116
117 void mdd_buf_put(struct lu_buf *buf)
118 {
119         if (buf == NULL || buf->lb_buf == NULL)
120                 return;
121         if (buf->lb_vmalloc)
122                 OBD_VFREE(buf->lb_buf, buf->lb_len);
123         else
124                 OBD_FREE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 if (buf->lb_vmalloc)
146                         OBD_VFREE(buf->lb_buf, buf->lb_len);
147                 else
148                         OBD_FREE(buf->lb_buf, buf->lb_len);
149                 buf->lb_buf = NULL;
150         }
151         if (buf->lb_buf == NULL) {
152                 buf->lb_len = len;
153                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
154                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
155                         buf->lb_vmalloc = 0;
156                 }
157                 if (buf->lb_buf == NULL) {
158                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
159                         buf->lb_vmalloc = 1;
160                 }
161                 if (buf->lb_buf == NULL)
162                         buf->lb_len = 0;
163         }
164         return buf;
165 }
166
167 /** Increase the size of the \a mti_big_buf.
168  * preserves old data in buffer
169  * old buffer remains unchanged on error
170  * \retval 0 or -ENOMEM
171  */
172 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
173 {
174         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
175         struct lu_buf buf;
176
177         LASSERT(len >= oldbuf->lb_len);
178         if (len > BUF_VMALLOC_SIZE) {
179                 OBD_VMALLOC(buf.lb_buf, len);
180                 buf.lb_vmalloc = 1;
181         } else {
182                 OBD_ALLOC(buf.lb_buf, len);
183                 buf.lb_vmalloc = 0;
184         }
185         if (buf.lb_buf == NULL)
186                 return -ENOMEM;
187
188         buf.lb_len = len;
189         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
190
191         if (oldbuf->lb_vmalloc)
192                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
193         else
194                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
195
196         memcpy(oldbuf, &buf, sizeof(buf));
197
198         return 0;
199 }
200
201 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
202                                        struct mdd_device *mdd)
203 {
204         struct mdd_thread_info *mti = mdd_env_info(env);
205         int                     max_cookie_size;
206
207         max_cookie_size = mdd_lov_cookiesize(env, mdd);
208         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
209                 if (mti->mti_max_cookie)
210                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
211                 mti->mti_max_cookie = NULL;
212                 mti->mti_max_cookie_size = 0;
213         }
214         if (unlikely(mti->mti_max_cookie == NULL)) {
215                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
216                 if (likely(mti->mti_max_cookie != NULL))
217                         mti->mti_max_cookie_size = max_cookie_size;
218         }
219         if (likely(mti->mti_max_cookie != NULL))
220                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
221         return mti->mti_max_cookie;
222 }
223
224 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
225                                    struct mdd_device *mdd)
226 {
227         struct mdd_thread_info *mti = mdd_env_info(env);
228         int                     max_lmm_size;
229
230         max_lmm_size = mdd_lov_mdsize(env, mdd);
231         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
232                 if (mti->mti_max_lmm)
233                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
234                 mti->mti_max_lmm = NULL;
235                 mti->mti_max_lmm_size = 0;
236         }
237         if (unlikely(mti->mti_max_lmm == NULL)) {
238                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
239                 if (unlikely(mti->mti_max_lmm != NULL))
240                         mti->mti_max_lmm_size = max_lmm_size;
241         }
242         return mti->mti_max_lmm;
243 }
244
245 struct lu_object *mdd_object_alloc(const struct lu_env *env,
246                                    const struct lu_object_header *hdr,
247                                    struct lu_device *d)
248 {
249         struct mdd_object *mdd_obj;
250
251         OBD_ALLOC_PTR(mdd_obj);
252         if (mdd_obj != NULL) {
253                 struct lu_object *o;
254
255                 o = mdd2lu_obj(mdd_obj);
256                 lu_object_init(o, NULL, d);
257                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
258                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
259                 mdd_obj->mod_count = 0;
260                 o->lo_ops = &mdd_lu_obj_ops;
261                 return o;
262         } else {
263                 return NULL;
264         }
265 }
266
267 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
268                            const struct lu_object_conf *_)
269 {
270         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
271         struct mdd_object *mdd_obj = lu2mdd_obj(o);
272         struct lu_object  *below;
273         struct lu_device  *under;
274         ENTRY;
275
276         mdd_obj->mod_cltime = 0;
277         under = &d->mdd_child->dd_lu_dev;
278         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
279         mdd_pdlock_init(mdd_obj);
280         if (below == NULL)
281                 RETURN(-ENOMEM);
282
283         lu_object_add(o, below);
284
285         RETURN(0);
286 }
287
288 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
289 {
290         if (lu_object_exists(o))
291                 return mdd_get_flags(env, lu2mdd_obj(o));
292         else
293                 return 0;
294 }
295
296 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
297 {
298         struct mdd_object *mdd = lu2mdd_obj(o);
299
300         lu_object_fini(o);
301         OBD_FREE_PTR(mdd);
302 }
303
304 static int mdd_object_print(const struct lu_env *env, void *cookie,
305                             lu_printer_t p, const struct lu_object *o)
306 {
307         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
308         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
309                     "valid=%x, cltime=%llu, flags=%lx)",
310                     mdd, mdd->mod_count, mdd->mod_valid,
311                     mdd->mod_cltime, mdd->mod_flags);
312 }
313
314 static const struct lu_object_operations mdd_lu_obj_ops = {
315         .loo_object_init    = mdd_object_init,
316         .loo_object_start   = mdd_object_start,
317         .loo_object_free    = mdd_object_free,
318         .loo_object_print   = mdd_object_print,
319 };
320
321 struct mdd_object *mdd_object_find(const struct lu_env *env,
322                                    struct mdd_device *d,
323                                    const struct lu_fid *f)
324 {
325         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
326 }
327
328 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
329                         const char *path, struct lu_fid *fid)
330 {
331         struct lu_buf *buf;
332         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
333         struct mdd_object *obj;
334         struct lu_name *lname = &mdd_env_info(env)->mti_name;
335         char *name;
336         int rc = 0;
337         ENTRY;
338
339         /* temp buffer for path element */
340         buf = mdd_buf_alloc(env, PATH_MAX);
341         if (buf->lb_buf == NULL)
342                 RETURN(-ENOMEM);
343
344         lname->ln_name = name = buf->lb_buf;
345         lname->ln_namelen = 0;
346         *f = mdd->mdd_root_fid;
347
348         while(1) {
349                 while (*path == '/')
350                         path++;
351                 if (*path == '\0')
352                         break;
353                 while (*path != '/' && *path != '\0') {
354                         *name = *path;
355                         path++;
356                         name++;
357                         lname->ln_namelen++;
358                 }
359
360                 *name = '\0';
361                 /* find obj corresponding to fid */
362                 obj = mdd_object_find(env, mdd, f);
363                 if (obj == NULL)
364                         GOTO(out, rc = -EREMOTE);
365                 if (IS_ERR(obj))
366                         GOTO(out, rc = -PTR_ERR(obj));
367                 /* get child fid from parent and name */
368                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
369                 mdd_object_put(env, obj);
370                 if (rc)
371                         break;
372
373                 name = buf->lb_buf;
374                 lname->ln_namelen = 0;
375         }
376
377         if (!rc)
378                 *fid = *f;
379 out:
380         RETURN(rc);
381 }
382
383 /** The maximum depth that fid2path() will search.
384  * This is limited only because we want to store the fids for
385  * historical path lookup purposes.
386  */
387 #define MAX_PATH_DEPTH 100
388
389 /** mdd_path() lookup structure. */
390 struct path_lookup_info {
391         __u64                pli_recno;        /**< history point */
392         __u64                pli_currec;       /**< current record */
393         struct lu_fid        pli_fid;
394         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
395         struct mdd_object   *pli_mdd_obj;
396         char                *pli_path;         /**< full path */
397         int                  pli_pathlen;
398         int                  pli_linkno;       /**< which hardlink to follow */
399         int                  pli_fidcount;     /**< number of \a pli_fids */
400 };
401
402 static int mdd_path_current(const struct lu_env *env,
403                             struct path_lookup_info *pli)
404 {
405         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
406         struct mdd_object *mdd_obj;
407         struct lu_buf     *buf = NULL;
408         struct link_ea_header *leh;
409         struct link_ea_entry  *lee;
410         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
411         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
412         char *ptr;
413         int reclen;
414         int rc;
415         ENTRY;
416
417         ptr = pli->pli_path + pli->pli_pathlen - 1;
418         *ptr = 0;
419         --ptr;
420         pli->pli_fidcount = 0;
421         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
422
423         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
424                 mdd_obj = mdd_object_find(env, mdd,
425                                           &pli->pli_fids[pli->pli_fidcount]);
426                 if (mdd_obj == NULL)
427                         GOTO(out, rc = -EREMOTE);
428                 if (IS_ERR(mdd_obj))
429                         GOTO(out, rc = -PTR_ERR(mdd_obj));
430                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
431                 if (rc <= 0) {
432                         mdd_object_put(env, mdd_obj);
433                         if (rc == -1)
434                                 rc = -EREMOTE;
435                         else if (rc == 0)
436                                 /* Do I need to error out here? */
437                                 rc = -ENOENT;
438                         GOTO(out, rc);
439                 }
440
441                 /* Get parent fid and object name */
442                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
443                 buf = mdd_links_get(env, mdd_obj);
444                 mdd_read_unlock(env, mdd_obj);
445                 mdd_object_put(env, mdd_obj);
446                 if (IS_ERR(buf))
447                         GOTO(out, rc = PTR_ERR(buf));
448
449                 leh = buf->lb_buf;
450                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
451                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
452
453                 /* If set, use link #linkno for path lookup, otherwise use
454                    link #0.  Only do this for the final path element. */
455                 if ((pli->pli_fidcount == 0) &&
456                     (pli->pli_linkno < leh->leh_reccount)) {
457                         int count;
458                         for (count = 0; count < pli->pli_linkno; count++) {
459                                 lee = (struct link_ea_entry *)
460                                      ((char *)lee + reclen);
461                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
462                         }
463                         if (pli->pli_linkno < leh->leh_reccount - 1)
464                                 /* indicate to user there are more links */
465                                 pli->pli_linkno++;
466                 }
467
468                 /* Pack the name in the end of the buffer */
469                 ptr -= tmpname->ln_namelen;
470                 if (ptr - 1 <= pli->pli_path)
471                         GOTO(out, rc = -EOVERFLOW);
472                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
473                 *(--ptr) = '/';
474
475                 /* Store the parent fid for historic lookup */
476                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
477                         GOTO(out, rc = -EOVERFLOW);
478                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
479         }
480
481         /* Verify that our path hasn't changed since we started the lookup.
482            Record the current index, and verify the path resolves to the
483            same fid. If it does, then the path is correct as of this index. */
484         spin_lock(&mdd->mdd_cl.mc_lock);
485         pli->pli_currec = mdd->mdd_cl.mc_index;
486         spin_unlock(&mdd->mdd_cl.mc_lock);
487         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
488         if (rc) {
489                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
490                 GOTO (out, rc = -EAGAIN);
491         }
492         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
493                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
494                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
495                        PFID(&pli->pli_fid));
496                 GOTO(out, rc = -EAGAIN);
497         }
498
499         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
500
501         EXIT;
502 out:
503         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
504                 /* if we vmalloced a large buffer drop it */
505                 mdd_buf_put(buf);
506
507         return rc;
508 }
509
510 static int mdd_path_historic(const struct lu_env *env,
511                              struct path_lookup_info *pli)
512 {
513         return 0;
514 }
515
516 /* Returns the full path to this fid, as of changelog record recno. */
517 static int mdd_path(const struct lu_env *env, struct md_object *obj,
518                     char *path, int pathlen, __u64 *recno, int *linkno)
519 {
520         struct path_lookup_info *pli;
521         int tries = 3;
522         int rc = -EAGAIN;
523         ENTRY;
524
525         if (pathlen < 3)
526                 RETURN(-EOVERFLOW);
527
528         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
529                 path[0] = '/';
530                 path[1] = '\0';
531                 RETURN(0);
532         }
533
534         OBD_ALLOC_PTR(pli);
535         if (pli == NULL)
536                 RETURN(-ENOMEM);
537
538         pli->pli_mdd_obj = md2mdd_obj(obj);
539         pli->pli_recno = *recno;
540         pli->pli_path = path;
541         pli->pli_pathlen = pathlen;
542         pli->pli_linkno = *linkno;
543
544         /* Retry multiple times in case file is being moved */
545         while (tries-- && rc == -EAGAIN)
546                 rc = mdd_path_current(env, pli);
547
548         /* For historical path lookup, the current links may not have existed
549          * at "recno" time.  We must switch over to earlier links/parents
550          * by using the changelog records.  If the earlier parent doesn't
551          * exist, we must search back through the changelog to reconstruct
552          * its parents, then check if it exists, etc.
553          * We may ignore this problem for the initial implementation and
554          * state that an "original" hardlink must still exist for us to find
555          * historic path name. */
556         if (pli->pli_recno != -1) {
557                 rc = mdd_path_historic(env, pli);
558         } else {
559                 *recno = pli->pli_currec;
560                 /* Return next link index to caller */
561                 *linkno = pli->pli_linkno;
562         }
563
564         OBD_FREE_PTR(pli);
565
566         RETURN (rc);
567 }
568
569 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
570 {
571         struct lu_attr *la = &mdd_env_info(env)->mti_la;
572         int rc;
573
574         ENTRY;
575         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
576         if (rc == 0) {
577                 mdd_flags_xlate(obj, la->la_flags);
578                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
579                         obj->mod_flags |= MNLINK_OBJ;
580         }
581         RETURN(rc);
582 }
583
584 /* get only inode attributes */
585 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
586                   struct md_attr *ma)
587 {
588         int rc = 0;
589         ENTRY;
590
591         if (ma->ma_valid & MA_INODE)
592                 RETURN(0);
593
594         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
595                           mdd_object_capa(env, mdd_obj));
596         if (rc == 0)
597                 ma->ma_valid |= MA_INODE;
598         RETURN(rc);
599 }
600
601 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
602                        int *size)
603 {
604         struct lov_desc *ldesc;
605         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
606         ENTRY;
607
608         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
609         LASSERT(ldesc != NULL);
610
611         if (!lmm)
612                 RETURN(0);
613
614         lmm->lmm_magic = LOV_MAGIC_V1;
615         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
616         lmm->lmm_pattern = ldesc->ld_pattern;
617         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
618         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
619         *size = sizeof(struct lov_mds_md);
620
621         RETURN(sizeof(struct lov_mds_md));
622 }
623
624 /* get lov EA only */
625 static int __mdd_lmm_get(const struct lu_env *env,
626                          struct mdd_object *mdd_obj, struct md_attr *ma)
627 {
628         int rc;
629         ENTRY;
630
631         if (ma->ma_valid & MA_LOV)
632                 RETURN(0);
633
634         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
635                         XATTR_NAME_LOV);
636
637         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
638                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
639                                 &ma->ma_lmm_size);
640         }
641
642         if (rc > 0) {
643                 ma->ma_valid |= MA_LOV;
644                 rc = 0;
645         }
646         RETURN(rc);
647 }
648
649 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
650                        struct md_attr *ma)
651 {
652         int rc;
653         ENTRY;
654
655         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
656         rc = __mdd_lmm_get(env, mdd_obj, ma);
657         mdd_read_unlock(env, mdd_obj);
658         RETURN(rc);
659 }
660
661 /* get lmv EA only*/
662 static int __mdd_lmv_get(const struct lu_env *env,
663                          struct mdd_object *mdd_obj, struct md_attr *ma)
664 {
665         int rc;
666         ENTRY;
667
668         if (ma->ma_valid & MA_LMV)
669                 RETURN(0);
670
671         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
672                         XATTR_NAME_LMV);
673         if (rc > 0) {
674                 ma->ma_valid |= MA_LMV;
675                 rc = 0;
676         }
677         RETURN(rc);
678 }
679
680 static int mdd_attr_get_internal(const struct lu_env *env,
681                                  struct mdd_object *mdd_obj,
682                                  struct md_attr *ma)
683 {
684         int rc = 0;
685         ENTRY;
686
687         if (ma->ma_need & MA_INODE)
688                 rc = mdd_iattr_get(env, mdd_obj, ma);
689
690         if (rc == 0 && ma->ma_need & MA_LOV) {
691                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
692                     S_ISDIR(mdd_object_type(mdd_obj)))
693                         rc = __mdd_lmm_get(env, mdd_obj, ma);
694         }
695         if (rc == 0 && ma->ma_need & MA_LMV) {
696                 if (S_ISDIR(mdd_object_type(mdd_obj)))
697                         rc = __mdd_lmv_get(env, mdd_obj, ma);
698         }
699 #ifdef CONFIG_FS_POSIX_ACL
700         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
701                 if (S_ISDIR(mdd_object_type(mdd_obj)))
702                         rc = mdd_def_acl_get(env, mdd_obj, ma);
703         }
704 #endif
705         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
706                rc, ma->ma_valid);
707         RETURN(rc);
708 }
709
710 int mdd_attr_get_internal_locked(const struct lu_env *env,
711                                  struct mdd_object *mdd_obj, struct md_attr *ma)
712 {
713         int rc;
714         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
715
716         if (needlock)
717                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
718         rc = mdd_attr_get_internal(env, mdd_obj, ma);
719         if (needlock)
720                 mdd_read_unlock(env, mdd_obj);
721         return rc;
722 }
723
724 /*
725  * No permission check is needed.
726  */
727 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
728                         struct md_attr *ma)
729 {
730         struct mdd_object *mdd_obj = md2mdd_obj(obj);
731         int                rc;
732
733         ENTRY;
734         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
735         RETURN(rc);
736 }
737
738 /*
739  * No permission check is needed.
740  */
741 static int mdd_xattr_get(const struct lu_env *env,
742                          struct md_object *obj, struct lu_buf *buf,
743                          const char *name)
744 {
745         struct mdd_object *mdd_obj = md2mdd_obj(obj);
746         int rc;
747
748         ENTRY;
749
750         LASSERT(mdd_object_exists(mdd_obj));
751
752         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
753         rc = mdo_xattr_get(env, mdd_obj, buf, name,
754                            mdd_object_capa(env, mdd_obj));
755         mdd_read_unlock(env, mdd_obj);
756
757         RETURN(rc);
758 }
759
760 /*
761  * Permission check is done when open,
762  * no need check again.
763  */
764 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
765                         struct lu_buf *buf)
766 {
767         struct mdd_object *mdd_obj = md2mdd_obj(obj);
768         struct dt_object  *next;
769         loff_t             pos = 0;
770         int                rc;
771         ENTRY;
772
773         LASSERT(mdd_object_exists(mdd_obj));
774
775         next = mdd_object_child(mdd_obj);
776         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
777         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
778                                          mdd_object_capa(env, mdd_obj));
779         mdd_read_unlock(env, mdd_obj);
780         RETURN(rc);
781 }
782
783 /*
784  * No permission check is needed.
785  */
786 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
787                           struct lu_buf *buf)
788 {
789         struct mdd_object *mdd_obj = md2mdd_obj(obj);
790         int rc;
791
792         ENTRY;
793
794         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
795         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
796         mdd_read_unlock(env, mdd_obj);
797
798         RETURN(rc);
799 }
800
801 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
802                                struct mdd_object *c, struct md_attr *ma,
803                                struct thandle *handle,
804                                const struct md_op_spec *spec)
805 {
806         struct lu_attr *attr = &ma->ma_attr;
807         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
808         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
809         const struct dt_index_features *feat = spec->sp_feat;
810         int rc;
811         ENTRY;
812
813         if (!mdd_object_exists(c)) {
814                 struct dt_object *next = mdd_object_child(c);
815                 LASSERT(next);
816
817                 if (feat != &dt_directory_features && feat != NULL)
818                         dof->dof_type = DFT_INDEX;
819                 else
820                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
821
822                 dof->u.dof_idx.di_feat = feat;
823
824                 /* @hint will be initialized by underlying device. */
825                 next->do_ops->do_ah_init(env, hint,
826                                          p ? mdd_object_child(p) : NULL,
827                                          attr->la_mode & S_IFMT);
828
829                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
830                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
831         } else
832                 rc = -EEXIST;
833
834         RETURN(rc);
835 }
836
837 /**
838  * Make sure the ctime is increased only.
839  */
840 static inline int mdd_attr_check(const struct lu_env *env,
841                                  struct mdd_object *obj,
842                                  struct lu_attr *attr)
843 {
844         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
845         int rc;
846         ENTRY;
847
848         if (attr->la_valid & LA_CTIME) {
849                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
850                 if (rc)
851                         RETURN(rc);
852
853                 if (attr->la_ctime < tmp_la->la_ctime)
854                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
855                 else if (attr->la_valid == LA_CTIME &&
856                          attr->la_ctime == tmp_la->la_ctime)
857                         attr->la_valid &= ~LA_CTIME;
858         }
859         RETURN(0);
860 }
861
862 int mdd_attr_set_internal(const struct lu_env *env,
863                           struct mdd_object *obj,
864                           struct lu_attr *attr,
865                           struct thandle *handle,
866                           int needacl)
867 {
868         int rc;
869         ENTRY;
870
871         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
872 #ifdef CONFIG_FS_POSIX_ACL
873         if (!rc && (attr->la_valid & LA_MODE) && needacl)
874                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
875 #endif
876         RETURN(rc);
877 }
878
879 int mdd_attr_check_set_internal(const struct lu_env *env,
880                                 struct mdd_object *obj,
881                                 struct lu_attr *attr,
882                                 struct thandle *handle,
883                                 int needacl)
884 {
885         int rc;
886         ENTRY;
887
888         rc = mdd_attr_check(env, obj, attr);
889         if (rc)
890                 RETURN(rc);
891
892         if (attr->la_valid)
893                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
894         RETURN(rc);
895 }
896
897 static int mdd_attr_set_internal_locked(const struct lu_env *env,
898                                         struct mdd_object *obj,
899                                         struct lu_attr *attr,
900                                         struct thandle *handle,
901                                         int needacl)
902 {
903         int rc;
904         ENTRY;
905
906         needacl = needacl && (attr->la_valid & LA_MODE);
907         if (needacl)
908                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
909         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
910         if (needacl)
911                 mdd_write_unlock(env, obj);
912         RETURN(rc);
913 }
914
915 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
916                                        struct mdd_object *obj,
917                                        struct lu_attr *attr,
918                                        struct thandle *handle,
919                                        int needacl)
920 {
921         int rc;
922         ENTRY;
923
924         needacl = needacl && (attr->la_valid & LA_MODE);
925         if (needacl)
926                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
927         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
928         if (needacl)
929                 mdd_write_unlock(env, obj);
930         RETURN(rc);
931 }
932
933 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
934                     const struct lu_buf *buf, const char *name,
935                     int fl, struct thandle *handle)
936 {
937         struct lustre_capa *capa = mdd_object_capa(env, obj);
938         int rc = -EINVAL;
939         ENTRY;
940
941         if (buf->lb_buf && buf->lb_len > 0)
942                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
943         else if (buf->lb_buf == NULL && buf->lb_len == 0)
944                 rc = mdo_xattr_del(env, obj, name, handle, capa);
945
946         RETURN(rc);
947 }
948
949 /*
950  * This gives the same functionality as the code between
951  * sys_chmod and inode_setattr
952  * chown_common and inode_setattr
953  * utimes and inode_setattr
954  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
955  */
956 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
957                         struct lu_attr *la, const struct md_attr *ma)
958 {
959         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
960         struct md_ucred  *uc         = md_ucred(env);
961         int               rc;
962         ENTRY;
963
964         if (!la->la_valid)
965                 RETURN(0);
966
967         /* Do not permit change file type */
968         if (la->la_valid & LA_TYPE)
969                 RETURN(-EPERM);
970
971         /* They should not be processed by setattr */
972         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
973                 RETURN(-EPERM);
974
975         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
976         if (rc)
977                 RETURN(rc);
978
979         if (la->la_valid == LA_CTIME) {
980                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
981                         /* This is only for set ctime when rename's source is
982                          * on remote MDS. */
983                         rc = mdd_may_delete(env, NULL, obj,
984                                             (struct md_attr *)ma, 1, 0);
985                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
986                         la->la_valid &= ~LA_CTIME;
987                 RETURN(rc);
988         }
989
990         if (la->la_valid == LA_ATIME) {
991                 /* This is atime only set for read atime update on close. */
992                 if (la->la_atime <= tmp_la->la_atime +
993                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
994                         la->la_valid &= ~LA_ATIME;
995                 RETURN(0);
996         }
997
998         /* Check if flags change. */
999         if (la->la_valid & LA_FLAGS) {
1000                 unsigned int oldflags = 0;
1001                 unsigned int newflags = la->la_flags &
1002                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1003
1004                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1005                     !mdd_capable(uc, CFS_CAP_FOWNER))
1006                         RETURN(-EPERM);
1007
1008                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1009                  * only be changed by the relevant capability. */
1010                 if (mdd_is_immutable(obj))
1011                         oldflags |= LUSTRE_IMMUTABLE_FL;
1012                 if (mdd_is_append(obj))
1013                         oldflags |= LUSTRE_APPEND_FL;
1014                 if ((oldflags ^ newflags) &&
1015                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1016                         RETURN(-EPERM);
1017
1018                 if (!S_ISDIR(tmp_la->la_mode))
1019                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1020         }
1021
1022         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1023             (la->la_valid & ~LA_FLAGS) &&
1024             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1025                 RETURN(-EPERM);
1026
1027         /* Check for setting the obj time. */
1028         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1029             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1030                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1031                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1032                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1033                                                             MAY_WRITE,
1034                                                             MOR_TGT_CHILD);
1035                         if (rc)
1036                                 RETURN(rc);
1037                 }
1038         }
1039
1040         /* Make sure a caller can chmod. */
1041         if (la->la_valid & LA_MODE) {
1042                 /* Bypass la_vaild == LA_MODE,
1043                  * this is for changing file with SUID or SGID. */
1044                 if ((la->la_valid & ~LA_MODE) &&
1045                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1046                     (uc->mu_fsuid != tmp_la->la_uid) &&
1047                     !mdd_capable(uc, CFS_CAP_FOWNER))
1048                         RETURN(-EPERM);
1049
1050                 if (la->la_mode == (umode_t) -1)
1051                         la->la_mode = tmp_la->la_mode;
1052                 else
1053                         la->la_mode = (la->la_mode & S_IALLUGO) |
1054                                       (tmp_la->la_mode & ~S_IALLUGO);
1055
1056                 /* Also check the setgid bit! */
1057                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1058                                        la->la_gid : tmp_la->la_gid) &&
1059                     !mdd_capable(uc, CFS_CAP_FSETID))
1060                         la->la_mode &= ~S_ISGID;
1061         } else {
1062                la->la_mode = tmp_la->la_mode;
1063         }
1064
1065         /* Make sure a caller can chown. */
1066         if (la->la_valid & LA_UID) {
1067                 if (la->la_uid == (uid_t) -1)
1068                         la->la_uid = tmp_la->la_uid;
1069                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1070                     (la->la_uid != tmp_la->la_uid)) &&
1071                     !mdd_capable(uc, CFS_CAP_CHOWN))
1072                         RETURN(-EPERM);
1073
1074                 /* If the user or group of a non-directory has been
1075                  * changed by a non-root user, remove the setuid bit.
1076                  * 19981026 David C Niemi <niemi@tux.org>
1077                  *
1078                  * Changed this to apply to all users, including root,
1079                  * to avoid some races. This is the behavior we had in
1080                  * 2.0. The check for non-root was definitely wrong
1081                  * for 2.2 anyway, as it should have been using
1082                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1083                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1084                     !S_ISDIR(tmp_la->la_mode)) {
1085                         la->la_mode &= ~S_ISUID;
1086                         la->la_valid |= LA_MODE;
1087                 }
1088         }
1089
1090         /* Make sure caller can chgrp. */
1091         if (la->la_valid & LA_GID) {
1092                 if (la->la_gid == (gid_t) -1)
1093                         la->la_gid = tmp_la->la_gid;
1094                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1095                     ((la->la_gid != tmp_la->la_gid) &&
1096                     !lustre_in_group_p(uc, la->la_gid))) &&
1097                     !mdd_capable(uc, CFS_CAP_CHOWN))
1098                         RETURN(-EPERM);
1099
1100                 /* Likewise, if the user or group of a non-directory
1101                  * has been changed by a non-root user, remove the
1102                  * setgid bit UNLESS there is no group execute bit
1103                  * (this would be a file marked for mandatory
1104                  * locking).  19981026 David C Niemi <niemi@tux.org>
1105                  *
1106                  * Removed the fsuid check (see the comment above) --
1107                  * 19990830 SD. */
1108                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1109                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1110                         la->la_mode &= ~S_ISGID;
1111                         la->la_valid |= LA_MODE;
1112                 }
1113         }
1114
1115         /* For both Size-on-MDS case and truncate case,
1116          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1117          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1118          * For SOM case, it is true, the MAY_WRITE perm has been checked
1119          * when open, no need check again. For truncate case, it is false,
1120          * the MAY_WRITE perm should be checked here. */
1121         if (ma->ma_attr_flags & MDS_SOM) {
1122                 /* For the "Size-on-MDS" setattr update, merge coming
1123                  * attributes with the set in the inode. BUG 10641 */
1124                 if ((la->la_valid & LA_ATIME) &&
1125                     (la->la_atime <= tmp_la->la_atime))
1126                         la->la_valid &= ~LA_ATIME;
1127
1128                 /* OST attributes do not have a priority over MDS attributes,
1129                  * so drop times if ctime is equal. */
1130                 if ((la->la_valid & LA_CTIME) &&
1131                     (la->la_ctime <= tmp_la->la_ctime))
1132                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1133         } else {
1134                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1135                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1136                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1137                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1138                                 rc = mdd_permission_internal_locked(env, obj,
1139                                                             tmp_la, MAY_WRITE,
1140                                                             MOR_TGT_CHILD);
1141                                 if (rc)
1142                                         RETURN(rc);
1143                         }
1144                 }
1145                 if (la->la_valid & LA_CTIME) {
1146                         /* The pure setattr, it has the priority over what is
1147                          * already set, do not drop it if ctime is equal. */
1148                         if (la->la_ctime < tmp_la->la_ctime)
1149                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1150                                                   LA_CTIME);
1151                 }
1152         }
1153
1154         RETURN(0);
1155 }
1156
1157 /** Store a data change changelog record
1158  * If this fails, we must fail the whole transaction; we don't
1159  * want the change to commit without the log entry.
1160  * \param mdd_obj - mdd_object of change
1161  * \param handle - transacion handle
1162  */
1163 static int mdd_changelog_data_store(const struct lu_env     *env,
1164                                     struct mdd_device       *mdd,
1165                                     enum changelog_rec_type type,
1166                                     struct mdd_object       *mdd_obj,
1167                                     struct thandle          *handle)
1168 {
1169         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1170         struct llog_changelog_rec *rec;
1171         struct lu_buf *buf;
1172         int reclen;
1173         int rc;
1174
1175         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1176                 RETURN(0);
1177
1178         LASSERT(handle != NULL);
1179         LASSERT(mdd_obj != NULL);
1180
1181         if ((type == CL_SETATTR) &&
1182             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1183                 /* Don't need multiple updates in this log */
1184                 /* Don't check under lock - no big deal if we get an extra
1185                    entry */
1186                 RETURN(0);
1187         }
1188
1189         reclen = llog_data_len(sizeof(*rec));
1190         buf = mdd_buf_alloc(env, reclen);
1191         if (buf->lb_buf == NULL)
1192                 RETURN(-ENOMEM);
1193         rec = (struct llog_changelog_rec *)buf->lb_buf;
1194
1195         rec->cr_flags = CLF_VERSION;
1196         rec->cr_type = (__u32)type;
1197         rec->cr_tfid = *tfid;
1198         rec->cr_namelen = 0;
1199         mdd_obj->mod_cltime = cfs_time_current_64();
1200
1201         rc = mdd_changelog_llog_write(mdd, rec, handle);
1202         if (rc < 0) {
1203                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1204                        rc, type, PFID(tfid));
1205                 return -EFAULT;
1206         }
1207
1208         return 0;
1209 }
1210
1211 /* set attr and LOV EA at once, return updated attr */
1212 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1213                         const struct md_attr *ma)
1214 {
1215         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1216         struct mdd_device *mdd = mdo2mdd(obj);
1217         struct thandle *handle;
1218         struct lov_mds_md *lmm = NULL;
1219         struct llog_cookie *logcookies = NULL;
1220         int  rc, lmm_size = 0, cookie_size = 0;
1221         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1222 #ifdef HAVE_QUOTA_SUPPORT
1223         struct obd_device *obd = mdd->mdd_obd_dev;
1224         struct mds_obd *mds = &obd->u.mds;
1225         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1226         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1227         int quota_opc = 0, block_count = 0;
1228         int inode_pending = 0, block_pending = 0;
1229 #endif
1230         ENTRY;
1231
1232         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1233                                     MDD_TXN_ATTR_SET_OP);
1234         handle = mdd_trans_start(env, mdd);
1235         if (IS_ERR(handle))
1236                 RETURN(PTR_ERR(handle));
1237         /*TODO: add lock here*/
1238         /* start a log jounal handle if needed */
1239         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1240             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1241                 lmm_size = mdd_lov_mdsize(env, mdd);
1242                 lmm = mdd_max_lmm_get(env, mdd);
1243                 if (lmm == NULL)
1244                         GOTO(cleanup, rc = -ENOMEM);
1245
1246                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1247                                 XATTR_NAME_LOV);
1248
1249                 if (rc < 0)
1250                         GOTO(cleanup, rc);
1251         }
1252
1253         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1254                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1255                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1256
1257         *la_copy = ma->ma_attr;
1258         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1259         if (rc)
1260                 GOTO(cleanup, rc);
1261
1262 #ifdef HAVE_QUOTA_SUPPORT
1263         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1264                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1265
1266                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1267                 if (!rc) {
1268                         quota_opc = FSFILT_OP_SETATTR;
1269                         mdd_quota_wrapper(la_copy, qnids);
1270                         mdd_quota_wrapper(la_tmp, qoids);
1271                         /* get file quota for new owner */
1272                         lquota_chkquota(mds_quota_interface_ref, obd,
1273                                         qnids[USRQUOTA], qnids[GRPQUOTA], 1,
1274                                         &inode_pending, NULL, 0, NULL, 0);
1275                         block_count = (la_tmp->la_blocks + 7) >> 3;
1276                         if (block_count) {
1277                                 void *data = NULL;
1278                                 mdd_data_get(env, mdd_obj, &data);
1279                                 /* get block quota for new owner */
1280                                 lquota_chkquota(mds_quota_interface_ref, obd,
1281                                                 qnids[USRQUOTA],
1282                                                 qnids[GRPQUOTA],
1283                                                 block_count, &block_pending,
1284                                                 NULL, LQUOTA_FLAGS_BLK,
1285                                                 data, 1);
1286                         }
1287                 }
1288         }
1289 #endif
1290
1291         if (la_copy->la_valid & LA_FLAGS) {
1292                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1293                                                   handle, 1);
1294                 if (rc == 0)
1295                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1296         } else if (la_copy->la_valid) {            /* setattr */
1297                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1298                                                   handle, 1);
1299                 /* journal chown/chgrp in llog, just like unlink */
1300                 if (rc == 0 && lmm_size){
1301                         cookie_size = mdd_lov_cookiesize(env, mdd);
1302                         logcookies = mdd_max_cookie_get(env, mdd);
1303                         if (logcookies == NULL)
1304                                 GOTO(cleanup, rc = -ENOMEM);
1305
1306                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1307                                             logcookies, cookie_size) <= 0)
1308                                 logcookies = NULL;
1309                 }
1310         }
1311
1312         if (rc == 0 && ma->ma_valid & MA_LOV) {
1313                 umode_t mode;
1314
1315                 mode = mdd_object_type(mdd_obj);
1316                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1317                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1318                         if (rc)
1319                                 GOTO(cleanup, rc);
1320
1321                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1322                                             ma->ma_lmm_size, handle, 1);
1323                 }
1324
1325         }
1326 cleanup:
1327         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1328                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1329                                               handle);
1330         mdd_trans_stop(env, mdd, rc, handle);
1331         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1332                 /*set obd attr, if needed*/
1333                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1334                                            logcookies);
1335         }
1336 #ifdef HAVE_QUOTA_SUPPORT
1337         if (quota_opc) {
1338                 if (inode_pending)
1339                         lquota_pending_commit(mds_quota_interface_ref, obd,
1340                                               qnids[USRQUOTA], qnids[GRPQUOTA],
1341                                               inode_pending, 0);
1342                 if (block_pending)
1343                         lquota_pending_commit(mds_quota_interface_ref, obd,
1344                                               qnids[USRQUOTA], qnids[GRPQUOTA],
1345                                               block_pending, 1);
1346                 /* Trigger dqrel/dqacq for original owner and new owner.
1347                  * If failed, the next call for lquota_chkquota will
1348                  * process it. */
1349                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1350                               quota_opc);
1351         }
1352 #endif
1353         RETURN(rc);
1354 }
1355
1356 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1357                       const struct lu_buf *buf, const char *name, int fl,
1358                       struct thandle *handle)
1359 {
1360         int  rc;
1361         ENTRY;
1362
1363         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1364         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1365         mdd_write_unlock(env, obj);
1366
1367         RETURN(rc);
1368 }
1369
1370 static int mdd_xattr_sanity_check(const struct lu_env *env,
1371                                   struct mdd_object *obj)
1372 {
1373         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1374         struct md_ucred *uc     = md_ucred(env);
1375         int rc;
1376         ENTRY;
1377
1378         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1379                 RETURN(-EPERM);
1380
1381         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1382         if (rc)
1383                 RETURN(rc);
1384
1385         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1386             !mdd_capable(uc, CFS_CAP_FOWNER))
1387                 RETURN(-EPERM);
1388
1389         RETURN(rc);
1390 }
1391
1392 /**
1393  * The caller should guarantee to update the object ctime
1394  * after xattr_set if needed.
1395  */
1396 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1397                          const struct lu_buf *buf, const char *name,
1398                          int fl)
1399 {
1400         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1401         struct mdd_device *mdd = mdo2mdd(obj);
1402         struct thandle *handle;
1403         int  rc;
1404         ENTRY;
1405
1406         rc = mdd_xattr_sanity_check(env, mdd_obj);
1407         if (rc)
1408                 RETURN(rc);
1409
1410         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1411         handle = mdd_trans_start(env, mdd);
1412         if (IS_ERR(handle))
1413                 RETURN(PTR_ERR(handle));
1414
1415         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1416
1417         /* Only record user xattr changes */
1418         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1419             (strncmp("user.", name, 5) == 0))
1420                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1421                                               handle);
1422         mdd_trans_stop(env, mdd, rc, handle);
1423
1424         RETURN(rc);
1425 }
1426
1427 /**
1428  * The caller should guarantee to update the object ctime
1429  * after xattr_set if needed.
1430  */
1431 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1432                   const char *name)
1433 {
1434         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1435         struct mdd_device *mdd = mdo2mdd(obj);
1436         struct thandle *handle;
1437         int  rc;
1438         ENTRY;
1439
1440         rc = mdd_xattr_sanity_check(env, mdd_obj);
1441         if (rc)
1442                 RETURN(rc);
1443
1444         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1445         handle = mdd_trans_start(env, mdd);
1446         if (IS_ERR(handle))
1447                 RETURN(PTR_ERR(handle));
1448
1449         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1450         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1451                            mdd_object_capa(env, mdd_obj));
1452         mdd_write_unlock(env, mdd_obj);
1453
1454         /* Only record user xattr changes */
1455         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1456             (strncmp("user.", name, 5) != 0))
1457                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1458                                               handle);
1459
1460         mdd_trans_stop(env, mdd, rc, handle);
1461
1462         RETURN(rc);
1463 }
1464
1465 /* partial unlink */
1466 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1467                        struct md_attr *ma)
1468 {
1469         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1470         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1471         struct mdd_device *mdd = mdo2mdd(obj);
1472         struct thandle *handle;
1473 #ifdef HAVE_QUOTA_SUPPORT
1474         struct obd_device *obd = mdd->mdd_obd_dev;
1475         struct mds_obd *mds = &obd->u.mds;
1476         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1477         int quota_opc = 0;
1478 #endif
1479         int rc;
1480         ENTRY;
1481
1482         /*
1483          * Check -ENOENT early here because we need to get object type
1484          * to calculate credits before transaction start
1485          */
1486         if (!mdd_object_exists(mdd_obj))
1487                 RETURN(-ENOENT);
1488
1489         LASSERT(mdd_object_exists(mdd_obj) > 0);
1490
1491         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1492         if (rc)
1493                 RETURN(rc);
1494
1495         handle = mdd_trans_start(env, mdd);
1496         if (IS_ERR(handle))
1497                 RETURN(-ENOMEM);
1498
1499         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1500
1501         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1502         if (rc)
1503                 GOTO(cleanup, rc);
1504
1505         __mdd_ref_del(env, mdd_obj, handle, 0);
1506
1507         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1508                 /* unlink dot */
1509                 __mdd_ref_del(env, mdd_obj, handle, 1);
1510         }
1511
1512         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1513         la_copy->la_ctime = ma->ma_attr.la_ctime;
1514
1515         la_copy->la_valid = LA_CTIME;
1516         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1517         if (rc)
1518                 GOTO(cleanup, rc);
1519
1520         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1521 #ifdef HAVE_QUOTA_SUPPORT
1522         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1523             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1524                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1525                 mdd_quota_wrapper(&ma->ma_attr, qids);
1526         }
1527 #endif
1528
1529
1530         EXIT;
1531 cleanup:
1532         mdd_write_unlock(env, mdd_obj);
1533         mdd_trans_stop(env, mdd, rc, handle);
1534 #ifdef HAVE_QUOTA_SUPPORT
1535         if (quota_opc)
1536                 /* Trigger dqrel on the owner of child. If failed,
1537                  * the next call for lquota_chkquota will process it */
1538                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1539                               quota_opc);
1540 #endif
1541         return rc;
1542 }
1543
1544 /* partial operation */
1545 static int mdd_oc_sanity_check(const struct lu_env *env,
1546                                struct mdd_object *obj,
1547                                struct md_attr *ma)
1548 {
1549         int rc;
1550         ENTRY;
1551
1552         switch (ma->ma_attr.la_mode & S_IFMT) {
1553         case S_IFREG:
1554         case S_IFDIR:
1555         case S_IFLNK:
1556         case S_IFCHR:
1557         case S_IFBLK:
1558         case S_IFIFO:
1559         case S_IFSOCK:
1560                 rc = 0;
1561                 break;
1562         default:
1563                 rc = -EINVAL;
1564                 break;
1565         }
1566         RETURN(rc);
1567 }
1568
1569 static int mdd_object_create(const struct lu_env *env,
1570                              struct md_object *obj,
1571                              const struct md_op_spec *spec,
1572                              struct md_attr *ma)
1573 {
1574
1575         struct mdd_device *mdd = mdo2mdd(obj);
1576         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1577         const struct lu_fid *pfid = spec->u.sp_pfid;
1578         struct thandle *handle;
1579 #ifdef HAVE_QUOTA_SUPPORT
1580         struct obd_device *obd = mdd->mdd_obd_dev;
1581         struct mds_obd *mds = &obd->u.mds;
1582         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1583         int quota_opc = 0, block_count = 0;
1584         int inode_pending = 0, block_pending = 0;
1585 #endif
1586         int rc = 0;
1587         ENTRY;
1588
1589 #ifdef HAVE_QUOTA_SUPPORT
1590         if (mds->mds_quota) {
1591                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1592                 mdd_quota_wrapper(&ma->ma_attr, qids);
1593                 /* get file quota for child */
1594                 lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
1595                                 qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
1596                                 NULL, 0);
1597                 switch (ma->ma_attr.la_mode & S_IFMT) {
1598                 case S_IFLNK:
1599                 case S_IFDIR:
1600                         block_count = 2;
1601                         break;
1602                 case S_IFREG:
1603                         block_count = 1;
1604                         break;
1605                 }
1606                 /* get block quota for child */
1607                 if (block_count)
1608                         lquota_chkquota(mds_quota_interface_ref, obd,
1609                                         qids[USRQUOTA], qids[GRPQUOTA],
1610                                         block_count, &block_pending, NULL,
1611                                         LQUOTA_FLAGS_BLK, NULL, 0);
1612         }
1613 #endif
1614
1615         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1616         handle = mdd_trans_start(env, mdd);
1617         if (IS_ERR(handle))
1618                 GOTO(out_pending, rc = PTR_ERR(handle));
1619
1620         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1621         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1622         if (rc)
1623                 GOTO(unlock, rc);
1624
1625         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1626         if (rc)
1627                 GOTO(unlock, rc);
1628
1629         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1630                 /* If creating the slave object, set slave EA here. */
1631                 int lmv_size = spec->u.sp_ea.eadatalen;
1632                 struct lmv_stripe_md *lmv;
1633
1634                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1635                 LASSERT(lmv != NULL && lmv_size > 0);
1636
1637                 rc = __mdd_xattr_set(env, mdd_obj,
1638                                      mdd_buf_get_const(env, lmv, lmv_size),
1639                                      XATTR_NAME_LMV, 0, handle);
1640                 if (rc)
1641                         GOTO(unlock, rc);
1642
1643                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1644                                            handle, 0);
1645         } else {
1646 #ifdef CONFIG_FS_POSIX_ACL
1647                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1648                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1649
1650                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1651                         buf->lb_len = spec->u.sp_ea.eadatalen;
1652                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1653                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1654                                                     &ma->ma_attr.la_mode,
1655                                                     handle);
1656                                 if (rc)
1657                                         GOTO(unlock, rc);
1658                                 else
1659                                         ma->ma_attr.la_valid |= LA_MODE;
1660                         }
1661
1662                         pfid = spec->u.sp_ea.fid;
1663                 }
1664 #endif
1665                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1666                                            spec);
1667         }
1668         EXIT;
1669 unlock:
1670         if (rc == 0)
1671                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1672         mdd_write_unlock(env, mdd_obj);
1673
1674         mdd_trans_stop(env, mdd, rc, handle);
1675 out_pending:
1676 #ifdef HAVE_QUOTA_SUPPORT
1677         if (quota_opc) {
1678                 if (inode_pending)
1679                         lquota_pending_commit(mds_quota_interface_ref, obd,
1680                                               qids[USRQUOTA], qids[GRPQUOTA],
1681                                               inode_pending, 0);
1682                 if (block_pending)
1683                         lquota_pending_commit(mds_quota_interface_ref, obd,
1684                                               qids[USRQUOTA], qids[GRPQUOTA],
1685                                               block_pending, 1);
1686                 /* Trigger dqacq on the owner of child. If failed,
1687                  * the next call for lquota_chkquota will process it. */
1688                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1689                               FSFILT_OP_CREATE_PARTIAL_CHILD);
1690         }
1691 #endif
1692         return rc;
1693 }
1694
1695 /* partial link */
1696 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1697                        const struct md_attr *ma)
1698 {
1699         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1700         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1701         struct mdd_device *mdd = mdo2mdd(obj);
1702         struct thandle *handle;
1703         int rc;
1704         ENTRY;
1705
1706         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1707         handle = mdd_trans_start(env, mdd);
1708         if (IS_ERR(handle))
1709                 RETURN(-ENOMEM);
1710
1711         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1712         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1713         if (rc == 0)
1714                 __mdd_ref_add(env, mdd_obj, handle);
1715         mdd_write_unlock(env, mdd_obj);
1716         if (rc == 0) {
1717                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1718                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1719
1720                 la_copy->la_valid = LA_CTIME;
1721                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1722                                                         handle, 0);
1723         }
1724         mdd_trans_stop(env, mdd, 0, handle);
1725
1726         RETURN(rc);
1727 }
1728
1729 /*
1730  * do NOT or the MAY_*'s, you'll get the weakest
1731  */
1732 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1733 {
1734         int res = 0;
1735
1736         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1737          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1738          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1739          * owner can write to a file even if it is marked readonly to hide
1740          * its brokenness. (bug 5781) */
1741         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1742                 struct md_ucred *uc = md_ucred(env);
1743
1744                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1745                     (la->la_uid == uc->mu_fsuid))
1746                         return 0;
1747         }
1748
1749         if (flags & FMODE_READ)
1750                 res |= MAY_READ;
1751         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1752                 res |= MAY_WRITE;
1753         if (flags & MDS_FMODE_EXEC)
1754                 res |= MAY_EXEC;
1755         return res;
1756 }
1757
1758 static int mdd_open_sanity_check(const struct lu_env *env,
1759                                  struct mdd_object *obj, int flag)
1760 {
1761         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1762         int mode, rc;
1763         ENTRY;
1764
1765         /* EEXIST check */
1766         if (mdd_is_dead_obj(obj))
1767                 RETURN(-ENOENT);
1768
1769         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1770         if (rc)
1771                RETURN(rc);
1772
1773         if (S_ISLNK(tmp_la->la_mode))
1774                 RETURN(-ELOOP);
1775
1776         mode = accmode(env, tmp_la, flag);
1777
1778         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1779                 RETURN(-EISDIR);
1780
1781         if (!(flag & MDS_OPEN_CREATED)) {
1782                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1783                 if (rc)
1784                         RETURN(rc);
1785         }
1786
1787         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1788             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1789                 flag &= ~MDS_OPEN_TRUNC;
1790
1791         /* For writing append-only file must open it with append mode. */
1792         if (mdd_is_append(obj)) {
1793                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1794                         RETURN(-EPERM);
1795                 if (flag & MDS_OPEN_TRUNC)
1796                         RETURN(-EPERM);
1797         }
1798
1799 #if 0
1800         /*
1801          * Now, flag -- O_NOATIME does not be packed by client.
1802          */
1803         if (flag & O_NOATIME) {
1804                 struct md_ucred *uc = md_ucred(env);
1805
1806                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1807                     (uc->mu_valid == UCRED_NEW)) &&
1808                     (uc->mu_fsuid != tmp_la->la_uid) &&
1809                     !mdd_capable(uc, CFS_CAP_FOWNER))
1810                         RETURN(-EPERM);
1811         }
1812 #endif
1813
1814         RETURN(0);
1815 }
1816
1817 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1818                     int flags)
1819 {
1820         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1821         int rc = 0;
1822
1823         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1824
1825         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1826         if (rc == 0)
1827                 mdd_obj->mod_count++;
1828
1829         mdd_write_unlock(env, mdd_obj);
1830         return rc;
1831 }
1832
1833 /* return md_attr back,
1834  * if it is last unlink then return lov ea + llog cookie*/
1835 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1836                     struct md_attr *ma)
1837 {
1838         int rc = 0;
1839         ENTRY;
1840
1841         if (S_ISREG(mdd_object_type(obj))) {
1842                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1843                  * Caller must be ready for that. */
1844
1845                 rc = __mdd_lmm_get(env, obj, ma);
1846                 if ((ma->ma_valid & MA_LOV))
1847                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1848                                             obj, ma);
1849         }
1850         RETURN(rc);
1851 }
1852
1853 /*
1854  * No permission check is needed.
1855  */
1856 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1857                      struct md_attr *ma)
1858 {
1859         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1860         struct thandle    *handle;
1861         int rc;
1862         int reset = 1;
1863
1864 #ifdef HAVE_QUOTA_SUPPORT
1865         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1866         struct mds_obd *mds = &obd->u.mds;
1867         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1868         int quota_opc = 0;
1869 #endif
1870         ENTRY;
1871
1872         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1873         if (rc)
1874                 RETURN(rc);
1875         handle = mdd_trans_start(env, mdo2mdd(obj));
1876         if (IS_ERR(handle))
1877                 RETURN(PTR_ERR(handle));
1878
1879         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1880         /* release open count */
1881         mdd_obj->mod_count --;
1882
1883         if (mdd_obj->mod_count == 0) {
1884                 /* remove link to object from orphan index */
1885                 if (mdd_obj->mod_flags & ORPHAN_OBJ)
1886                         __mdd_orphan_del(env, mdd_obj, handle);
1887         }
1888
1889         rc = mdd_iattr_get(env, mdd_obj, ma);
1890         if (rc == 0) {
1891                 if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
1892                         rc = mdd_object_kill(env, mdd_obj, ma);
1893 #ifdef HAVE_QUOTA_SUPPORT
1894                         if (mds->mds_quota) {
1895                                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1896                                 mdd_quota_wrapper(&ma->ma_attr, qids);
1897                         }
1898 #endif
1899                         if (rc == 0)
1900                                 reset = 0;
1901                 }
1902         }
1903
1904         if (reset)
1905                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1906
1907         mdd_write_unlock(env, mdd_obj);
1908         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1909 #ifdef HAVE_QUOTA_SUPPORT
1910         if (quota_opc)
1911                 /* Trigger dqrel on the owner of child. If failed,
1912                  * the next call for lquota_chkquota will process it */
1913                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1914                               quota_opc);
1915 #endif
1916         RETURN(rc);
1917 }
1918
1919 /*
1920  * Permission check is done when open,
1921  * no need check again.
1922  */
1923 static int mdd_readpage_sanity_check(const struct lu_env *env,
1924                                      struct mdd_object *obj)
1925 {
1926         struct dt_object *next = mdd_object_child(obj);
1927         int rc;
1928         ENTRY;
1929
1930         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1931                 rc = 0;
1932         else
1933                 rc = -ENOTDIR;
1934
1935         RETURN(rc);
1936 }
1937
1938 static int mdd_append_attrs(const struct lu_env *env,
1939                              struct mdd_device *mdd,
1940                              __u32 attr,
1941                              const struct dt_it_ops *iops,
1942                              struct dt_it *it,
1943                              struct lu_dirent*ent)
1944 {
1945         struct mdd_thread_info  *info = mdd_env_info(env);
1946         struct lu_fid           *fid  = &info->mti_fid2;
1947         int                      len = cpu_to_le16(ent->lde_namelen);
1948         const unsigned           align = sizeof(struct luda_type) - 1;
1949         struct lu_fid_pack      *pack;
1950         struct mdd_object       *obj;
1951         struct luda_type        *lt;
1952         int rc = 0;
1953
1954         if (attr & LUDA_FID) {
1955                 pack = (struct lu_fid_pack *)iops->rec(env, it);
1956                 if (IS_ERR(pack)) {
1957                         rc = PTR_ERR(pack);
1958                         ent->lde_attrs = 0;
1959                         goto out;
1960                 }
1961                 rc = fid_unpack(pack, fid);
1962                 if (rc != 0) {
1963                         ent->lde_attrs = 0;
1964                         goto out;
1965                 }
1966
1967                 fid_cpu_to_le(&ent->lde_fid, fid);
1968                 ent->lde_attrs = LUDA_FID;
1969         }
1970
1971         /* check if file type is required */
1972         if (attr & LUDA_TYPE) {
1973                 if (!(attr & LUDA_FID)) {
1974                         CERROR("wrong attr : [%x]\n",attr);
1975                         rc = -EINVAL;
1976                         goto out;
1977                 }
1978
1979                 obj = mdd_object_find(env, mdd, fid);
1980                 if (obj == NULL) /* remote object */
1981                         goto out;
1982
1983                 if (IS_ERR(obj)) {
1984                         rc = PTR_ERR(obj);
1985                         goto out;
1986                 }
1987
1988                 if (mdd_object_exists(obj) == +1) {
1989                         len = (len + align) & ~align;
1990
1991                         lt = (void *) ent->lde_name + len;
1992                         lt->lt_type = cpu_to_le16(mdd_object_type(obj));
1993
1994                         ent->lde_attrs |= LUDA_TYPE;
1995                 }
1996                 mdd_object_put(env, obj);
1997         }
1998 out:
1999         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
2000         return rc;
2001 }
2002
2003 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2004                               int first, void *area, int nob,
2005                               const struct dt_it_ops *iops, struct dt_it *it,
2006                               __u64 *start, __u64 *end,
2007                               struct lu_dirent **last, __u32 attr)
2008 {
2009         int                     result;
2010         struct lu_dirent       *ent;
2011         __u64  hash = 0;
2012
2013         if (first) {
2014                 memset(area, 0, sizeof (struct lu_dirpage));
2015                 area += sizeof (struct lu_dirpage);
2016                 nob  -= sizeof (struct lu_dirpage);
2017         }
2018
2019         ent  = area;
2020         do {
2021                 char  *name;
2022                 int    len;
2023                 int    recsize;
2024
2025                 len  = iops->key_size(env, it);
2026
2027                 /* IAM iterator can return record with zero len. */
2028                 if (len == 0)
2029                         goto next;
2030
2031                 name = (char *)iops->key(env, it);
2032                 hash = iops->store(env, it);
2033
2034                 if (unlikely(first)) {
2035                         first = 0;
2036                         *start = hash;
2037                 }
2038
2039                 recsize = lu_dirent_calc_size(len, attr);
2040
2041                 CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n",
2042                                 name, ent, nob, hash, len, len, len, name);
2043
2044                 if (nob >= recsize) {
2045                         ent->lde_hash    = cpu_to_le64(hash);
2046                         ent->lde_namelen = cpu_to_le16(len);
2047                         ent->lde_reclen  = cpu_to_le16(recsize);
2048                         memcpy(ent->lde_name, name, len);
2049
2050                         result = mdd_append_attrs(env, mdd, attr, iops, it, ent);
2051                         if (result != 0)
2052                                 goto out;
2053                 } else {
2054                         /*
2055                          * record doesn't fit into page, enlarge previous one.
2056                          */
2057                         if (*last) {
2058                                 (*last)->lde_reclen =
2059                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2060                                                         nob);
2061                                 result = 0;
2062                         } else
2063                                 result = -EINVAL;
2064
2065                         goto out;
2066                 }
2067                 *last = ent;
2068                 ent = (void *)ent + recsize;
2069                 nob -= recsize;
2070
2071 next:
2072                 result = iops->next(env, it);
2073         } while (result == 0);
2074
2075 out:
2076         *end = hash;
2077         return result;
2078 }
2079
2080 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2081                           const struct lu_rdpg *rdpg)
2082 {
2083         struct dt_it      *it;
2084         struct dt_object  *next = mdd_object_child(obj);
2085         const struct dt_it_ops  *iops;
2086         struct page       *pg;
2087         struct lu_dirent  *last = NULL;
2088         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2089         int i;
2090         int rc;
2091         int nob;
2092         __u64 hash_start;
2093         __u64 hash_end = 0;
2094
2095         LASSERT(rdpg->rp_pages != NULL);
2096         LASSERT(next->do_index_ops != NULL);
2097
2098         if (rdpg->rp_count <= 0)
2099                 return -EFAULT;
2100
2101         /*
2102          * iterate through directory and fill pages from @rdpg
2103          */
2104         iops = &next->do_index_ops->dio_it;
2105         it = iops->init(env, next, mdd_object_capa(env, obj));
2106         if (IS_ERR(it))
2107                 return PTR_ERR(it);
2108
2109         rc = iops->load(env, it, rdpg->rp_hash);
2110
2111         if (rc == 0)
2112                 /*
2113                  * Iterator didn't find record with exactly the key requested.
2114                  *
2115                  * It is currently either
2116                  *
2117                  *     - positioned above record with key less than
2118                  *     requested---skip it.
2119                  *
2120                  *     - or not positioned at all (is in IAM_IT_SKEWED
2121                  *     state)---position it on the next item.
2122                  */
2123                 rc = iops->next(env, it);
2124         else if (rc > 0)
2125                 rc = 0;
2126
2127         /*
2128          * At this point and across for-loop:
2129          *
2130          *  rc == 0 -> ok, proceed.
2131          *  rc >  0 -> end of directory.
2132          *  rc <  0 -> error.
2133          */
2134         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2135              i++, nob -= CFS_PAGE_SIZE) {
2136                 LASSERT(i < rdpg->rp_npages);
2137                 pg = rdpg->rp_pages[i];
2138                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2139                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2140                                         it, &hash_start, &hash_end, &last,
2141                                         rdpg->rp_attrs);
2142                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2143                         if (last)
2144                                 last->lde_reclen = 0;
2145                 }
2146                 cfs_kunmap(pg);
2147         }
2148         if (rc > 0) {
2149                 /*
2150                  * end of directory.
2151                  */
2152                 hash_end = DIR_END_OFF;
2153                 rc = 0;
2154         }
2155         if (rc == 0) {
2156                 struct lu_dirpage *dp;
2157
2158                 dp = cfs_kmap(rdpg->rp_pages[0]);
2159                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2160                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2161                 if (i == 0)
2162                         /*
2163                          * No pages were processed, mark this.
2164                          */
2165                         dp->ldp_flags |= LDF_EMPTY;
2166
2167                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2168                 cfs_kunmap(rdpg->rp_pages[0]);
2169         }
2170         iops->put(env, it);
2171         iops->fini(env, it);
2172
2173         return rc;
2174 }
2175
2176 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2177                  const struct lu_rdpg *rdpg)
2178 {
2179         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2180         int rc;
2181         ENTRY;
2182
2183         LASSERT(mdd_object_exists(mdd_obj));
2184
2185         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2186         rc = mdd_readpage_sanity_check(env, mdd_obj);
2187         if (rc)
2188                 GOTO(out_unlock, rc);
2189
2190         if (mdd_is_dead_obj(mdd_obj)) {
2191                 struct page *pg;
2192                 struct lu_dirpage *dp;
2193
2194                 /*
2195                  * According to POSIX, please do not return any entry to client:
2196                  * even dot and dotdot should not be returned.
2197                  */
2198                 CWARN("readdir from dead object: "DFID"\n",
2199                         PFID(mdd_object_fid(mdd_obj)));
2200
2201                 if (rdpg->rp_count <= 0)
2202                         GOTO(out_unlock, rc = -EFAULT);
2203                 LASSERT(rdpg->rp_pages != NULL);
2204
2205                 pg = rdpg->rp_pages[0];
2206                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2207                 memset(dp, 0 , sizeof(struct lu_dirpage));
2208                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2209                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2210                 dp->ldp_flags |= LDF_EMPTY;
2211                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2212                 cfs_kunmap(pg);
2213                 GOTO(out_unlock, rc = 0);
2214         }
2215
2216         rc = __mdd_readpage(env, mdd_obj, rdpg);
2217
2218         EXIT;
2219 out_unlock:
2220         mdd_read_unlock(env, mdd_obj);
2221         return rc;
2222 }
2223
2224 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2225 {
2226         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2227         struct dt_object *next;
2228
2229         LASSERT(mdd_object_exists(mdd_obj));
2230         next = mdd_object_child(mdd_obj);
2231         return next->do_ops->do_object_sync(env, next);
2232 }
2233
2234 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2235                                         struct md_object *obj)
2236 {
2237         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2238
2239         LASSERT(mdd_object_exists(mdd_obj));
2240         return do_version_get(env, mdd_object_child(mdd_obj));
2241 }
2242
2243 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2244                             dt_obj_version_t version)
2245 {
2246         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2247
2248         LASSERT(mdd_object_exists(mdd_obj));
2249         return do_version_set(env, mdd_object_child(mdd_obj), version);
2250 }
2251
2252 const struct md_object_operations mdd_obj_ops = {
2253         .moo_permission    = mdd_permission,
2254         .moo_attr_get      = mdd_attr_get,
2255         .moo_attr_set      = mdd_attr_set,
2256         .moo_xattr_get     = mdd_xattr_get,
2257         .moo_xattr_set     = mdd_xattr_set,
2258         .moo_xattr_list    = mdd_xattr_list,
2259         .moo_xattr_del     = mdd_xattr_del,
2260         .moo_object_create = mdd_object_create,
2261         .moo_ref_add       = mdd_ref_add,
2262         .moo_ref_del       = mdd_ref_del,
2263         .moo_open          = mdd_open,
2264         .moo_close         = mdd_close,
2265         .moo_readpage      = mdd_readpage,
2266         .moo_readlink      = mdd_readlink,
2267         .moo_capa_get      = mdd_capa_get,
2268         .moo_object_sync   = mdd_object_sync,
2269         .moo_version_get   = mdd_version_get,
2270         .moo_version_set   = mdd_version_set,
2271         .moo_path          = mdd_path,
2272 };