Whamcloud - gitweb
b=17670
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/jbd.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <obd_support.h>
53 #include <lprocfs_status.h>
54 /* fid_be_cpu(), fid_cpu_to_be(). */
55 #include <lustre_fid.h>
56
57 #include <lustre_param.h>
58 #include <linux/ldiskfs_fs.h>
59 #include <lustre_mds.h>
60 #include <lustre/lustre_idl.h>
61
62 #include "mdd_internal.h"
63
64 static const struct lu_object_operations mdd_lu_obj_ops;
65
66 static int mdd_xattr_get(const struct lu_env *env,
67                          struct md_object *obj, struct lu_buf *buf,
68                          const char *name);
69
70 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
71                  void **data)
72 {
73         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
74                  PFID(mdd_object_fid(obj)));
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
83                  PFID(mdd_object_fid(obj)));
84         return mdo_attr_get(env, obj, la, capa);
85 }
86
87 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
88 {
89         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
90
91         if (flags & LUSTRE_APPEND_FL)
92                 obj->mod_flags |= APPEND_OBJ;
93
94         if (flags & LUSTRE_IMMUTABLE_FL)
95                 obj->mod_flags |= IMMUTE_OBJ;
96 }
97
98 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
99 {
100         struct mdd_thread_info *info;
101
102         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
103         LASSERT(info != NULL);
104         return info;
105 }
106
107 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
108 {
109         struct lu_buf *buf;
110
111         buf = &mdd_env_info(env)->mti_buf;
112         buf->lb_buf = area;
113         buf->lb_len = len;
114         return buf;
115 }
116
117 void mdd_buf_put(struct lu_buf *buf)
118 {
119         if (buf == NULL || buf->lb_buf == NULL)
120                 return;
121         if (buf->lb_vmalloc)
122                 OBD_VFREE(buf->lb_buf, buf->lb_len);
123         else
124                 OBD_FREE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126 }
127
128 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
129                                        const void *area, ssize_t len)
130 {
131         struct lu_buf *buf;
132
133         buf = &mdd_env_info(env)->mti_buf;
134         buf->lb_buf = (void *)area;
135         buf->lb_len = len;
136         return buf;
137 }
138
139 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 if (buf->lb_vmalloc)
146                         OBD_VFREE(buf->lb_buf, buf->lb_len);
147                 else
148                         OBD_FREE(buf->lb_buf, buf->lb_len);
149                 buf->lb_buf = NULL;
150         }
151         if (buf->lb_buf == NULL) {
152                 buf->lb_len = len;
153                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
154                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
155                         buf->lb_vmalloc = 0;
156                 }
157                 if (buf->lb_buf == NULL) {
158                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
159                         buf->lb_vmalloc = 1;
160                 }
161                 if (buf->lb_buf == NULL)
162                         buf->lb_len = 0;
163         }
164         return buf;
165 }
166
167 /** Increase the size of the \a mti_big_buf.
168  * preserves old data in buffer
169  * old buffer remains unchanged on error
170  * \retval 0 or -ENOMEM
171  */
172 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
173 {
174         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
175         struct lu_buf buf;
176
177         LASSERT(len >= oldbuf->lb_len);
178         if (len > BUF_VMALLOC_SIZE) {
179                 OBD_VMALLOC(buf.lb_buf, len);
180                 buf.lb_vmalloc = 1;
181         } else {
182                 OBD_ALLOC(buf.lb_buf, len);
183                 buf.lb_vmalloc = 0;
184         }
185         if (buf.lb_buf == NULL)
186                 return -ENOMEM;
187
188         buf.lb_len = len;
189         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
190
191         if (oldbuf->lb_vmalloc)
192                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
193         else
194                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
195
196         memcpy(oldbuf, &buf, sizeof(buf));
197
198         return 0;
199 }
200
201 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
202                                        struct mdd_device *mdd)
203 {
204         struct mdd_thread_info *mti = mdd_env_info(env);
205         int                     max_cookie_size;
206
207         max_cookie_size = mdd_lov_cookiesize(env, mdd);
208         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
209                 if (mti->mti_max_cookie)
210                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
211                 mti->mti_max_cookie = NULL;
212                 mti->mti_max_cookie_size = 0;
213         }
214         if (unlikely(mti->mti_max_cookie == NULL)) {
215                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
216                 if (likely(mti->mti_max_cookie != NULL))
217                         mti->mti_max_cookie_size = max_cookie_size;
218         }
219         if (likely(mti->mti_max_cookie != NULL))
220                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
221         return mti->mti_max_cookie;
222 }
223
224 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
225                                    struct mdd_device *mdd)
226 {
227         struct mdd_thread_info *mti = mdd_env_info(env);
228         int                     max_lmm_size;
229
230         max_lmm_size = mdd_lov_mdsize(env, mdd);
231         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
232                 if (mti->mti_max_lmm)
233                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
234                 mti->mti_max_lmm = NULL;
235                 mti->mti_max_lmm_size = 0;
236         }
237         if (unlikely(mti->mti_max_lmm == NULL)) {
238                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
239                 if (unlikely(mti->mti_max_lmm != NULL))
240                         mti->mti_max_lmm_size = max_lmm_size;
241         }
242         return mti->mti_max_lmm;
243 }
244
245 struct lu_object *mdd_object_alloc(const struct lu_env *env,
246                                    const struct lu_object_header *hdr,
247                                    struct lu_device *d)
248 {
249         struct mdd_object *mdd_obj;
250
251         OBD_ALLOC_PTR(mdd_obj);
252         if (mdd_obj != NULL) {
253                 struct lu_object *o;
254
255                 o = mdd2lu_obj(mdd_obj);
256                 lu_object_init(o, NULL, d);
257                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
258                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
259                 mdd_obj->mod_count = 0;
260                 o->lo_ops = &mdd_lu_obj_ops;
261                 return o;
262         } else {
263                 return NULL;
264         }
265 }
266
267 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
268                            const struct lu_object_conf *unused)
269 {
270         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
271         struct mdd_object *mdd_obj = lu2mdd_obj(o);
272         struct lu_object  *below;
273         struct lu_device  *under;
274         ENTRY;
275
276         mdd_obj->mod_cltime = 0;
277         under = &d->mdd_child->dd_lu_dev;
278         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
279         mdd_pdlock_init(mdd_obj);
280         if (below == NULL)
281                 RETURN(-ENOMEM);
282
283         lu_object_add(o, below);
284
285         RETURN(0);
286 }
287
288 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
289 {
290         if (lu_object_exists(o))
291                 return mdd_get_flags(env, lu2mdd_obj(o));
292         else
293                 return 0;
294 }
295
296 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
297 {
298         struct mdd_object *mdd = lu2mdd_obj(o);
299
300         lu_object_fini(o);
301         OBD_FREE_PTR(mdd);
302 }
303
304 static int mdd_object_print(const struct lu_env *env, void *cookie,
305                             lu_printer_t p, const struct lu_object *o)
306 {
307         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
308         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
309                     "valid=%x, cltime=%llu, flags=%lx)",
310                     mdd, mdd->mod_count, mdd->mod_valid,
311                     mdd->mod_cltime, mdd->mod_flags);
312 }
313
314 static const struct lu_object_operations mdd_lu_obj_ops = {
315         .loo_object_init    = mdd_object_init,
316         .loo_object_start   = mdd_object_start,
317         .loo_object_free    = mdd_object_free,
318         .loo_object_print   = mdd_object_print,
319 };
320
321 struct mdd_object *mdd_object_find(const struct lu_env *env,
322                                    struct mdd_device *d,
323                                    const struct lu_fid *f)
324 {
325         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
326 }
327
328 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
329                         const char *path, struct lu_fid *fid)
330 {
331         struct lu_buf *buf;
332         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
333         struct mdd_object *obj;
334         struct lu_name *lname = &mdd_env_info(env)->mti_name;
335         char *name;
336         int rc = 0;
337         ENTRY;
338
339         /* temp buffer for path element */
340         buf = mdd_buf_alloc(env, PATH_MAX);
341         if (buf->lb_buf == NULL)
342                 RETURN(-ENOMEM);
343
344         lname->ln_name = name = buf->lb_buf;
345         lname->ln_namelen = 0;
346         *f = mdd->mdd_root_fid;
347
348         while(1) {
349                 while (*path == '/')
350                         path++;
351                 if (*path == '\0')
352                         break;
353                 while (*path != '/' && *path != '\0') {
354                         *name = *path;
355                         path++;
356                         name++;
357                         lname->ln_namelen++;
358                 }
359
360                 *name = '\0';
361                 /* find obj corresponding to fid */
362                 obj = mdd_object_find(env, mdd, f);
363                 if (obj == NULL)
364                         GOTO(out, rc = -EREMOTE);
365                 if (IS_ERR(obj))
366                         GOTO(out, rc = -PTR_ERR(obj));
367                 /* get child fid from parent and name */
368                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
369                 mdd_object_put(env, obj);
370                 if (rc)
371                         break;
372
373                 name = buf->lb_buf;
374                 lname->ln_namelen = 0;
375         }
376
377         if (!rc)
378                 *fid = *f;
379 out:
380         RETURN(rc);
381 }
382
383 /** The maximum depth that fid2path() will search.
384  * This is limited only because we want to store the fids for
385  * historical path lookup purposes.
386  */
387 #define MAX_PATH_DEPTH 100
388
389 /** mdd_path() lookup structure. */
390 struct path_lookup_info {
391         __u64                pli_recno;        /**< history point */
392         __u64                pli_currec;       /**< current record */
393         struct lu_fid        pli_fid;
394         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
395         struct mdd_object   *pli_mdd_obj;
396         char                *pli_path;         /**< full path */
397         int                  pli_pathlen;
398         int                  pli_linkno;       /**< which hardlink to follow */
399         int                  pli_fidcount;     /**< number of \a pli_fids */
400 };
401
402 static int mdd_path_current(const struct lu_env *env,
403                             struct path_lookup_info *pli)
404 {
405         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
406         struct mdd_object *mdd_obj;
407         struct lu_buf     *buf = NULL;
408         struct link_ea_header *leh;
409         struct link_ea_entry  *lee;
410         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
411         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
412         char *ptr;
413         int reclen;
414         int rc;
415         ENTRY;
416
417         ptr = pli->pli_path + pli->pli_pathlen - 1;
418         *ptr = 0;
419         --ptr;
420         pli->pli_fidcount = 0;
421         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
422
423         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
424                 mdd_obj = mdd_object_find(env, mdd,
425                                           &pli->pli_fids[pli->pli_fidcount]);
426                 if (mdd_obj == NULL)
427                         GOTO(out, rc = -EREMOTE);
428                 if (IS_ERR(mdd_obj))
429                         GOTO(out, rc = -PTR_ERR(mdd_obj));
430                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
431                 if (rc <= 0) {
432                         mdd_object_put(env, mdd_obj);
433                         if (rc == -1)
434                                 rc = -EREMOTE;
435                         else if (rc == 0)
436                                 /* Do I need to error out here? */
437                                 rc = -ENOENT;
438                         GOTO(out, rc);
439                 }
440
441                 /* Get parent fid and object name */
442                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
443                 buf = mdd_links_get(env, mdd_obj);
444                 mdd_read_unlock(env, mdd_obj);
445                 mdd_object_put(env, mdd_obj);
446                 if (IS_ERR(buf))
447                         GOTO(out, rc = PTR_ERR(buf));
448
449                 leh = buf->lb_buf;
450                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
451                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
452
453                 /* If set, use link #linkno for path lookup, otherwise use
454                    link #0.  Only do this for the final path element. */
455                 if ((pli->pli_fidcount == 0) &&
456                     (pli->pli_linkno < leh->leh_reccount)) {
457                         int count;
458                         for (count = 0; count < pli->pli_linkno; count++) {
459                                 lee = (struct link_ea_entry *)
460                                      ((char *)lee + reclen);
461                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
462                         }
463                         if (pli->pli_linkno < leh->leh_reccount - 1)
464                                 /* indicate to user there are more links */
465                                 pli->pli_linkno++;
466                 }
467
468                 /* Pack the name in the end of the buffer */
469                 ptr -= tmpname->ln_namelen;
470                 if (ptr - 1 <= pli->pli_path)
471                         GOTO(out, rc = -EOVERFLOW);
472                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
473                 *(--ptr) = '/';
474
475                 /* Store the parent fid for historic lookup */
476                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
477                         GOTO(out, rc = -EOVERFLOW);
478                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
479         }
480
481         /* Verify that our path hasn't changed since we started the lookup.
482            Record the current index, and verify the path resolves to the
483            same fid. If it does, then the path is correct as of this index. */
484         spin_lock(&mdd->mdd_cl.mc_lock);
485         pli->pli_currec = mdd->mdd_cl.mc_index;
486         spin_unlock(&mdd->mdd_cl.mc_lock);
487         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
488         if (rc) {
489                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
490                 GOTO (out, rc = -EAGAIN);
491         }
492         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
493                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
494                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
495                        PFID(&pli->pli_fid));
496                 GOTO(out, rc = -EAGAIN);
497         }
498
499         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
500
501         EXIT;
502 out:
503         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
504                 /* if we vmalloced a large buffer drop it */
505                 mdd_buf_put(buf);
506
507         return rc;
508 }
509
510 static int mdd_path_historic(const struct lu_env *env,
511                              struct path_lookup_info *pli)
512 {
513         return 0;
514 }
515
516 /* Returns the full path to this fid, as of changelog record recno. */
517 static int mdd_path(const struct lu_env *env, struct md_object *obj,
518                     char *path, int pathlen, __u64 *recno, int *linkno)
519 {
520         struct path_lookup_info *pli;
521         int tries = 3;
522         int rc = -EAGAIN;
523         ENTRY;
524
525         if (pathlen < 3)
526                 RETURN(-EOVERFLOW);
527
528         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
529                 path[0] = '/';
530                 path[1] = '\0';
531                 RETURN(0);
532         }
533
534         OBD_ALLOC_PTR(pli);
535         if (pli == NULL)
536                 RETURN(-ENOMEM);
537
538         pli->pli_mdd_obj = md2mdd_obj(obj);
539         pli->pli_recno = *recno;
540         pli->pli_path = path;
541         pli->pli_pathlen = pathlen;
542         pli->pli_linkno = *linkno;
543
544         /* Retry multiple times in case file is being moved */
545         while (tries-- && rc == -EAGAIN)
546                 rc = mdd_path_current(env, pli);
547
548         /* For historical path lookup, the current links may not have existed
549          * at "recno" time.  We must switch over to earlier links/parents
550          * by using the changelog records.  If the earlier parent doesn't
551          * exist, we must search back through the changelog to reconstruct
552          * its parents, then check if it exists, etc.
553          * We may ignore this problem for the initial implementation and
554          * state that an "original" hardlink must still exist for us to find
555          * historic path name. */
556         if (pli->pli_recno != -1) {
557                 rc = mdd_path_historic(env, pli);
558         } else {
559                 *recno = pli->pli_currec;
560                 /* Return next link index to caller */
561                 *linkno = pli->pli_linkno;
562         }
563
564         OBD_FREE_PTR(pli);
565
566         RETURN (rc);
567 }
568
569 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
570 {
571         struct lu_attr *la = &mdd_env_info(env)->mti_la;
572         int rc;
573
574         ENTRY;
575         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
576         if (rc == 0) {
577                 mdd_flags_xlate(obj, la->la_flags);
578                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
579                         obj->mod_flags |= MNLINK_OBJ;
580         }
581         RETURN(rc);
582 }
583
584 /* get only inode attributes */
585 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
586                   struct md_attr *ma)
587 {
588         int rc = 0;
589         ENTRY;
590
591         if (ma->ma_valid & MA_INODE)
592                 RETURN(0);
593
594         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
595                           mdd_object_capa(env, mdd_obj));
596         if (rc == 0)
597                 ma->ma_valid |= MA_INODE;
598         RETURN(rc);
599 }
600
601 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
602                        int *size)
603 {
604         struct lov_desc *ldesc;
605         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
606         ENTRY;
607
608         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
609         LASSERT(ldesc != NULL);
610
611         if (!lmm)
612                 RETURN(0);
613
614         lmm->lmm_magic = LOV_MAGIC_V1;
615         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
616         lmm->lmm_pattern = ldesc->ld_pattern;
617         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
618         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
619         *size = sizeof(struct lov_mds_md);
620
621         RETURN(sizeof(struct lov_mds_md));
622 }
623
624 /* get lov EA only */
625 static int __mdd_lmm_get(const struct lu_env *env,
626                          struct mdd_object *mdd_obj, struct md_attr *ma)
627 {
628         int rc;
629         ENTRY;
630
631         if (ma->ma_valid & MA_LOV)
632                 RETURN(0);
633
634         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
635                         XATTR_NAME_LOV);
636
637         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
638                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
639                                 &ma->ma_lmm_size);
640         }
641
642         if (rc > 0) {
643                 ma->ma_valid |= MA_LOV;
644                 rc = 0;
645         }
646         RETURN(rc);
647 }
648
649 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
650                        struct md_attr *ma)
651 {
652         int rc;
653         ENTRY;
654
655         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
656         rc = __mdd_lmm_get(env, mdd_obj, ma);
657         mdd_read_unlock(env, mdd_obj);
658         RETURN(rc);
659 }
660
661 /* get lmv EA only*/
662 static int __mdd_lmv_get(const struct lu_env *env,
663                          struct mdd_object *mdd_obj, struct md_attr *ma)
664 {
665         int rc;
666         ENTRY;
667
668         if (ma->ma_valid & MA_LMV)
669                 RETURN(0);
670
671         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
672                         XATTR_NAME_LMV);
673         if (rc > 0) {
674                 ma->ma_valid |= MA_LMV;
675                 rc = 0;
676         }
677         RETURN(rc);
678 }
679
680 static int mdd_attr_get_internal(const struct lu_env *env,
681                                  struct mdd_object *mdd_obj,
682                                  struct md_attr *ma)
683 {
684         int rc = 0;
685         ENTRY;
686
687         if (ma->ma_need & MA_INODE)
688                 rc = mdd_iattr_get(env, mdd_obj, ma);
689
690         if (rc == 0 && ma->ma_need & MA_LOV) {
691                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
692                     S_ISDIR(mdd_object_type(mdd_obj)))
693                         rc = __mdd_lmm_get(env, mdd_obj, ma);
694         }
695         if (rc == 0 && ma->ma_need & MA_LMV) {
696                 if (S_ISDIR(mdd_object_type(mdd_obj)))
697                         rc = __mdd_lmv_get(env, mdd_obj, ma);
698         }
699 #ifdef CONFIG_FS_POSIX_ACL
700         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
701                 if (S_ISDIR(mdd_object_type(mdd_obj)))
702                         rc = mdd_def_acl_get(env, mdd_obj, ma);
703         }
704 #endif
705         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
706                rc, ma->ma_valid);
707         RETURN(rc);
708 }
709
710 int mdd_attr_get_internal_locked(const struct lu_env *env,
711                                  struct mdd_object *mdd_obj, struct md_attr *ma)
712 {
713         int rc;
714         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
715
716         if (needlock)
717                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
718         rc = mdd_attr_get_internal(env, mdd_obj, ma);
719         if (needlock)
720                 mdd_read_unlock(env, mdd_obj);
721         return rc;
722 }
723
724 /*
725  * No permission check is needed.
726  */
727 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
728                         struct md_attr *ma)
729 {
730         struct mdd_object *mdd_obj = md2mdd_obj(obj);
731         int                rc;
732
733         ENTRY;
734         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
735         RETURN(rc);
736 }
737
738 /*
739  * No permission check is needed.
740  */
741 static int mdd_xattr_get(const struct lu_env *env,
742                          struct md_object *obj, struct lu_buf *buf,
743                          const char *name)
744 {
745         struct mdd_object *mdd_obj = md2mdd_obj(obj);
746         int rc;
747
748         ENTRY;
749
750         LASSERT(mdd_object_exists(mdd_obj));
751
752         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
753         rc = mdo_xattr_get(env, mdd_obj, buf, name,
754                            mdd_object_capa(env, mdd_obj));
755         mdd_read_unlock(env, mdd_obj);
756
757         RETURN(rc);
758 }
759
760 /*
761  * Permission check is done when open,
762  * no need check again.
763  */
764 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
765                         struct lu_buf *buf)
766 {
767         struct mdd_object *mdd_obj = md2mdd_obj(obj);
768         struct dt_object  *next;
769         loff_t             pos = 0;
770         int                rc;
771         ENTRY;
772
773         LASSERT(mdd_object_exists(mdd_obj));
774
775         next = mdd_object_child(mdd_obj);
776         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
777         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
778                                          mdd_object_capa(env, mdd_obj));
779         mdd_read_unlock(env, mdd_obj);
780         RETURN(rc);
781 }
782
783 /*
784  * No permission check is needed.
785  */
786 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
787                           struct lu_buf *buf)
788 {
789         struct mdd_object *mdd_obj = md2mdd_obj(obj);
790         int rc;
791
792         ENTRY;
793
794         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
795         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
796         mdd_read_unlock(env, mdd_obj);
797
798         RETURN(rc);
799 }
800
801 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
802                                struct mdd_object *c, struct md_attr *ma,
803                                struct thandle *handle,
804                                const struct md_op_spec *spec)
805 {
806         struct lu_attr *attr = &ma->ma_attr;
807         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
808         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
809         const struct dt_index_features *feat = spec->sp_feat;
810         int rc;
811         ENTRY;
812
813         if (!mdd_object_exists(c)) {
814                 struct dt_object *next = mdd_object_child(c);
815                 LASSERT(next);
816
817                 if (feat != &dt_directory_features && feat != NULL)
818                         dof->dof_type = DFT_INDEX;
819                 else
820                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
821
822                 dof->u.dof_idx.di_feat = feat;
823
824                 /* @hint will be initialized by underlying device. */
825                 next->do_ops->do_ah_init(env, hint,
826                                          p ? mdd_object_child(p) : NULL,
827                                          attr->la_mode & S_IFMT);
828
829                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
830                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
831         } else
832                 rc = -EEXIST;
833
834         RETURN(rc);
835 }
836
837 /**
838  * Make sure the ctime is increased only.
839  */
840 static inline int mdd_attr_check(const struct lu_env *env,
841                                  struct mdd_object *obj,
842                                  struct lu_attr *attr)
843 {
844         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
845         int rc;
846         ENTRY;
847
848         if (attr->la_valid & LA_CTIME) {
849                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
850                 if (rc)
851                         RETURN(rc);
852
853                 if (attr->la_ctime < tmp_la->la_ctime)
854                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
855                 else if (attr->la_valid == LA_CTIME &&
856                          attr->la_ctime == tmp_la->la_ctime)
857                         attr->la_valid &= ~LA_CTIME;
858         }
859         RETURN(0);
860 }
861
862 int mdd_attr_set_internal(const struct lu_env *env,
863                           struct mdd_object *obj,
864                           struct lu_attr *attr,
865                           struct thandle *handle,
866                           int needacl)
867 {
868         int rc;
869         ENTRY;
870
871         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
872 #ifdef CONFIG_FS_POSIX_ACL
873         if (!rc && (attr->la_valid & LA_MODE) && needacl)
874                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
875 #endif
876         RETURN(rc);
877 }
878
879 int mdd_attr_check_set_internal(const struct lu_env *env,
880                                 struct mdd_object *obj,
881                                 struct lu_attr *attr,
882                                 struct thandle *handle,
883                                 int needacl)
884 {
885         int rc;
886         ENTRY;
887
888         rc = mdd_attr_check(env, obj, attr);
889         if (rc)
890                 RETURN(rc);
891
892         if (attr->la_valid)
893                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
894         RETURN(rc);
895 }
896
897 static int mdd_attr_set_internal_locked(const struct lu_env *env,
898                                         struct mdd_object *obj,
899                                         struct lu_attr *attr,
900                                         struct thandle *handle,
901                                         int needacl)
902 {
903         int rc;
904         ENTRY;
905
906         needacl = needacl && (attr->la_valid & LA_MODE);
907         if (needacl)
908                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
909         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
910         if (needacl)
911                 mdd_write_unlock(env, obj);
912         RETURN(rc);
913 }
914
915 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
916                                        struct mdd_object *obj,
917                                        struct lu_attr *attr,
918                                        struct thandle *handle,
919                                        int needacl)
920 {
921         int rc;
922         ENTRY;
923
924         needacl = needacl && (attr->la_valid & LA_MODE);
925         if (needacl)
926                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
927         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
928         if (needacl)
929                 mdd_write_unlock(env, obj);
930         RETURN(rc);
931 }
932
933 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
934                     const struct lu_buf *buf, const char *name,
935                     int fl, struct thandle *handle)
936 {
937         struct lustre_capa *capa = mdd_object_capa(env, obj);
938         int rc = -EINVAL;
939         ENTRY;
940
941         if (buf->lb_buf && buf->lb_len > 0)
942                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
943         else if (buf->lb_buf == NULL && buf->lb_len == 0)
944                 rc = mdo_xattr_del(env, obj, name, handle, capa);
945
946         RETURN(rc);
947 }
948
949 /*
950  * This gives the same functionality as the code between
951  * sys_chmod and inode_setattr
952  * chown_common and inode_setattr
953  * utimes and inode_setattr
954  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
955  */
956 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
957                         struct lu_attr *la, const struct md_attr *ma)
958 {
959         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
960         struct md_ucred  *uc         = md_ucred(env);
961         int               rc;
962         ENTRY;
963
964         if (!la->la_valid)
965                 RETURN(0);
966
967         /* Do not permit change file type */
968         if (la->la_valid & LA_TYPE)
969                 RETURN(-EPERM);
970
971         /* They should not be processed by setattr */
972         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
973                 RETURN(-EPERM);
974
975         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
976         if (rc)
977                 RETURN(rc);
978
979         if (la->la_valid == LA_CTIME) {
980                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
981                         /* This is only for set ctime when rename's source is
982                          * on remote MDS. */
983                         rc = mdd_may_delete(env, NULL, obj,
984                                             (struct md_attr *)ma, 1, 0);
985                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
986                         la->la_valid &= ~LA_CTIME;
987                 RETURN(rc);
988         }
989
990         if (la->la_valid == LA_ATIME) {
991                 /* This is atime only set for read atime update on close. */
992                 if (la->la_atime <= tmp_la->la_atime +
993                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
994                         la->la_valid &= ~LA_ATIME;
995                 RETURN(0);
996         }
997
998         /* Check if flags change. */
999         if (la->la_valid & LA_FLAGS) {
1000                 unsigned int oldflags = 0;
1001                 unsigned int newflags = la->la_flags &
1002                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1003
1004                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1005                     !mdd_capable(uc, CFS_CAP_FOWNER))
1006                         RETURN(-EPERM);
1007
1008                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1009                  * only be changed by the relevant capability. */
1010                 if (mdd_is_immutable(obj))
1011                         oldflags |= LUSTRE_IMMUTABLE_FL;
1012                 if (mdd_is_append(obj))
1013                         oldflags |= LUSTRE_APPEND_FL;
1014                 if ((oldflags ^ newflags) &&
1015                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1016                         RETURN(-EPERM);
1017
1018                 if (!S_ISDIR(tmp_la->la_mode))
1019                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1020         }
1021
1022         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1023             (la->la_valid & ~LA_FLAGS) &&
1024             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1025                 RETURN(-EPERM);
1026
1027         /* Check for setting the obj time. */
1028         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1029             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1030                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1031                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1032                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1033                                                             MAY_WRITE,
1034                                                             MOR_TGT_CHILD);
1035                         if (rc)
1036                                 RETURN(rc);
1037                 }
1038         }
1039
1040         /* Make sure a caller can chmod. */
1041         if (la->la_valid & LA_MODE) {
1042                 /* Bypass la_vaild == LA_MODE,
1043                  * this is for changing file with SUID or SGID. */
1044                 if ((la->la_valid & ~LA_MODE) &&
1045                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1046                     (uc->mu_fsuid != tmp_la->la_uid) &&
1047                     !mdd_capable(uc, CFS_CAP_FOWNER))
1048                         RETURN(-EPERM);
1049
1050                 if (la->la_mode == (umode_t) -1)
1051                         la->la_mode = tmp_la->la_mode;
1052                 else
1053                         la->la_mode = (la->la_mode & S_IALLUGO) |
1054                                       (tmp_la->la_mode & ~S_IALLUGO);
1055
1056                 /* Also check the setgid bit! */
1057                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1058                                        la->la_gid : tmp_la->la_gid) &&
1059                     !mdd_capable(uc, CFS_CAP_FSETID))
1060                         la->la_mode &= ~S_ISGID;
1061         } else {
1062                la->la_mode = tmp_la->la_mode;
1063         }
1064
1065         /* Make sure a caller can chown. */
1066         if (la->la_valid & LA_UID) {
1067                 if (la->la_uid == (uid_t) -1)
1068                         la->la_uid = tmp_la->la_uid;
1069                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1070                     (la->la_uid != tmp_la->la_uid)) &&
1071                     !mdd_capable(uc, CFS_CAP_CHOWN))
1072                         RETURN(-EPERM);
1073
1074                 /* If the user or group of a non-directory has been
1075                  * changed by a non-root user, remove the setuid bit.
1076                  * 19981026 David C Niemi <niemi@tux.org>
1077                  *
1078                  * Changed this to apply to all users, including root,
1079                  * to avoid some races. This is the behavior we had in
1080                  * 2.0. The check for non-root was definitely wrong
1081                  * for 2.2 anyway, as it should have been using
1082                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1083                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1084                     !S_ISDIR(tmp_la->la_mode)) {
1085                         la->la_mode &= ~S_ISUID;
1086                         la->la_valid |= LA_MODE;
1087                 }
1088         }
1089
1090         /* Make sure caller can chgrp. */
1091         if (la->la_valid & LA_GID) {
1092                 if (la->la_gid == (gid_t) -1)
1093                         la->la_gid = tmp_la->la_gid;
1094                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1095                     ((la->la_gid != tmp_la->la_gid) &&
1096                     !lustre_in_group_p(uc, la->la_gid))) &&
1097                     !mdd_capable(uc, CFS_CAP_CHOWN))
1098                         RETURN(-EPERM);
1099
1100                 /* Likewise, if the user or group of a non-directory
1101                  * has been changed by a non-root user, remove the
1102                  * setgid bit UNLESS there is no group execute bit
1103                  * (this would be a file marked for mandatory
1104                  * locking).  19981026 David C Niemi <niemi@tux.org>
1105                  *
1106                  * Removed the fsuid check (see the comment above) --
1107                  * 19990830 SD. */
1108                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1109                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1110                         la->la_mode &= ~S_ISGID;
1111                         la->la_valid |= LA_MODE;
1112                 }
1113         }
1114
1115         /* For both Size-on-MDS case and truncate case,
1116          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1117          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1118          * For SOM case, it is true, the MAY_WRITE perm has been checked
1119          * when open, no need check again. For truncate case, it is false,
1120          * the MAY_WRITE perm should be checked here. */
1121         if (ma->ma_attr_flags & MDS_SOM) {
1122                 /* For the "Size-on-MDS" setattr update, merge coming
1123                  * attributes with the set in the inode. BUG 10641 */
1124                 if ((la->la_valid & LA_ATIME) &&
1125                     (la->la_atime <= tmp_la->la_atime))
1126                         la->la_valid &= ~LA_ATIME;
1127
1128                 /* OST attributes do not have a priority over MDS attributes,
1129                  * so drop times if ctime is equal. */
1130                 if ((la->la_valid & LA_CTIME) &&
1131                     (la->la_ctime <= tmp_la->la_ctime))
1132                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1133         } else {
1134                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1135                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1136                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1137                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1138                                 rc = mdd_permission_internal_locked(env, obj,
1139                                                             tmp_la, MAY_WRITE,
1140                                                             MOR_TGT_CHILD);
1141                                 if (rc)
1142                                         RETURN(rc);
1143                         }
1144                 }
1145                 if (la->la_valid & LA_CTIME) {
1146                         /* The pure setattr, it has the priority over what is
1147                          * already set, do not drop it if ctime is equal. */
1148                         if (la->la_ctime < tmp_la->la_ctime)
1149                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1150                                                   LA_CTIME);
1151                 }
1152         }
1153
1154         RETURN(0);
1155 }
1156
1157 /** Store a data change changelog record
1158  * If this fails, we must fail the whole transaction; we don't
1159  * want the change to commit without the log entry.
1160  * \param mdd_obj - mdd_object of change
1161  * \param handle - transacion handle
1162  */
1163 static int mdd_changelog_data_store(const struct lu_env     *env,
1164                                     struct mdd_device       *mdd,
1165                                     enum changelog_rec_type type,
1166                                     struct mdd_object       *mdd_obj,
1167                                     struct thandle          *handle)
1168 {
1169         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1170         struct llog_changelog_rec *rec;
1171         struct lu_buf *buf;
1172         int reclen;
1173         int rc;
1174
1175         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1176                 RETURN(0);
1177
1178         LASSERT(handle != NULL);
1179         LASSERT(mdd_obj != NULL);
1180
1181         if ((type == CL_SETATTR) &&
1182             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1183                 /* Don't need multiple updates in this log */
1184                 /* Don't check under lock - no big deal if we get an extra
1185                    entry */
1186                 RETURN(0);
1187         }
1188
1189         reclen = llog_data_len(sizeof(*rec));
1190         buf = mdd_buf_alloc(env, reclen);
1191         if (buf->lb_buf == NULL)
1192                 RETURN(-ENOMEM);
1193         rec = (struct llog_changelog_rec *)buf->lb_buf;
1194
1195         rec->cr_flags = CLF_VERSION;
1196         rec->cr_type = (__u32)type;
1197         rec->cr_tfid = *tfid;
1198         rec->cr_namelen = 0;
1199         mdd_obj->mod_cltime = cfs_time_current_64();
1200
1201         rc = mdd_changelog_llog_write(mdd, rec, handle);
1202         if (rc < 0) {
1203                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1204                        rc, type, PFID(tfid));
1205                 return -EFAULT;
1206         }
1207
1208         return 0;
1209 }
1210
1211 /* set attr and LOV EA at once, return updated attr */
1212 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1213                         const struct md_attr *ma)
1214 {
1215         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1216         struct mdd_device *mdd = mdo2mdd(obj);
1217         struct thandle *handle;
1218         struct lov_mds_md *lmm = NULL;
1219         struct llog_cookie *logcookies = NULL;
1220         int  rc, lmm_size = 0, cookie_size = 0;
1221         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1222 #ifdef HAVE_QUOTA_SUPPORT
1223         struct obd_device *obd = mdd->mdd_obd_dev;
1224         struct mds_obd *mds = &obd->u.mds;
1225         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1226         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1227         int quota_opc = 0, block_count = 0;
1228         int inode_pending[MAXQUOTAS] = { 0, 0 };
1229         int block_pending[MAXQUOTAS] = { 0, 0 };
1230 #endif
1231         ENTRY;
1232
1233         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1234                                     MDD_TXN_ATTR_SET_OP);
1235         handle = mdd_trans_start(env, mdd);
1236         if (IS_ERR(handle))
1237                 RETURN(PTR_ERR(handle));
1238         /*TODO: add lock here*/
1239         /* start a log jounal handle if needed */
1240         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1241             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1242                 lmm_size = mdd_lov_mdsize(env, mdd);
1243                 lmm = mdd_max_lmm_get(env, mdd);
1244                 if (lmm == NULL)
1245                         GOTO(cleanup, rc = -ENOMEM);
1246
1247                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1248                                 XATTR_NAME_LOV);
1249
1250                 if (rc < 0)
1251                         GOTO(cleanup, rc);
1252         }
1253
1254         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1255                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1256                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1257
1258         *la_copy = ma->ma_attr;
1259         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1260         if (rc)
1261                 GOTO(cleanup, rc);
1262
1263 #ifdef HAVE_QUOTA_SUPPORT
1264         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1265                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1266
1267                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1268                 if (!rc) {
1269                         quota_opc = FSFILT_OP_SETATTR;
1270                         mdd_quota_wrapper(la_copy, qnids);
1271                         mdd_quota_wrapper(la_tmp, qoids);
1272                         /* get file quota for new owner */
1273                         lquota_chkquota(mds_quota_interface_ref, obd, qnids,
1274                                         inode_pending, 1, NULL, 0, NULL, 0);
1275                         block_count = (la_tmp->la_blocks + 7) >> 3;
1276                         if (block_count) {
1277                                 void *data = NULL;
1278                                 mdd_data_get(env, mdd_obj, &data);
1279                                 /* get block quota for new owner */
1280                                 lquota_chkquota(mds_quota_interface_ref, obd,
1281                                                 qnids, block_pending,
1282                                                 block_count, NULL,
1283                                                 LQUOTA_FLAGS_BLK, data, 1);
1284                         }
1285                 }
1286         }
1287 #endif
1288
1289         if (la_copy->la_valid & LA_FLAGS) {
1290                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1291                                                   handle, 1);
1292                 if (rc == 0)
1293                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1294         } else if (la_copy->la_valid) {            /* setattr */
1295                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1296                                                   handle, 1);
1297                 /* journal chown/chgrp in llog, just like unlink */
1298                 if (rc == 0 && lmm_size){
1299                         cookie_size = mdd_lov_cookiesize(env, mdd);
1300                         logcookies = mdd_max_cookie_get(env, mdd);
1301                         if (logcookies == NULL)
1302                                 GOTO(cleanup, rc = -ENOMEM);
1303
1304                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1305                                             logcookies, cookie_size) <= 0)
1306                                 logcookies = NULL;
1307                 }
1308         }
1309
1310         if (rc == 0 && ma->ma_valid & MA_LOV) {
1311                 umode_t mode;
1312
1313                 mode = mdd_object_type(mdd_obj);
1314                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1315                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1316                         if (rc)
1317                                 GOTO(cleanup, rc);
1318
1319                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1320                                             ma->ma_lmm_size, handle, 1);
1321                 }
1322
1323         }
1324 cleanup:
1325         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1326                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1327                                               handle);
1328         mdd_trans_stop(env, mdd, rc, handle);
1329         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1330                 /*set obd attr, if needed*/
1331                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1332                                            logcookies);
1333         }
1334 #ifdef HAVE_QUOTA_SUPPORT
1335         if (quota_opc) {
1336                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1337                                       inode_pending, 0);
1338                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1339                                       block_pending, 1);
1340                 /* Trigger dqrel/dqacq for original owner and new owner.
1341                  * If failed, the next call for lquota_chkquota will
1342                  * process it. */
1343                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1344                               quota_opc);
1345         }
1346 #endif
1347         RETURN(rc);
1348 }
1349
1350 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1351                       const struct lu_buf *buf, const char *name, int fl,
1352                       struct thandle *handle)
1353 {
1354         int  rc;
1355         ENTRY;
1356
1357         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1358         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1359         mdd_write_unlock(env, obj);
1360
1361         RETURN(rc);
1362 }
1363
1364 static int mdd_xattr_sanity_check(const struct lu_env *env,
1365                                   struct mdd_object *obj)
1366 {
1367         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1368         struct md_ucred *uc     = md_ucred(env);
1369         int rc;
1370         ENTRY;
1371
1372         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1373                 RETURN(-EPERM);
1374
1375         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1376         if (rc)
1377                 RETURN(rc);
1378
1379         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1380             !mdd_capable(uc, CFS_CAP_FOWNER))
1381                 RETURN(-EPERM);
1382
1383         RETURN(rc);
1384 }
1385
1386 /**
1387  * The caller should guarantee to update the object ctime
1388  * after xattr_set if needed.
1389  */
1390 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1391                          const struct lu_buf *buf, const char *name,
1392                          int fl)
1393 {
1394         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1395         struct mdd_device *mdd = mdo2mdd(obj);
1396         struct thandle *handle;
1397         int  rc;
1398         ENTRY;
1399
1400         rc = mdd_xattr_sanity_check(env, mdd_obj);
1401         if (rc)
1402                 RETURN(rc);
1403
1404         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1405         handle = mdd_trans_start(env, mdd);
1406         if (IS_ERR(handle))
1407                 RETURN(PTR_ERR(handle));
1408
1409         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1410
1411         /* Only record user xattr changes */
1412         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1413             (strncmp("user.", name, 5) == 0))
1414                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1415                                               handle);
1416         mdd_trans_stop(env, mdd, rc, handle);
1417
1418         RETURN(rc);
1419 }
1420
1421 /**
1422  * The caller should guarantee to update the object ctime
1423  * after xattr_set if needed.
1424  */
1425 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1426                   const char *name)
1427 {
1428         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1429         struct mdd_device *mdd = mdo2mdd(obj);
1430         struct thandle *handle;
1431         int  rc;
1432         ENTRY;
1433
1434         rc = mdd_xattr_sanity_check(env, mdd_obj);
1435         if (rc)
1436                 RETURN(rc);
1437
1438         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1439         handle = mdd_trans_start(env, mdd);
1440         if (IS_ERR(handle))
1441                 RETURN(PTR_ERR(handle));
1442
1443         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1444         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1445                            mdd_object_capa(env, mdd_obj));
1446         mdd_write_unlock(env, mdd_obj);
1447
1448         /* Only record user xattr changes */
1449         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1450             (strncmp("user.", name, 5) != 0))
1451                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1452                                               handle);
1453
1454         mdd_trans_stop(env, mdd, rc, handle);
1455
1456         RETURN(rc);
1457 }
1458
1459 /* partial unlink */
1460 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1461                        struct md_attr *ma)
1462 {
1463         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1464         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1465         struct mdd_device *mdd = mdo2mdd(obj);
1466         struct thandle *handle;
1467 #ifdef HAVE_QUOTA_SUPPORT
1468         struct obd_device *obd = mdd->mdd_obd_dev;
1469         struct mds_obd *mds = &obd->u.mds;
1470         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1471         int quota_opc = 0;
1472 #endif
1473         int rc;
1474         ENTRY;
1475
1476         /*
1477          * Check -ENOENT early here because we need to get object type
1478          * to calculate credits before transaction start
1479          */
1480         if (!mdd_object_exists(mdd_obj))
1481                 RETURN(-ENOENT);
1482
1483         LASSERT(mdd_object_exists(mdd_obj) > 0);
1484
1485         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1486         if (rc)
1487                 RETURN(rc);
1488
1489         handle = mdd_trans_start(env, mdd);
1490         if (IS_ERR(handle))
1491                 RETURN(-ENOMEM);
1492
1493         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1494
1495         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1496         if (rc)
1497                 GOTO(cleanup, rc);
1498
1499         __mdd_ref_del(env, mdd_obj, handle, 0);
1500
1501         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1502                 /* unlink dot */
1503                 __mdd_ref_del(env, mdd_obj, handle, 1);
1504         }
1505
1506         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1507         la_copy->la_ctime = ma->ma_attr.la_ctime;
1508
1509         la_copy->la_valid = LA_CTIME;
1510         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1511         if (rc)
1512                 GOTO(cleanup, rc);
1513
1514         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1515 #ifdef HAVE_QUOTA_SUPPORT
1516         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1517             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1518                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1519                 mdd_quota_wrapper(&ma->ma_attr, qids);
1520         }
1521 #endif
1522
1523
1524         EXIT;
1525 cleanup:
1526         mdd_write_unlock(env, mdd_obj);
1527         mdd_trans_stop(env, mdd, rc, handle);
1528 #ifdef HAVE_QUOTA_SUPPORT
1529         if (quota_opc)
1530                 /* Trigger dqrel on the owner of child. If failed,
1531                  * the next call for lquota_chkquota will process it */
1532                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1533                               quota_opc);
1534 #endif
1535         return rc;
1536 }
1537
1538 /* partial operation */
1539 static int mdd_oc_sanity_check(const struct lu_env *env,
1540                                struct mdd_object *obj,
1541                                struct md_attr *ma)
1542 {
1543         int rc;
1544         ENTRY;
1545
1546         switch (ma->ma_attr.la_mode & S_IFMT) {
1547         case S_IFREG:
1548         case S_IFDIR:
1549         case S_IFLNK:
1550         case S_IFCHR:
1551         case S_IFBLK:
1552         case S_IFIFO:
1553         case S_IFSOCK:
1554                 rc = 0;
1555                 break;
1556         default:
1557                 rc = -EINVAL;
1558                 break;
1559         }
1560         RETURN(rc);
1561 }
1562
1563 static int mdd_object_create(const struct lu_env *env,
1564                              struct md_object *obj,
1565                              const struct md_op_spec *spec,
1566                              struct md_attr *ma)
1567 {
1568
1569         struct mdd_device *mdd = mdo2mdd(obj);
1570         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1571         const struct lu_fid *pfid = spec->u.sp_pfid;
1572         struct thandle *handle;
1573 #ifdef HAVE_QUOTA_SUPPORT
1574         struct obd_device *obd = mdd->mdd_obd_dev;
1575         struct mds_obd *mds = &obd->u.mds;
1576         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1577         int quota_opc = 0, block_count = 0;
1578         int inode_pending[MAXQUOTAS] = { 0, 0 };
1579         int block_pending[MAXQUOTAS] = { 0, 0 };
1580 #endif
1581         int rc = 0;
1582         ENTRY;
1583
1584 #ifdef HAVE_QUOTA_SUPPORT
1585         if (mds->mds_quota) {
1586                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1587                 mdd_quota_wrapper(&ma->ma_attr, qids);
1588                 /* get file quota for child */
1589                 lquota_chkquota(mds_quota_interface_ref, obd, qids,
1590                                 inode_pending, 1, NULL, 0, NULL, 0);
1591                 switch (ma->ma_attr.la_mode & S_IFMT) {
1592                 case S_IFLNK:
1593                 case S_IFDIR:
1594                         block_count = 2;
1595                         break;
1596                 case S_IFREG:
1597                         block_count = 1;
1598                         break;
1599                 }
1600                 /* get block quota for child */
1601                 if (block_count)
1602                         lquota_chkquota(mds_quota_interface_ref, obd, qids,
1603                                         block_pending, block_count, NULL,
1604                                         LQUOTA_FLAGS_BLK, NULL, 0);
1605         }
1606 #endif
1607
1608         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1609         handle = mdd_trans_start(env, mdd);
1610         if (IS_ERR(handle))
1611                 GOTO(out_pending, rc = PTR_ERR(handle));
1612
1613         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1614         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1615         if (rc)
1616                 GOTO(unlock, rc);
1617
1618         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1619         if (rc)
1620                 GOTO(unlock, rc);
1621
1622         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1623                 /* If creating the slave object, set slave EA here. */
1624                 int lmv_size = spec->u.sp_ea.eadatalen;
1625                 struct lmv_stripe_md *lmv;
1626
1627                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1628                 LASSERT(lmv != NULL && lmv_size > 0);
1629
1630                 rc = __mdd_xattr_set(env, mdd_obj,
1631                                      mdd_buf_get_const(env, lmv, lmv_size),
1632                                      XATTR_NAME_LMV, 0, handle);
1633                 if (rc)
1634                         GOTO(unlock, rc);
1635
1636                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1637                                            handle, 0);
1638         } else {
1639 #ifdef CONFIG_FS_POSIX_ACL
1640                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1641                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1642
1643                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1644                         buf->lb_len = spec->u.sp_ea.eadatalen;
1645                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1646                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1647                                                     &ma->ma_attr.la_mode,
1648                                                     handle);
1649                                 if (rc)
1650                                         GOTO(unlock, rc);
1651                                 else
1652                                         ma->ma_attr.la_valid |= LA_MODE;
1653                         }
1654
1655                         pfid = spec->u.sp_ea.fid;
1656                 }
1657 #endif
1658                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1659                                            spec);
1660         }
1661         EXIT;
1662 unlock:
1663         if (rc == 0)
1664                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1665         mdd_write_unlock(env, mdd_obj);
1666
1667         mdd_trans_stop(env, mdd, rc, handle);
1668 out_pending:
1669 #ifdef HAVE_QUOTA_SUPPORT
1670         if (quota_opc) {
1671                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1672                                       inode_pending, 0);
1673                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1674                                       block_pending, 1);
1675                 /* Trigger dqacq on the owner of child. If failed,
1676                  * the next call for lquota_chkquota will process it. */
1677                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1678                               quota_opc);
1679         }
1680 #endif
1681         return rc;
1682 }
1683
1684 /* partial link */
1685 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1686                        const struct md_attr *ma)
1687 {
1688         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1689         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1690         struct mdd_device *mdd = mdo2mdd(obj);
1691         struct thandle *handle;
1692         int rc;
1693         ENTRY;
1694
1695         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1696         handle = mdd_trans_start(env, mdd);
1697         if (IS_ERR(handle))
1698                 RETURN(-ENOMEM);
1699
1700         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1701         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1702         if (rc == 0)
1703                 __mdd_ref_add(env, mdd_obj, handle);
1704         mdd_write_unlock(env, mdd_obj);
1705         if (rc == 0) {
1706                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1707                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1708
1709                 la_copy->la_valid = LA_CTIME;
1710                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1711                                                         handle, 0);
1712         }
1713         mdd_trans_stop(env, mdd, 0, handle);
1714
1715         RETURN(rc);
1716 }
1717
1718 /*
1719  * do NOT or the MAY_*'s, you'll get the weakest
1720  */
1721 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1722 {
1723         int res = 0;
1724
1725         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1726          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1727          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1728          * owner can write to a file even if it is marked readonly to hide
1729          * its brokenness. (bug 5781) */
1730         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1731                 struct md_ucred *uc = md_ucred(env);
1732
1733                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1734                     (la->la_uid == uc->mu_fsuid))
1735                         return 0;
1736         }
1737
1738         if (flags & FMODE_READ)
1739                 res |= MAY_READ;
1740         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1741                 res |= MAY_WRITE;
1742         if (flags & MDS_FMODE_EXEC)
1743                 res |= MAY_EXEC;
1744         return res;
1745 }
1746
1747 static int mdd_open_sanity_check(const struct lu_env *env,
1748                                  struct mdd_object *obj, int flag)
1749 {
1750         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1751         int mode, rc;
1752         ENTRY;
1753
1754         /* EEXIST check */
1755         if (mdd_is_dead_obj(obj))
1756                 RETURN(-ENOENT);
1757
1758         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1759         if (rc)
1760                RETURN(rc);
1761
1762         if (S_ISLNK(tmp_la->la_mode))
1763                 RETURN(-ELOOP);
1764
1765         mode = accmode(env, tmp_la, flag);
1766
1767         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1768                 RETURN(-EISDIR);
1769
1770         if (!(flag & MDS_OPEN_CREATED)) {
1771                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1772                 if (rc)
1773                         RETURN(rc);
1774         }
1775
1776         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1777             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1778                 flag &= ~MDS_OPEN_TRUNC;
1779
1780         /* For writing append-only file must open it with append mode. */
1781         if (mdd_is_append(obj)) {
1782                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1783                         RETURN(-EPERM);
1784                 if (flag & MDS_OPEN_TRUNC)
1785                         RETURN(-EPERM);
1786         }
1787
1788 #if 0
1789         /*
1790          * Now, flag -- O_NOATIME does not be packed by client.
1791          */
1792         if (flag & O_NOATIME) {
1793                 struct md_ucred *uc = md_ucred(env);
1794
1795                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1796                     (uc->mu_valid == UCRED_NEW)) &&
1797                     (uc->mu_fsuid != tmp_la->la_uid) &&
1798                     !mdd_capable(uc, CFS_CAP_FOWNER))
1799                         RETURN(-EPERM);
1800         }
1801 #endif
1802
1803         RETURN(0);
1804 }
1805
1806 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1807                     int flags)
1808 {
1809         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1810         int rc = 0;
1811
1812         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1813
1814         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1815         if (rc == 0)
1816                 mdd_obj->mod_count++;
1817
1818         mdd_write_unlock(env, mdd_obj);
1819         return rc;
1820 }
1821
1822 /* return md_attr back,
1823  * if it is last unlink then return lov ea + llog cookie*/
1824 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1825                     struct md_attr *ma)
1826 {
1827         int rc = 0;
1828         ENTRY;
1829
1830         if (S_ISREG(mdd_object_type(obj))) {
1831                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1832                  * Caller must be ready for that. */
1833
1834                 rc = __mdd_lmm_get(env, obj, ma);
1835                 if ((ma->ma_valid & MA_LOV))
1836                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1837                                             obj, ma);
1838         }
1839         RETURN(rc);
1840 }
1841
1842 /*
1843  * No permission check is needed.
1844  */
1845 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1846                      struct md_attr *ma)
1847 {
1848         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1849         struct mdd_device *mdd = mdo2mdd(obj);
1850         struct thandle    *handle;
1851         int rc;
1852         int reset = 1;
1853
1854 #ifdef HAVE_QUOTA_SUPPORT
1855         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1856         struct mds_obd *mds = &obd->u.mds;
1857         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1858         int quota_opc = 0;
1859 #endif
1860         ENTRY;
1861
1862         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1863         if (rc)
1864                 RETURN(rc);
1865         handle = mdd_trans_start(env, mdo2mdd(obj));
1866         if (IS_ERR(handle))
1867                 RETURN(PTR_ERR(handle));
1868
1869         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1870         /* release open count */
1871         mdd_obj->mod_count --;
1872
1873         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1874                 /* remove link to object from orphan index */
1875                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1876                 if (rc == 0) {
1877                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1878                                "list, OSS objects to be destroyed.\n",
1879                                PFID(mdd_object_fid(mdd_obj)));
1880                 } else {
1881                         CERROR("Object "DFID" can not be deleted from orphan "
1882                                 "list, maybe cause OST objects can not be "
1883                                 "destroyed (err: %d).\n",
1884                                 PFID(mdd_object_fid(mdd_obj)), rc);
1885                         /* If object was not deleted from orphan list, do not
1886                          * destroy OSS objects, which will be done when next
1887                          * recovery. */
1888                         GOTO(out, rc);
1889                 }
1890         }
1891
1892         rc = mdd_iattr_get(env, mdd_obj, ma);
1893         /* Object maybe not in orphan list originally, it is rare case for
1894          * mdd_finish_unlink() failure. */
1895         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1896 #ifdef HAVE_QUOTA_SUPPORT
1897                 if (mds->mds_quota) {
1898                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1899                         mdd_quota_wrapper(&ma->ma_attr, qids);
1900                 }
1901 #endif
1902                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
1903                 if (ma->ma_valid & MA_FLAGS &&
1904                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
1905                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
1906                 } else {
1907                         rc = mdd_object_kill(env, mdd_obj, ma);
1908                                 if (rc == 0)
1909                                         reset = 0;
1910                 }
1911
1912                 if (rc != 0)
1913                         CERROR("Error when prepare to delete Object "DFID" , "
1914                                "which will cause OST objects can not be "
1915                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1916         }
1917         EXIT;
1918
1919 out:
1920         if (reset)
1921                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1922
1923         mdd_write_unlock(env, mdd_obj);
1924         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1925 #ifdef HAVE_QUOTA_SUPPORT
1926         if (quota_opc)
1927                 /* Trigger dqrel on the owner of child. If failed,
1928                  * the next call for lquota_chkquota will process it */
1929                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1930                               quota_opc);
1931 #endif
1932         return rc;
1933 }
1934
1935 /*
1936  * Permission check is done when open,
1937  * no need check again.
1938  */
1939 static int mdd_readpage_sanity_check(const struct lu_env *env,
1940                                      struct mdd_object *obj)
1941 {
1942         struct dt_object *next = mdd_object_child(obj);
1943         int rc;
1944         ENTRY;
1945
1946         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1947                 rc = 0;
1948         else
1949                 rc = -ENOTDIR;
1950
1951         RETURN(rc);
1952 }
1953
1954 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
1955                               int first, void *area, int nob,
1956                               const struct dt_it_ops *iops, struct dt_it *it,
1957                               __u64 *start, __u64 *end,
1958                               struct lu_dirent **last, __u32 attr)
1959 {
1960         int                     result;
1961         __u64                   hash = 0;
1962         struct lu_dirent       *ent;
1963
1964         if (first) {
1965                 memset(area, 0, sizeof (struct lu_dirpage));
1966                 area += sizeof (struct lu_dirpage);
1967                 nob  -= sizeof (struct lu_dirpage);
1968         }
1969
1970         ent  = area;
1971         do {
1972                 int    len;
1973                 int    recsize;
1974
1975                 len  = iops->key_size(env, it);
1976
1977                 /* IAM iterator can return record with zero len. */
1978                 if (len == 0)
1979                         goto next;
1980
1981                 hash = iops->store(env, it);
1982                 if (unlikely(first)) {
1983                         first = 0;
1984                         *start = hash;
1985                 }
1986
1987                 /* calculate max space required for lu_dirent */
1988                 recsize = lu_dirent_calc_size(len, attr);
1989
1990                 if (nob >= recsize) {
1991                         result = iops->rec(env, it, ent, attr);
1992                         if (result == -ESTALE)
1993                                 goto next;
1994                         if (result != 0)
1995                                 goto out;
1996
1997                         /* osd might not able to pack all attributes,
1998                          * so recheck rec length */
1999                         recsize = le16_to_cpu(ent->lde_reclen);
2000                 } else {
2001                         /*
2002                          * record doesn't fit into page, enlarge previous one.
2003                          */
2004                         if (*last) {
2005                                 (*last)->lde_reclen =
2006                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2007                                                         nob);
2008                                 result = 0;
2009                         } else
2010                                 result = -EINVAL;
2011
2012                         goto out;
2013                 }
2014                 *last = ent;
2015                 ent = (void *)ent + recsize;
2016                 nob -= recsize;
2017
2018 next:
2019                 result = iops->next(env, it);
2020                 if (result == -ESTALE)
2021                         goto next;
2022         } while (result == 0);
2023
2024 out:
2025         *end = hash;
2026         return result;
2027 }
2028
2029 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2030                           const struct lu_rdpg *rdpg)
2031 {
2032         struct dt_it      *it;
2033         struct dt_object  *next = mdd_object_child(obj);
2034         const struct dt_it_ops  *iops;
2035         struct page       *pg;
2036         struct lu_dirent  *last = NULL;
2037         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2038         int i;
2039         int rc;
2040         int nob;
2041         __u64 hash_start;
2042         __u64 hash_end = 0;
2043
2044         LASSERT(rdpg->rp_pages != NULL);
2045         LASSERT(next->do_index_ops != NULL);
2046
2047         if (rdpg->rp_count <= 0)
2048                 return -EFAULT;
2049
2050         /*
2051          * iterate through directory and fill pages from @rdpg
2052          */
2053         iops = &next->do_index_ops->dio_it;
2054         it = iops->init(env, next, mdd_object_capa(env, obj));
2055         if (IS_ERR(it))
2056                 return PTR_ERR(it);
2057
2058         rc = iops->load(env, it, rdpg->rp_hash);
2059
2060         if (rc == 0){
2061                 /*
2062                  * Iterator didn't find record with exactly the key requested.
2063                  *
2064                  * It is currently either
2065                  *
2066                  *     - positioned above record with key less than
2067                  *     requested---skip it.
2068                  *
2069                  *     - or not positioned at all (is in IAM_IT_SKEWED
2070                  *     state)---position it on the next item.
2071                  */
2072                 rc = iops->next(env, it);
2073         } else if (rc > 0)
2074                 rc = 0;
2075
2076         /*
2077          * At this point and across for-loop:
2078          *
2079          *  rc == 0 -> ok, proceed.
2080          *  rc >  0 -> end of directory.
2081          *  rc <  0 -> error.
2082          */
2083         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2084              i++, nob -= CFS_PAGE_SIZE) {
2085                 LASSERT(i < rdpg->rp_npages);
2086                 pg = rdpg->rp_pages[i];
2087                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2088                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2089                                         it, &hash_start, &hash_end, &last,
2090                                         rdpg->rp_attrs);
2091                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2092                         if (last)
2093                                 last->lde_reclen = 0;
2094                 }
2095                 cfs_kunmap(pg);
2096         }
2097         if (rc > 0) {
2098                 /*
2099                  * end of directory.
2100                  */
2101                 hash_end = DIR_END_OFF;
2102                 rc = 0;
2103         }
2104         if (rc == 0) {
2105                 struct lu_dirpage *dp;
2106
2107                 dp = cfs_kmap(rdpg->rp_pages[0]);
2108                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2109                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2110                 if (i == 0)
2111                         /*
2112                          * No pages were processed, mark this.
2113                          */
2114                         dp->ldp_flags |= LDF_EMPTY;
2115
2116                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2117                 cfs_kunmap(rdpg->rp_pages[0]);
2118         }
2119         iops->put(env, it);
2120         iops->fini(env, it);
2121
2122         return rc;
2123 }
2124
2125 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2126                  const struct lu_rdpg *rdpg)
2127 {
2128         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2129         int rc;
2130         ENTRY;
2131
2132         LASSERT(mdd_object_exists(mdd_obj));
2133
2134         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2135         rc = mdd_readpage_sanity_check(env, mdd_obj);
2136         if (rc)
2137                 GOTO(out_unlock, rc);
2138
2139         if (mdd_is_dead_obj(mdd_obj)) {
2140                 struct page *pg;
2141                 struct lu_dirpage *dp;
2142
2143                 /*
2144                  * According to POSIX, please do not return any entry to client:
2145                  * even dot and dotdot should not be returned.
2146                  */
2147                 CWARN("readdir from dead object: "DFID"\n",
2148                         PFID(mdd_object_fid(mdd_obj)));
2149
2150                 if (rdpg->rp_count <= 0)
2151                         GOTO(out_unlock, rc = -EFAULT);
2152                 LASSERT(rdpg->rp_pages != NULL);
2153
2154                 pg = rdpg->rp_pages[0];
2155                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2156                 memset(dp, 0 , sizeof(struct lu_dirpage));
2157                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2158                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2159                 dp->ldp_flags |= LDF_EMPTY;
2160                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2161                 cfs_kunmap(pg);
2162                 GOTO(out_unlock, rc = 0);
2163         }
2164
2165         rc = __mdd_readpage(env, mdd_obj, rdpg);
2166
2167         EXIT;
2168 out_unlock:
2169         mdd_read_unlock(env, mdd_obj);
2170         return rc;
2171 }
2172
2173 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2174 {
2175         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2176         struct dt_object *next;
2177
2178         LASSERT(mdd_object_exists(mdd_obj));
2179         next = mdd_object_child(mdd_obj);
2180         return next->do_ops->do_object_sync(env, next);
2181 }
2182
2183 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2184                                         struct md_object *obj)
2185 {
2186         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2187
2188         LASSERT(mdd_object_exists(mdd_obj));
2189         return do_version_get(env, mdd_object_child(mdd_obj));
2190 }
2191
2192 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2193                             dt_obj_version_t version)
2194 {
2195         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2196
2197         LASSERT(mdd_object_exists(mdd_obj));
2198         return do_version_set(env, mdd_object_child(mdd_obj), version);
2199 }
2200
2201 const struct md_object_operations mdd_obj_ops = {
2202         .moo_permission    = mdd_permission,
2203         .moo_attr_get      = mdd_attr_get,
2204         .moo_attr_set      = mdd_attr_set,
2205         .moo_xattr_get     = mdd_xattr_get,
2206         .moo_xattr_set     = mdd_xattr_set,
2207         .moo_xattr_list    = mdd_xattr_list,
2208         .moo_xattr_del     = mdd_xattr_del,
2209         .moo_object_create = mdd_object_create,
2210         .moo_ref_add       = mdd_ref_add,
2211         .moo_ref_del       = mdd_ref_del,
2212         .moo_open          = mdd_open,
2213         .moo_close         = mdd_close,
2214         .moo_readpage      = mdd_readpage,
2215         .moo_readlink      = mdd_readlink,
2216         .moo_capa_get      = mdd_capa_get,
2217         .moo_object_sync   = mdd_object_sync,
2218         .moo_version_get   = mdd_version_get,
2219         .moo_version_set   = mdd_version_set,
2220         .moo_path          = mdd_path,
2221 };