Whamcloud - gitweb
b=20530
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134 }
135
136 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
137                                        const void *area, ssize_t len)
138 {
139         struct lu_buf *buf;
140
141         buf = &mdd_env_info(env)->mti_buf;
142         buf->lb_buf = (void *)area;
143         buf->lb_len = len;
144         return buf;
145 }
146
147 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 if (buf->lb_vmalloc)
154                         OBD_VFREE(buf->lb_buf, buf->lb_len);
155                 else
156                         OBD_FREE(buf->lb_buf, buf->lb_len);
157                 buf->lb_buf = NULL;
158         }
159         if (buf->lb_buf == NULL) {
160                 buf->lb_len = len;
161                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
162                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
163                         buf->lb_vmalloc = 0;
164                 }
165                 if (buf->lb_buf == NULL) {
166                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
167                         buf->lb_vmalloc = 1;
168                 }
169                 if (buf->lb_buf == NULL)
170                         buf->lb_len = 0;
171         }
172         return buf;
173 }
174
175 /** Increase the size of the \a mti_big_buf.
176  * preserves old data in buffer
177  * old buffer remains unchanged on error
178  * \retval 0 or -ENOMEM
179  */
180 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
181 {
182         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
183         struct lu_buf buf;
184
185         LASSERT(len >= oldbuf->lb_len);
186         if (len > BUF_VMALLOC_SIZE) {
187                 OBD_VMALLOC(buf.lb_buf, len);
188                 buf.lb_vmalloc = 1;
189         } else {
190                 OBD_ALLOC(buf.lb_buf, len);
191                 buf.lb_vmalloc = 0;
192         }
193         if (buf.lb_buf == NULL)
194                 return -ENOMEM;
195
196         buf.lb_len = len;
197         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
198
199         if (oldbuf->lb_vmalloc)
200                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
201         else
202                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
203
204         memcpy(oldbuf, &buf, sizeof(buf));
205
206         return 0;
207 }
208
209 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
210                                        struct mdd_device *mdd)
211 {
212         struct mdd_thread_info *mti = mdd_env_info(env);
213         int                     max_cookie_size;
214
215         max_cookie_size = mdd_lov_cookiesize(env, mdd);
216         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
217                 if (mti->mti_max_cookie)
218                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
219                 mti->mti_max_cookie = NULL;
220                 mti->mti_max_cookie_size = 0;
221         }
222         if (unlikely(mti->mti_max_cookie == NULL)) {
223                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
224                 if (likely(mti->mti_max_cookie != NULL))
225                         mti->mti_max_cookie_size = max_cookie_size;
226         }
227         if (likely(mti->mti_max_cookie != NULL))
228                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
229         return mti->mti_max_cookie;
230 }
231
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233                                    struct mdd_device *mdd)
234 {
235         struct mdd_thread_info *mti = mdd_env_info(env);
236         int                     max_lmm_size;
237
238         max_lmm_size = mdd_lov_mdsize(env, mdd);
239         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240                 if (mti->mti_max_lmm)
241                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242                 mti->mti_max_lmm = NULL;
243                 mti->mti_max_lmm_size = 0;
244         }
245         if (unlikely(mti->mti_max_lmm == NULL)) {
246                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
247                 if (unlikely(mti->mti_max_lmm != NULL))
248                         mti->mti_max_lmm_size = max_lmm_size;
249         }
250         return mti->mti_max_lmm;
251 }
252
253 struct lu_object *mdd_object_alloc(const struct lu_env *env,
254                                    const struct lu_object_header *hdr,
255                                    struct lu_device *d)
256 {
257         struct mdd_object *mdd_obj;
258
259         OBD_ALLOC_PTR(mdd_obj);
260         if (mdd_obj != NULL) {
261                 struct lu_object *o;
262
263                 o = mdd2lu_obj(mdd_obj);
264                 lu_object_init(o, NULL, d);
265                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
266                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
267                 mdd_obj->mod_count = 0;
268                 o->lo_ops = &mdd_lu_obj_ops;
269                 return o;
270         } else {
271                 return NULL;
272         }
273 }
274
275 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
276                            const struct lu_object_conf *unused)
277 {
278         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
279         struct mdd_object *mdd_obj = lu2mdd_obj(o);
280         struct lu_object  *below;
281         struct lu_device  *under;
282         ENTRY;
283
284         mdd_obj->mod_cltime = 0;
285         under = &d->mdd_child->dd_lu_dev;
286         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
287         mdd_pdlock_init(mdd_obj);
288         if (below == NULL)
289                 RETURN(-ENOMEM);
290
291         lu_object_add(o, below);
292
293         RETURN(0);
294 }
295
296 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
297 {
298         if (lu_object_exists(o))
299                 return mdd_get_flags(env, lu2mdd_obj(o));
300         else
301                 return 0;
302 }
303
304 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
305 {
306         struct mdd_object *mdd = lu2mdd_obj(o);
307
308         lu_object_fini(o);
309         OBD_FREE_PTR(mdd);
310 }
311
312 static int mdd_object_print(const struct lu_env *env, void *cookie,
313                             lu_printer_t p, const struct lu_object *o)
314 {
315         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
316         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
317                     "valid=%x, cltime=%llu, flags=%lx)",
318                     mdd, mdd->mod_count, mdd->mod_valid,
319                     mdd->mod_cltime, mdd->mod_flags);
320 }
321
322 static const struct lu_object_operations mdd_lu_obj_ops = {
323         .loo_object_init    = mdd_object_init,
324         .loo_object_start   = mdd_object_start,
325         .loo_object_free    = mdd_object_free,
326         .loo_object_print   = mdd_object_print,
327 };
328
329 struct mdd_object *mdd_object_find(const struct lu_env *env,
330                                    struct mdd_device *d,
331                                    const struct lu_fid *f)
332 {
333         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
334 }
335
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337                         const char *path, struct lu_fid *fid)
338 {
339         struct lu_buf *buf;
340         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341         struct mdd_object *obj;
342         struct lu_name *lname = &mdd_env_info(env)->mti_name;
343         char *name;
344         int rc = 0;
345         ENTRY;
346
347         /* temp buffer for path element */
348         buf = mdd_buf_alloc(env, PATH_MAX);
349         if (buf->lb_buf == NULL)
350                 RETURN(-ENOMEM);
351
352         lname->ln_name = name = buf->lb_buf;
353         lname->ln_namelen = 0;
354         *f = mdd->mdd_root_fid;
355
356         while(1) {
357                 while (*path == '/')
358                         path++;
359                 if (*path == '\0')
360                         break;
361                 while (*path != '/' && *path != '\0') {
362                         *name = *path;
363                         path++;
364                         name++;
365                         lname->ln_namelen++;
366                 }
367
368                 *name = '\0';
369                 /* find obj corresponding to fid */
370                 obj = mdd_object_find(env, mdd, f);
371                 if (obj == NULL)
372                         GOTO(out, rc = -EREMOTE);
373                 if (IS_ERR(obj))
374                         GOTO(out, rc = -PTR_ERR(obj));
375                 /* get child fid from parent and name */
376                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377                 mdd_object_put(env, obj);
378                 if (rc)
379                         break;
380
381                 name = buf->lb_buf;
382                 lname->ln_namelen = 0;
383         }
384
385         if (!rc)
386                 *fid = *f;
387 out:
388         RETURN(rc);
389 }
390
391 /** The maximum depth that fid2path() will search.
392  * This is limited only because we want to store the fids for
393  * historical path lookup purposes.
394  */
395 #define MAX_PATH_DEPTH 100
396
397 /** mdd_path() lookup structure. */
398 struct path_lookup_info {
399         __u64                pli_recno;        /**< history point */
400         __u64                pli_currec;       /**< current record */
401         struct lu_fid        pli_fid;
402         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
403         struct mdd_object   *pli_mdd_obj;
404         char                *pli_path;         /**< full path */
405         int                  pli_pathlen;
406         int                  pli_linkno;       /**< which hardlink to follow */
407         int                  pli_fidcount;     /**< number of \a pli_fids */
408 };
409
410 static int mdd_path_current(const struct lu_env *env,
411                             struct path_lookup_info *pli)
412 {
413         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414         struct mdd_object *mdd_obj;
415         struct lu_buf     *buf = NULL;
416         struct link_ea_header *leh;
417         struct link_ea_entry  *lee;
418         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
420         char *ptr;
421         int reclen;
422         int rc;
423         ENTRY;
424
425         ptr = pli->pli_path + pli->pli_pathlen - 1;
426         *ptr = 0;
427         --ptr;
428         pli->pli_fidcount = 0;
429         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
430
431         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432                 mdd_obj = mdd_object_find(env, mdd,
433                                           &pli->pli_fids[pli->pli_fidcount]);
434                 if (mdd_obj == NULL)
435                         GOTO(out, rc = -EREMOTE);
436                 if (IS_ERR(mdd_obj))
437                         GOTO(out, rc = -PTR_ERR(mdd_obj));
438                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
439                 if (rc <= 0) {
440                         mdd_object_put(env, mdd_obj);
441                         if (rc == -1)
442                                 rc = -EREMOTE;
443                         else if (rc == 0)
444                                 /* Do I need to error out here? */
445                                 rc = -ENOENT;
446                         GOTO(out, rc);
447                 }
448
449                 /* Get parent fid and object name */
450                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451                 buf = mdd_links_get(env, mdd_obj);
452                 mdd_read_unlock(env, mdd_obj);
453                 mdd_object_put(env, mdd_obj);
454                 if (IS_ERR(buf))
455                         GOTO(out, rc = PTR_ERR(buf));
456
457                 leh = buf->lb_buf;
458                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
460
461                 /* If set, use link #linkno for path lookup, otherwise use
462                    link #0.  Only do this for the final path element. */
463                 if ((pli->pli_fidcount == 0) &&
464                     (pli->pli_linkno < leh->leh_reccount)) {
465                         int count;
466                         for (count = 0; count < pli->pli_linkno; count++) {
467                                 lee = (struct link_ea_entry *)
468                                      ((char *)lee + reclen);
469                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
470                         }
471                         if (pli->pli_linkno < leh->leh_reccount - 1)
472                                 /* indicate to user there are more links */
473                                 pli->pli_linkno++;
474                 }
475
476                 /* Pack the name in the end of the buffer */
477                 ptr -= tmpname->ln_namelen;
478                 if (ptr - 1 <= pli->pli_path)
479                         GOTO(out, rc = -EOVERFLOW);
480                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
481                 *(--ptr) = '/';
482
483                 /* Store the parent fid for historic lookup */
484                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485                         GOTO(out, rc = -EOVERFLOW);
486                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
487         }
488
489         /* Verify that our path hasn't changed since we started the lookup.
490            Record the current index, and verify the path resolves to the
491            same fid. If it does, then the path is correct as of this index. */
492         spin_lock(&mdd->mdd_cl.mc_lock);
493         pli->pli_currec = mdd->mdd_cl.mc_index;
494         spin_unlock(&mdd->mdd_cl.mc_lock);
495         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
496         if (rc) {
497                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498                 GOTO (out, rc = -EAGAIN);
499         }
500         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503                        PFID(&pli->pli_fid));
504                 GOTO(out, rc = -EAGAIN);
505         }
506
507         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
508
509         EXIT;
510 out:
511         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512                 /* if we vmalloced a large buffer drop it */
513                 mdd_buf_put(buf);
514
515         return rc;
516 }
517
518 static int mdd_path_historic(const struct lu_env *env,
519                              struct path_lookup_info *pli)
520 {
521         return 0;
522 }
523
524 /* Returns the full path to this fid, as of changelog record recno. */
525 static int mdd_path(const struct lu_env *env, struct md_object *obj,
526                     char *path, int pathlen, __u64 *recno, int *linkno)
527 {
528         struct path_lookup_info *pli;
529         int tries = 3;
530         int rc = -EAGAIN;
531         ENTRY;
532
533         if (pathlen < 3)
534                 RETURN(-EOVERFLOW);
535
536         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
537                 path[0] = '/';
538                 path[1] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
610                        int *size)
611 {
612         struct lov_desc *ldesc;
613         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
614         ENTRY;
615
616         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617         LASSERT(ldesc != NULL);
618
619         if (!lmm)
620                 RETURN(0);
621
622         lmm->lmm_magic = LOV_MAGIC_V1;
623         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
624         lmm->lmm_pattern = ldesc->ld_pattern;
625         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
626         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
627         *size = sizeof(struct lov_mds_md);
628
629         RETURN(sizeof(struct lov_mds_md));
630 }
631
632 /* get lov EA only */
633 static int __mdd_lmm_get(const struct lu_env *env,
634                          struct mdd_object *mdd_obj, struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         if (ma->ma_valid & MA_LOV)
640                 RETURN(0);
641
642         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
643                         XATTR_NAME_LOV);
644
645         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
646                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
647                                 &ma->ma_lmm_size);
648         }
649
650         if (rc > 0) {
651                 ma->ma_valid |= MA_LOV;
652                 rc = 0;
653         }
654         RETURN(rc);
655 }
656
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
658                        struct md_attr *ma)
659 {
660         int rc;
661         ENTRY;
662
663         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664         rc = __mdd_lmm_get(env, mdd_obj, ma);
665         mdd_read_unlock(env, mdd_obj);
666         RETURN(rc);
667 }
668
669 /* get lmv EA only*/
670 static int __mdd_lmv_get(const struct lu_env *env,
671                          struct mdd_object *mdd_obj, struct md_attr *ma)
672 {
673         int rc;
674         ENTRY;
675
676         if (ma->ma_valid & MA_LMV)
677                 RETURN(0);
678
679         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
680                         XATTR_NAME_LMV);
681         if (rc > 0) {
682                 ma->ma_valid |= MA_LMV;
683                 rc = 0;
684         }
685         RETURN(rc);
686 }
687
688 static int mdd_attr_get_internal(const struct lu_env *env,
689                                  struct mdd_object *mdd_obj,
690                                  struct md_attr *ma)
691 {
692         int rc = 0;
693         ENTRY;
694
695         if (ma->ma_need & MA_INODE)
696                 rc = mdd_iattr_get(env, mdd_obj, ma);
697
698         if (rc == 0 && ma->ma_need & MA_LOV) {
699                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
700                     S_ISDIR(mdd_object_type(mdd_obj)))
701                         rc = __mdd_lmm_get(env, mdd_obj, ma);
702         }
703         if (rc == 0 && ma->ma_need & MA_LMV) {
704                 if (S_ISDIR(mdd_object_type(mdd_obj)))
705                         rc = __mdd_lmv_get(env, mdd_obj, ma);
706         }
707 #ifdef CONFIG_FS_POSIX_ACL
708         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
709                 if (S_ISDIR(mdd_object_type(mdd_obj)))
710                         rc = mdd_def_acl_get(env, mdd_obj, ma);
711         }
712 #endif
713         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
714                rc, ma->ma_valid);
715         RETURN(rc);
716 }
717
718 int mdd_attr_get_internal_locked(const struct lu_env *env,
719                                  struct mdd_object *mdd_obj, struct md_attr *ma)
720 {
721         int rc;
722         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
723
724         if (needlock)
725                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
726         rc = mdd_attr_get_internal(env, mdd_obj, ma);
727         if (needlock)
728                 mdd_read_unlock(env, mdd_obj);
729         return rc;
730 }
731
732 /*
733  * No permission check is needed.
734  */
735 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
736                         struct md_attr *ma)
737 {
738         struct mdd_object *mdd_obj = md2mdd_obj(obj);
739         int                rc;
740
741         ENTRY;
742         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
743         RETURN(rc);
744 }
745
746 /*
747  * No permission check is needed.
748  */
749 static int mdd_xattr_get(const struct lu_env *env,
750                          struct md_object *obj, struct lu_buf *buf,
751                          const char *name)
752 {
753         struct mdd_object *mdd_obj = md2mdd_obj(obj);
754         int rc;
755
756         ENTRY;
757
758         LASSERT(mdd_object_exists(mdd_obj));
759
760         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
761         rc = mdo_xattr_get(env, mdd_obj, buf, name,
762                            mdd_object_capa(env, mdd_obj));
763         mdd_read_unlock(env, mdd_obj);
764
765         RETURN(rc);
766 }
767
768 /*
769  * Permission check is done when open,
770  * no need check again.
771  */
772 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
773                         struct lu_buf *buf)
774 {
775         struct mdd_object *mdd_obj = md2mdd_obj(obj);
776         struct dt_object  *next;
777         loff_t             pos = 0;
778         int                rc;
779         ENTRY;
780
781         LASSERT(mdd_object_exists(mdd_obj));
782
783         next = mdd_object_child(mdd_obj);
784         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
785         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
786                                          mdd_object_capa(env, mdd_obj));
787         mdd_read_unlock(env, mdd_obj);
788         RETURN(rc);
789 }
790
791 /*
792  * No permission check is needed.
793  */
794 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
795                           struct lu_buf *buf)
796 {
797         struct mdd_object *mdd_obj = md2mdd_obj(obj);
798         int rc;
799
800         ENTRY;
801
802         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
803         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
804         mdd_read_unlock(env, mdd_obj);
805
806         RETURN(rc);
807 }
808
809 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
810                                struct mdd_object *c, struct md_attr *ma,
811                                struct thandle *handle,
812                                const struct md_op_spec *spec)
813 {
814         struct lu_attr *attr = &ma->ma_attr;
815         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
816         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
817         const struct dt_index_features *feat = spec->sp_feat;
818         int rc;
819         ENTRY;
820
821         if (!mdd_object_exists(c)) {
822                 struct dt_object *next = mdd_object_child(c);
823                 LASSERT(next);
824
825                 if (feat != &dt_directory_features && feat != NULL)
826                         dof->dof_type = DFT_INDEX;
827                 else
828                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
829
830                 dof->u.dof_idx.di_feat = feat;
831
832                 /* @hint will be initialized by underlying device. */
833                 next->do_ops->do_ah_init(env, hint,
834                                          p ? mdd_object_child(p) : NULL,
835                                          attr->la_mode & S_IFMT);
836
837                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
838                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
839         } else
840                 rc = -EEXIST;
841
842         RETURN(rc);
843 }
844
845 /**
846  * Make sure the ctime is increased only.
847  */
848 static inline int mdd_attr_check(const struct lu_env *env,
849                                  struct mdd_object *obj,
850                                  struct lu_attr *attr)
851 {
852         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
853         int rc;
854         ENTRY;
855
856         if (attr->la_valid & LA_CTIME) {
857                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
858                 if (rc)
859                         RETURN(rc);
860
861                 if (attr->la_ctime < tmp_la->la_ctime)
862                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
863                 else if (attr->la_valid == LA_CTIME &&
864                          attr->la_ctime == tmp_la->la_ctime)
865                         attr->la_valid &= ~LA_CTIME;
866         }
867         RETURN(0);
868 }
869
870 int mdd_attr_set_internal(const struct lu_env *env,
871                           struct mdd_object *obj,
872                           struct lu_attr *attr,
873                           struct thandle *handle,
874                           int needacl)
875 {
876         int rc;
877         ENTRY;
878
879         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
880 #ifdef CONFIG_FS_POSIX_ACL
881         if (!rc && (attr->la_valid & LA_MODE) && needacl)
882                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
883 #endif
884         RETURN(rc);
885 }
886
887 int mdd_attr_check_set_internal(const struct lu_env *env,
888                                 struct mdd_object *obj,
889                                 struct lu_attr *attr,
890                                 struct thandle *handle,
891                                 int needacl)
892 {
893         int rc;
894         ENTRY;
895
896         rc = mdd_attr_check(env, obj, attr);
897         if (rc)
898                 RETURN(rc);
899
900         if (attr->la_valid)
901                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
902         RETURN(rc);
903 }
904
905 static int mdd_attr_set_internal_locked(const struct lu_env *env,
906                                         struct mdd_object *obj,
907                                         struct lu_attr *attr,
908                                         struct thandle *handle,
909                                         int needacl)
910 {
911         int rc;
912         ENTRY;
913
914         needacl = needacl && (attr->la_valid & LA_MODE);
915         if (needacl)
916                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
917         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
918         if (needacl)
919                 mdd_write_unlock(env, obj);
920         RETURN(rc);
921 }
922
923 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
924                                        struct mdd_object *obj,
925                                        struct lu_attr *attr,
926                                        struct thandle *handle,
927                                        int needacl)
928 {
929         int rc;
930         ENTRY;
931
932         needacl = needacl && (attr->la_valid & LA_MODE);
933         if (needacl)
934                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
935         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
936         if (needacl)
937                 mdd_write_unlock(env, obj);
938         RETURN(rc);
939 }
940
941 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
942                     const struct lu_buf *buf, const char *name,
943                     int fl, struct thandle *handle)
944 {
945         struct lustre_capa *capa = mdd_object_capa(env, obj);
946         int rc = -EINVAL;
947         ENTRY;
948
949         if (buf->lb_buf && buf->lb_len > 0)
950                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
951         else if (buf->lb_buf == NULL && buf->lb_len == 0)
952                 rc = mdo_xattr_del(env, obj, name, handle, capa);
953
954         RETURN(rc);
955 }
956
957 /*
958  * This gives the same functionality as the code between
959  * sys_chmod and inode_setattr
960  * chown_common and inode_setattr
961  * utimes and inode_setattr
962  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
963  */
964 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
965                         struct lu_attr *la, const struct md_attr *ma)
966 {
967         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
968         struct md_ucred  *uc         = md_ucred(env);
969         int               rc;
970         ENTRY;
971
972         if (!la->la_valid)
973                 RETURN(0);
974
975         /* Do not permit change file type */
976         if (la->la_valid & LA_TYPE)
977                 RETURN(-EPERM);
978
979         /* They should not be processed by setattr */
980         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
981                 RETURN(-EPERM);
982
983         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
984         if (rc)
985                 RETURN(rc);
986
987         if (la->la_valid == LA_CTIME) {
988                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
989                         /* This is only for set ctime when rename's source is
990                          * on remote MDS. */
991                         rc = mdd_may_delete(env, NULL, obj,
992                                             (struct md_attr *)ma, 1, 0);
993                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
994                         la->la_valid &= ~LA_CTIME;
995                 RETURN(rc);
996         }
997
998         if (la->la_valid == LA_ATIME) {
999                 /* This is atime only set for read atime update on close. */
1000                 if (la->la_atime <= tmp_la->la_atime +
1001                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1002                         la->la_valid &= ~LA_ATIME;
1003                 RETURN(0);
1004         }
1005
1006         /* Check if flags change. */
1007         if (la->la_valid & LA_FLAGS) {
1008                 unsigned int oldflags = 0;
1009                 unsigned int newflags = la->la_flags &
1010                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1011
1012                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1013                     !mdd_capable(uc, CFS_CAP_FOWNER))
1014                         RETURN(-EPERM);
1015
1016                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1017                  * only be changed by the relevant capability. */
1018                 if (mdd_is_immutable(obj))
1019                         oldflags |= LUSTRE_IMMUTABLE_FL;
1020                 if (mdd_is_append(obj))
1021                         oldflags |= LUSTRE_APPEND_FL;
1022                 if ((oldflags ^ newflags) &&
1023                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1024                         RETURN(-EPERM);
1025
1026                 if (!S_ISDIR(tmp_la->la_mode))
1027                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1028         }
1029
1030         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1031             (la->la_valid & ~LA_FLAGS) &&
1032             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1033                 RETURN(-EPERM);
1034
1035         /* Check for setting the obj time. */
1036         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1037             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1038                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1039                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1040                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1041                                                             MAY_WRITE,
1042                                                             MOR_TGT_CHILD);
1043                         if (rc)
1044                                 RETURN(rc);
1045                 }
1046         }
1047
1048         /* Make sure a caller can chmod. */
1049         if (la->la_valid & LA_MODE) {
1050                 /* Bypass la_vaild == LA_MODE,
1051                  * this is for changing file with SUID or SGID. */
1052                 if ((la->la_valid & ~LA_MODE) &&
1053                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1054                     (uc->mu_fsuid != tmp_la->la_uid) &&
1055                     !mdd_capable(uc, CFS_CAP_FOWNER))
1056                         RETURN(-EPERM);
1057
1058                 if (la->la_mode == (umode_t) -1)
1059                         la->la_mode = tmp_la->la_mode;
1060                 else
1061                         la->la_mode = (la->la_mode & S_IALLUGO) |
1062                                       (tmp_la->la_mode & ~S_IALLUGO);
1063
1064                 /* Also check the setgid bit! */
1065                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1066                                        la->la_gid : tmp_la->la_gid) &&
1067                     !mdd_capable(uc, CFS_CAP_FSETID))
1068                         la->la_mode &= ~S_ISGID;
1069         } else {
1070                la->la_mode = tmp_la->la_mode;
1071         }
1072
1073         /* Make sure a caller can chown. */
1074         if (la->la_valid & LA_UID) {
1075                 if (la->la_uid == (uid_t) -1)
1076                         la->la_uid = tmp_la->la_uid;
1077                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1078                     (la->la_uid != tmp_la->la_uid)) &&
1079                     !mdd_capable(uc, CFS_CAP_CHOWN))
1080                         RETURN(-EPERM);
1081
1082                 /* If the user or group of a non-directory has been
1083                  * changed by a non-root user, remove the setuid bit.
1084                  * 19981026 David C Niemi <niemi@tux.org>
1085                  *
1086                  * Changed this to apply to all users, including root,
1087                  * to avoid some races. This is the behavior we had in
1088                  * 2.0. The check for non-root was definitely wrong
1089                  * for 2.2 anyway, as it should have been using
1090                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1091                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1092                     !S_ISDIR(tmp_la->la_mode)) {
1093                         la->la_mode &= ~S_ISUID;
1094                         la->la_valid |= LA_MODE;
1095                 }
1096         }
1097
1098         /* Make sure caller can chgrp. */
1099         if (la->la_valid & LA_GID) {
1100                 if (la->la_gid == (gid_t) -1)
1101                         la->la_gid = tmp_la->la_gid;
1102                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1103                     ((la->la_gid != tmp_la->la_gid) &&
1104                     !lustre_in_group_p(uc, la->la_gid))) &&
1105                     !mdd_capable(uc, CFS_CAP_CHOWN))
1106                         RETURN(-EPERM);
1107
1108                 /* Likewise, if the user or group of a non-directory
1109                  * has been changed by a non-root user, remove the
1110                  * setgid bit UNLESS there is no group execute bit
1111                  * (this would be a file marked for mandatory
1112                  * locking).  19981026 David C Niemi <niemi@tux.org>
1113                  *
1114                  * Removed the fsuid check (see the comment above) --
1115                  * 19990830 SD. */
1116                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1117                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1118                         la->la_mode &= ~S_ISGID;
1119                         la->la_valid |= LA_MODE;
1120                 }
1121         }
1122
1123         /* For both Size-on-MDS case and truncate case,
1124          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1125          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1126          * For SOM case, it is true, the MAY_WRITE perm has been checked
1127          * when open, no need check again. For truncate case, it is false,
1128          * the MAY_WRITE perm should be checked here. */
1129         if (ma->ma_attr_flags & MDS_SOM) {
1130                 /* For the "Size-on-MDS" setattr update, merge coming
1131                  * attributes with the set in the inode. BUG 10641 */
1132                 if ((la->la_valid & LA_ATIME) &&
1133                     (la->la_atime <= tmp_la->la_atime))
1134                         la->la_valid &= ~LA_ATIME;
1135
1136                 /* OST attributes do not have a priority over MDS attributes,
1137                  * so drop times if ctime is equal. */
1138                 if ((la->la_valid & LA_CTIME) &&
1139                     (la->la_ctime <= tmp_la->la_ctime))
1140                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1141         } else {
1142                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1143                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1144                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1145                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1146                                 rc = mdd_permission_internal_locked(env, obj,
1147                                                             tmp_la, MAY_WRITE,
1148                                                             MOR_TGT_CHILD);
1149                                 if (rc)
1150                                         RETURN(rc);
1151                         }
1152                 }
1153                 if (la->la_valid & LA_CTIME) {
1154                         /* The pure setattr, it has the priority over what is
1155                          * already set, do not drop it if ctime is equal. */
1156                         if (la->la_ctime < tmp_la->la_ctime)
1157                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1158                                                   LA_CTIME);
1159                 }
1160         }
1161
1162         RETURN(0);
1163 }
1164
1165 /** Store a data change changelog record
1166  * If this fails, we must fail the whole transaction; we don't
1167  * want the change to commit without the log entry.
1168  * \param mdd_obj - mdd_object of change
1169  * \param handle - transacion handle
1170  */
1171 static int mdd_changelog_data_store(const struct lu_env     *env,
1172                                     struct mdd_device       *mdd,
1173                                     enum changelog_rec_type type,
1174                                     struct mdd_object       *mdd_obj,
1175                                     struct thandle          *handle)
1176 {
1177         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1178         struct llog_changelog_rec *rec;
1179         struct lu_buf *buf;
1180         int reclen;
1181         int rc;
1182
1183         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1184                 RETURN(0);
1185
1186         LASSERT(handle != NULL);
1187         LASSERT(mdd_obj != NULL);
1188
1189         if ((type == CL_SETATTR) &&
1190             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1191                 /* Don't need multiple updates in this log */
1192                 /* Don't check under lock - no big deal if we get an extra
1193                    entry */
1194                 RETURN(0);
1195         }
1196
1197         reclen = llog_data_len(sizeof(*rec));
1198         buf = mdd_buf_alloc(env, reclen);
1199         if (buf->lb_buf == NULL)
1200                 RETURN(-ENOMEM);
1201         rec = (struct llog_changelog_rec *)buf->lb_buf;
1202
1203         rec->cr.cr_flags = CLF_VERSION;
1204         rec->cr.cr_type = (__u32)type;
1205         rec->cr.cr_tfid = *tfid;
1206         rec->cr.cr_namelen = 0;
1207         mdd_obj->mod_cltime = cfs_time_current_64();
1208
1209         rc = mdd_changelog_llog_write(mdd, rec, handle);
1210         if (rc < 0) {
1211                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1212                        rc, type, PFID(tfid));
1213                 return -EFAULT;
1214         }
1215
1216         return 0;
1217 }
1218
1219 /* set attr and LOV EA at once, return updated attr */
1220 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1221                         const struct md_attr *ma)
1222 {
1223         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1224         struct mdd_device *mdd = mdo2mdd(obj);
1225         struct thandle *handle;
1226         struct lov_mds_md *lmm = NULL;
1227         struct llog_cookie *logcookies = NULL;
1228         int  rc, lmm_size = 0, cookie_size = 0;
1229         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1230 #ifdef HAVE_QUOTA_SUPPORT
1231         struct obd_device *obd = mdd->mdd_obd_dev;
1232         struct obd_export *exp = md_quota(env)->mq_exp;
1233         struct mds_obd *mds = &obd->u.mds;
1234         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1235         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1236         int quota_opc = 0, block_count = 0;
1237         int inode_pending[MAXQUOTAS] = { 0, 0 };
1238         int block_pending[MAXQUOTAS] = { 0, 0 };
1239 #endif
1240         ENTRY;
1241
1242         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1243                                     MDD_TXN_ATTR_SET_OP);
1244         handle = mdd_trans_start(env, mdd);
1245         if (IS_ERR(handle))
1246                 RETURN(PTR_ERR(handle));
1247         /*TODO: add lock here*/
1248         /* start a log jounal handle if needed */
1249         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1250             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1251                 lmm_size = mdd_lov_mdsize(env, mdd);
1252                 lmm = mdd_max_lmm_get(env, mdd);
1253                 if (lmm == NULL)
1254                         GOTO(cleanup, rc = -ENOMEM);
1255
1256                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1257                                 XATTR_NAME_LOV);
1258
1259                 if (rc < 0)
1260                         GOTO(cleanup, rc);
1261         }
1262
1263         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1264                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1265                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1266
1267         *la_copy = ma->ma_attr;
1268         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1269         if (rc)
1270                 GOTO(cleanup, rc);
1271
1272 #ifdef HAVE_QUOTA_SUPPORT
1273         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1274                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1275
1276                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1277                 if (!rc) {
1278                         quota_opc = FSFILT_OP_SETATTR;
1279                         mdd_quota_wrapper(la_copy, qnids);
1280                         mdd_quota_wrapper(la_tmp, qoids);
1281                         /* get file quota for new owner */
1282                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1283                                         qnids, inode_pending, 1, NULL, 0,
1284                                         NULL, 0);
1285                         block_count = (la_tmp->la_blocks + 7) >> 3;
1286                         if (block_count) {
1287                                 void *data = NULL;
1288                                 mdd_data_get(env, mdd_obj, &data);
1289                                 /* get block quota for new owner */
1290                                 lquota_chkquota(mds_quota_interface_ref, obd,
1291                                                 exp, qnids, block_pending,
1292                                                 block_count, NULL,
1293                                                 LQUOTA_FLAGS_BLK, data, 1);
1294                         }
1295                 }
1296         }
1297 #endif
1298
1299         if (la_copy->la_valid & LA_FLAGS) {
1300                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1301                                                   handle, 1);
1302                 if (rc == 0)
1303                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1304         } else if (la_copy->la_valid) {            /* setattr */
1305                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1306                                                   handle, 1);
1307                 /* journal chown/chgrp in llog, just like unlink */
1308                 if (rc == 0 && lmm_size){
1309                         cookie_size = mdd_lov_cookiesize(env, mdd);
1310                         logcookies = mdd_max_cookie_get(env, mdd);
1311                         if (logcookies == NULL)
1312                                 GOTO(cleanup, rc = -ENOMEM);
1313
1314                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1315                                             logcookies, cookie_size) <= 0)
1316                                 logcookies = NULL;
1317                 }
1318         }
1319
1320         if (rc == 0 && ma->ma_valid & MA_LOV) {
1321                 umode_t mode;
1322
1323                 mode = mdd_object_type(mdd_obj);
1324                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1325                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1326                         if (rc)
1327                                 GOTO(cleanup, rc);
1328
1329                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1330                                             ma->ma_lmm_size, handle, 1);
1331                 }
1332
1333         }
1334 cleanup:
1335         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1336                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1337                                               handle);
1338         mdd_trans_stop(env, mdd, rc, handle);
1339         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1340                 /*set obd attr, if needed*/
1341                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1342                                            logcookies);
1343         }
1344 #ifdef HAVE_QUOTA_SUPPORT
1345         if (quota_opc) {
1346                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1347                                       inode_pending, 0);
1348                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1349                                       block_pending, 1);
1350                 /* Trigger dqrel/dqacq for original owner and new owner.
1351                  * If failed, the next call for lquota_chkquota will
1352                  * process it. */
1353                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1354                               quota_opc);
1355         }
1356 #endif
1357         RETURN(rc);
1358 }
1359
1360 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1361                       const struct lu_buf *buf, const char *name, int fl,
1362                       struct thandle *handle)
1363 {
1364         int  rc;
1365         ENTRY;
1366
1367         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1368         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1369         mdd_write_unlock(env, obj);
1370
1371         RETURN(rc);
1372 }
1373
1374 static int mdd_xattr_sanity_check(const struct lu_env *env,
1375                                   struct mdd_object *obj)
1376 {
1377         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1378         struct md_ucred *uc     = md_ucred(env);
1379         int rc;
1380         ENTRY;
1381
1382         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1383                 RETURN(-EPERM);
1384
1385         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1386         if (rc)
1387                 RETURN(rc);
1388
1389         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1390             !mdd_capable(uc, CFS_CAP_FOWNER))
1391                 RETURN(-EPERM);
1392
1393         RETURN(rc);
1394 }
1395
1396 /**
1397  * The caller should guarantee to update the object ctime
1398  * after xattr_set if needed.
1399  */
1400 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1401                          const struct lu_buf *buf, const char *name,
1402                          int fl)
1403 {
1404         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1405         struct mdd_device *mdd = mdo2mdd(obj);
1406         struct thandle *handle;
1407         int  rc;
1408         ENTRY;
1409
1410         rc = mdd_xattr_sanity_check(env, mdd_obj);
1411         if (rc)
1412                 RETURN(rc);
1413
1414         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1415         /* security-replated changes may require sync */
1416         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1417             mdd->mdd_sync_permission == 1)
1418                 txn_param_sync(&mdd_env_info(env)->mti_param);
1419
1420         handle = mdd_trans_start(env, mdd);
1421         if (IS_ERR(handle))
1422                 RETURN(PTR_ERR(handle));
1423
1424         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1425
1426         /* Only record user xattr changes */
1427         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1428             (strncmp("user.", name, 5) == 0))
1429                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1430                                               handle);
1431         mdd_trans_stop(env, mdd, rc, handle);
1432
1433         RETURN(rc);
1434 }
1435
1436 /**
1437  * The caller should guarantee to update the object ctime
1438  * after xattr_set if needed.
1439  */
1440 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1441                   const char *name)
1442 {
1443         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1444         struct mdd_device *mdd = mdo2mdd(obj);
1445         struct thandle *handle;
1446         int  rc;
1447         ENTRY;
1448
1449         rc = mdd_xattr_sanity_check(env, mdd_obj);
1450         if (rc)
1451                 RETURN(rc);
1452
1453         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1454         handle = mdd_trans_start(env, mdd);
1455         if (IS_ERR(handle))
1456                 RETURN(PTR_ERR(handle));
1457
1458         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1459         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1460                            mdd_object_capa(env, mdd_obj));
1461         mdd_write_unlock(env, mdd_obj);
1462
1463         /* Only record user xattr changes */
1464         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1465             (strncmp("user.", name, 5) != 0))
1466                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1467                                               handle);
1468
1469         mdd_trans_stop(env, mdd, rc, handle);
1470
1471         RETURN(rc);
1472 }
1473
1474 /* partial unlink */
1475 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1476                        struct md_attr *ma)
1477 {
1478         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1479         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1480         struct mdd_device *mdd = mdo2mdd(obj);
1481         struct thandle *handle;
1482 #ifdef HAVE_QUOTA_SUPPORT
1483         struct obd_device *obd = mdd->mdd_obd_dev;
1484         struct mds_obd *mds = &obd->u.mds;
1485         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1486         int quota_opc = 0;
1487 #endif
1488         int rc;
1489         ENTRY;
1490
1491         /*
1492          * Check -ENOENT early here because we need to get object type
1493          * to calculate credits before transaction start
1494          */
1495         if (!mdd_object_exists(mdd_obj))
1496                 RETURN(-ENOENT);
1497
1498         LASSERT(mdd_object_exists(mdd_obj) > 0);
1499
1500         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1501         if (rc)
1502                 RETURN(rc);
1503
1504         handle = mdd_trans_start(env, mdd);
1505         if (IS_ERR(handle))
1506                 RETURN(-ENOMEM);
1507
1508         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1509
1510         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1511         if (rc)
1512                 GOTO(cleanup, rc);
1513
1514         __mdd_ref_del(env, mdd_obj, handle, 0);
1515
1516         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1517                 /* unlink dot */
1518                 __mdd_ref_del(env, mdd_obj, handle, 1);
1519         }
1520
1521         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1522         la_copy->la_ctime = ma->ma_attr.la_ctime;
1523
1524         la_copy->la_valid = LA_CTIME;
1525         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1526         if (rc)
1527                 GOTO(cleanup, rc);
1528
1529         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1530 #ifdef HAVE_QUOTA_SUPPORT
1531         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1532             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1533                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1534                 mdd_quota_wrapper(&ma->ma_attr, qids);
1535         }
1536 #endif
1537
1538
1539         EXIT;
1540 cleanup:
1541         mdd_write_unlock(env, mdd_obj);
1542         mdd_trans_stop(env, mdd, rc, handle);
1543 #ifdef HAVE_QUOTA_SUPPORT
1544         if (quota_opc)
1545                 /* Trigger dqrel on the owner of child. If failed,
1546                  * the next call for lquota_chkquota will process it */
1547                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1548                               quota_opc);
1549 #endif
1550         return rc;
1551 }
1552
1553 /* partial operation */
1554 static int mdd_oc_sanity_check(const struct lu_env *env,
1555                                struct mdd_object *obj,
1556                                struct md_attr *ma)
1557 {
1558         int rc;
1559         ENTRY;
1560
1561         switch (ma->ma_attr.la_mode & S_IFMT) {
1562         case S_IFREG:
1563         case S_IFDIR:
1564         case S_IFLNK:
1565         case S_IFCHR:
1566         case S_IFBLK:
1567         case S_IFIFO:
1568         case S_IFSOCK:
1569                 rc = 0;
1570                 break;
1571         default:
1572                 rc = -EINVAL;
1573                 break;
1574         }
1575         RETURN(rc);
1576 }
1577
1578 static int mdd_object_create(const struct lu_env *env,
1579                              struct md_object *obj,
1580                              const struct md_op_spec *spec,
1581                              struct md_attr *ma)
1582 {
1583
1584         struct mdd_device *mdd = mdo2mdd(obj);
1585         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1586         const struct lu_fid *pfid = spec->u.sp_pfid;
1587         struct thandle *handle;
1588 #ifdef HAVE_QUOTA_SUPPORT
1589         struct obd_device *obd = mdd->mdd_obd_dev;
1590         struct obd_export *exp = md_quota(env)->mq_exp;
1591         struct mds_obd *mds = &obd->u.mds;
1592         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1593         int quota_opc = 0, block_count = 0;
1594         int inode_pending[MAXQUOTAS] = { 0, 0 };
1595         int block_pending[MAXQUOTAS] = { 0, 0 };
1596 #endif
1597         int rc = 0;
1598         ENTRY;
1599
1600 #ifdef HAVE_QUOTA_SUPPORT
1601         if (mds->mds_quota) {
1602                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1603                 mdd_quota_wrapper(&ma->ma_attr, qids);
1604                 /* get file quota for child */
1605                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1606                                 qids, inode_pending, 1, NULL, 0,
1607                                 NULL, 0);
1608                 switch (ma->ma_attr.la_mode & S_IFMT) {
1609                 case S_IFLNK:
1610                 case S_IFDIR:
1611                         block_count = 2;
1612                         break;
1613                 case S_IFREG:
1614                         block_count = 1;
1615                         break;
1616                 }
1617                 /* get block quota for child */
1618                 if (block_count)
1619                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1620                                         qids, block_pending, block_count,
1621                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1622         }
1623 #endif
1624
1625         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1626         handle = mdd_trans_start(env, mdd);
1627         if (IS_ERR(handle))
1628                 GOTO(out_pending, rc = PTR_ERR(handle));
1629
1630         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1631         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1632         if (rc)
1633                 GOTO(unlock, rc);
1634
1635         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1636         if (rc)
1637                 GOTO(unlock, rc);
1638
1639         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1640                 /* If creating the slave object, set slave EA here. */
1641                 int lmv_size = spec->u.sp_ea.eadatalen;
1642                 struct lmv_stripe_md *lmv;
1643
1644                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1645                 LASSERT(lmv != NULL && lmv_size > 0);
1646
1647                 rc = __mdd_xattr_set(env, mdd_obj,
1648                                      mdd_buf_get_const(env, lmv, lmv_size),
1649                                      XATTR_NAME_LMV, 0, handle);
1650                 if (rc)
1651                         GOTO(unlock, rc);
1652
1653                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1654                                            handle, 0);
1655         } else {
1656 #ifdef CONFIG_FS_POSIX_ACL
1657                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1658                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1659
1660                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1661                         buf->lb_len = spec->u.sp_ea.eadatalen;
1662                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1663                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1664                                                     &ma->ma_attr.la_mode,
1665                                                     handle);
1666                                 if (rc)
1667                                         GOTO(unlock, rc);
1668                                 else
1669                                         ma->ma_attr.la_valid |= LA_MODE;
1670                         }
1671
1672                         pfid = spec->u.sp_ea.fid;
1673                 }
1674 #endif
1675                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1676                                            spec);
1677         }
1678         EXIT;
1679 unlock:
1680         if (rc == 0)
1681                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1682         mdd_write_unlock(env, mdd_obj);
1683
1684         mdd_trans_stop(env, mdd, rc, handle);
1685 out_pending:
1686 #ifdef HAVE_QUOTA_SUPPORT
1687         if (quota_opc) {
1688                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1689                                       inode_pending, 0);
1690                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1691                                       block_pending, 1);
1692                 /* Trigger dqacq on the owner of child. If failed,
1693                  * the next call for lquota_chkquota will process it. */
1694                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1695                               quota_opc);
1696         }
1697 #endif
1698         return rc;
1699 }
1700
1701 /* partial link */
1702 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1703                        const struct md_attr *ma)
1704 {
1705         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1706         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1707         struct mdd_device *mdd = mdo2mdd(obj);
1708         struct thandle *handle;
1709         int rc;
1710         ENTRY;
1711
1712         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1713         handle = mdd_trans_start(env, mdd);
1714         if (IS_ERR(handle))
1715                 RETURN(-ENOMEM);
1716
1717         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1718         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1719         if (rc == 0)
1720                 __mdd_ref_add(env, mdd_obj, handle);
1721         mdd_write_unlock(env, mdd_obj);
1722         if (rc == 0) {
1723                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1724                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1725
1726                 la_copy->la_valid = LA_CTIME;
1727                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1728                                                         handle, 0);
1729         }
1730         mdd_trans_stop(env, mdd, 0, handle);
1731
1732         RETURN(rc);
1733 }
1734
1735 /*
1736  * do NOT or the MAY_*'s, you'll get the weakest
1737  */
1738 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1739 {
1740         int res = 0;
1741
1742         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1743          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1744          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1745          * owner can write to a file even if it is marked readonly to hide
1746          * its brokenness. (bug 5781) */
1747         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1748                 struct md_ucred *uc = md_ucred(env);
1749
1750                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1751                     (la->la_uid == uc->mu_fsuid))
1752                         return 0;
1753         }
1754
1755         if (flags & FMODE_READ)
1756                 res |= MAY_READ;
1757         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1758                 res |= MAY_WRITE;
1759         if (flags & MDS_FMODE_EXEC)
1760                 res |= MAY_EXEC;
1761         return res;
1762 }
1763
1764 static int mdd_open_sanity_check(const struct lu_env *env,
1765                                  struct mdd_object *obj, int flag)
1766 {
1767         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1768         int mode, rc;
1769         ENTRY;
1770
1771         /* EEXIST check */
1772         if (mdd_is_dead_obj(obj))
1773                 RETURN(-ENOENT);
1774
1775         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1776         if (rc)
1777                RETURN(rc);
1778
1779         if (S_ISLNK(tmp_la->la_mode))
1780                 RETURN(-ELOOP);
1781
1782         mode = accmode(env, tmp_la, flag);
1783
1784         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1785                 RETURN(-EISDIR);
1786
1787         if (!(flag & MDS_OPEN_CREATED)) {
1788                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1789                 if (rc)
1790                         RETURN(rc);
1791         }
1792
1793         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1794             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1795                 flag &= ~MDS_OPEN_TRUNC;
1796
1797         /* For writing append-only file must open it with append mode. */
1798         if (mdd_is_append(obj)) {
1799                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1800                         RETURN(-EPERM);
1801                 if (flag & MDS_OPEN_TRUNC)
1802                         RETURN(-EPERM);
1803         }
1804
1805 #if 0
1806         /*
1807          * Now, flag -- O_NOATIME does not be packed by client.
1808          */
1809         if (flag & O_NOATIME) {
1810                 struct md_ucred *uc = md_ucred(env);
1811
1812                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1813                     (uc->mu_valid == UCRED_NEW)) &&
1814                     (uc->mu_fsuid != tmp_la->la_uid) &&
1815                     !mdd_capable(uc, CFS_CAP_FOWNER))
1816                         RETURN(-EPERM);
1817         }
1818 #endif
1819
1820         RETURN(0);
1821 }
1822
1823 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1824                     int flags)
1825 {
1826         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1827         int rc = 0;
1828
1829         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1830
1831         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1832         if (rc == 0)
1833                 mdd_obj->mod_count++;
1834
1835         mdd_write_unlock(env, mdd_obj);
1836         return rc;
1837 }
1838
1839 /* return md_attr back,
1840  * if it is last unlink then return lov ea + llog cookie*/
1841 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1842                     struct md_attr *ma)
1843 {
1844         int rc = 0;
1845         ENTRY;
1846
1847         if (S_ISREG(mdd_object_type(obj))) {
1848                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1849                  * Caller must be ready for that. */
1850
1851                 rc = __mdd_lmm_get(env, obj, ma);
1852                 if ((ma->ma_valid & MA_LOV))
1853                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1854                                             obj, ma);
1855         }
1856         RETURN(rc);
1857 }
1858
1859 /*
1860  * No permission check is needed.
1861  */
1862 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1863                      struct md_attr *ma)
1864 {
1865         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1866         struct mdd_device *mdd = mdo2mdd(obj);
1867         struct thandle    *handle;
1868         int rc;
1869         int reset = 1;
1870
1871 #ifdef HAVE_QUOTA_SUPPORT
1872         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1873         struct mds_obd *mds = &obd->u.mds;
1874         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1875         int quota_opc = 0;
1876 #endif
1877         ENTRY;
1878
1879         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1880         if (rc)
1881                 RETURN(rc);
1882         handle = mdd_trans_start(env, mdo2mdd(obj));
1883         if (IS_ERR(handle))
1884                 RETURN(PTR_ERR(handle));
1885
1886         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1887         /* release open count */
1888         mdd_obj->mod_count --;
1889
1890         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1891                 /* remove link to object from orphan index */
1892                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1893                 if (rc == 0) {
1894                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1895                                "list, OSS objects to be destroyed.\n",
1896                                PFID(mdd_object_fid(mdd_obj)));
1897                 } else {
1898                         CERROR("Object "DFID" can not be deleted from orphan "
1899                                 "list, maybe cause OST objects can not be "
1900                                 "destroyed (err: %d).\n",
1901                                 PFID(mdd_object_fid(mdd_obj)), rc);
1902                         /* If object was not deleted from orphan list, do not
1903                          * destroy OSS objects, which will be done when next
1904                          * recovery. */
1905                         GOTO(out, rc);
1906                 }
1907         }
1908
1909         rc = mdd_iattr_get(env, mdd_obj, ma);
1910         /* Object maybe not in orphan list originally, it is rare case for
1911          * mdd_finish_unlink() failure. */
1912         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1913 #ifdef HAVE_QUOTA_SUPPORT
1914                 if (mds->mds_quota) {
1915                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1916                         mdd_quota_wrapper(&ma->ma_attr, qids);
1917                 }
1918 #endif
1919                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
1920                 if (ma->ma_valid & MA_FLAGS &&
1921                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
1922                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
1923                 } else {
1924                         rc = mdd_object_kill(env, mdd_obj, ma);
1925                                 if (rc == 0)
1926                                         reset = 0;
1927                 }
1928
1929                 if (rc != 0)
1930                         CERROR("Error when prepare to delete Object "DFID" , "
1931                                "which will cause OST objects can not be "
1932                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1933         }
1934         EXIT;
1935
1936 out:
1937         if (reset)
1938                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1939
1940         mdd_write_unlock(env, mdd_obj);
1941         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1942 #ifdef HAVE_QUOTA_SUPPORT
1943         if (quota_opc)
1944                 /* Trigger dqrel on the owner of child. If failed,
1945                  * the next call for lquota_chkquota will process it */
1946                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1947                               quota_opc);
1948 #endif
1949         return rc;
1950 }
1951
1952 /*
1953  * Permission check is done when open,
1954  * no need check again.
1955  */
1956 static int mdd_readpage_sanity_check(const struct lu_env *env,
1957                                      struct mdd_object *obj)
1958 {
1959         struct dt_object *next = mdd_object_child(obj);
1960         int rc;
1961         ENTRY;
1962
1963         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1964                 rc = 0;
1965         else
1966                 rc = -ENOTDIR;
1967
1968         RETURN(rc);
1969 }
1970
1971 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
1972                               int first, void *area, int nob,
1973                               const struct dt_it_ops *iops, struct dt_it *it,
1974                               __u64 *start, __u64 *end,
1975                               struct lu_dirent **last, __u32 attr)
1976 {
1977         int                     result;
1978         __u64                   hash = 0;
1979         struct lu_dirent       *ent;
1980
1981         if (first) {
1982                 memset(area, 0, sizeof (struct lu_dirpage));
1983                 area += sizeof (struct lu_dirpage);
1984                 nob  -= sizeof (struct lu_dirpage);
1985         }
1986
1987         ent  = area;
1988         do {
1989                 int    len;
1990                 int    recsize;
1991
1992                 len  = iops->key_size(env, it);
1993
1994                 /* IAM iterator can return record with zero len. */
1995                 if (len == 0)
1996                         goto next;
1997
1998                 hash = iops->store(env, it);
1999                 if (unlikely(first)) {
2000                         first = 0;
2001                         *start = hash;
2002                 }
2003
2004                 /* calculate max space required for lu_dirent */
2005                 recsize = lu_dirent_calc_size(len, attr);
2006
2007                 if (nob >= recsize) {
2008                         result = iops->rec(env, it, ent, attr);
2009                         if (result == -ESTALE)
2010                                 goto next;
2011                         if (result != 0)
2012                                 goto out;
2013
2014                         /* osd might not able to pack all attributes,
2015                          * so recheck rec length */
2016                         recsize = le16_to_cpu(ent->lde_reclen);
2017                 } else {
2018                         /*
2019                          * record doesn't fit into page, enlarge previous one.
2020                          */
2021                         if (*last) {
2022                                 (*last)->lde_reclen =
2023                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2024                                                         nob);
2025                                 result = 0;
2026                         } else
2027                                 result = -EINVAL;
2028
2029                         goto out;
2030                 }
2031                 *last = ent;
2032                 ent = (void *)ent + recsize;
2033                 nob -= recsize;
2034
2035 next:
2036                 result = iops->next(env, it);
2037                 if (result == -ESTALE)
2038                         goto next;
2039         } while (result == 0);
2040
2041 out:
2042         *end = hash;
2043         return result;
2044 }
2045
2046 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2047                           const struct lu_rdpg *rdpg)
2048 {
2049         struct dt_it      *it;
2050         struct dt_object  *next = mdd_object_child(obj);
2051         const struct dt_it_ops  *iops;
2052         struct page       *pg;
2053         struct lu_dirent  *last = NULL;
2054         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2055         int i;
2056         int rc;
2057         int nob;
2058         __u64 hash_start;
2059         __u64 hash_end = 0;
2060
2061         LASSERT(rdpg->rp_pages != NULL);
2062         LASSERT(next->do_index_ops != NULL);
2063
2064         if (rdpg->rp_count <= 0)
2065                 return -EFAULT;
2066
2067         /*
2068          * iterate through directory and fill pages from @rdpg
2069          */
2070         iops = &next->do_index_ops->dio_it;
2071         it = iops->init(env, next, mdd_object_capa(env, obj));
2072         if (IS_ERR(it))
2073                 return PTR_ERR(it);
2074
2075         rc = iops->load(env, it, rdpg->rp_hash);
2076
2077         if (rc == 0){
2078                 /*
2079                  * Iterator didn't find record with exactly the key requested.
2080                  *
2081                  * It is currently either
2082                  *
2083                  *     - positioned above record with key less than
2084                  *     requested---skip it.
2085                  *
2086                  *     - or not positioned at all (is in IAM_IT_SKEWED
2087                  *     state)---position it on the next item.
2088                  */
2089                 rc = iops->next(env, it);
2090         } else if (rc > 0)
2091                 rc = 0;
2092
2093         /*
2094          * At this point and across for-loop:
2095          *
2096          *  rc == 0 -> ok, proceed.
2097          *  rc >  0 -> end of directory.
2098          *  rc <  0 -> error.
2099          */
2100         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2101              i++, nob -= CFS_PAGE_SIZE) {
2102                 LASSERT(i < rdpg->rp_npages);
2103                 pg = rdpg->rp_pages[i];
2104                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2105                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2106                                         it, &hash_start, &hash_end, &last,
2107                                         rdpg->rp_attrs);
2108                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2109                         if (last)
2110                                 last->lde_reclen = 0;
2111                 }
2112                 cfs_kunmap(pg);
2113         }
2114         if (rc > 0) {
2115                 /*
2116                  * end of directory.
2117                  */
2118                 hash_end = DIR_END_OFF;
2119                 rc = 0;
2120         }
2121         if (rc == 0) {
2122                 struct lu_dirpage *dp;
2123
2124                 dp = cfs_kmap(rdpg->rp_pages[0]);
2125                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2126                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2127                 if (i == 0)
2128                         /*
2129                          * No pages were processed, mark this.
2130                          */
2131                         dp->ldp_flags |= LDF_EMPTY;
2132
2133                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2134                 cfs_kunmap(rdpg->rp_pages[0]);
2135         }
2136         iops->put(env, it);
2137         iops->fini(env, it);
2138
2139         return rc;
2140 }
2141
2142 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2143                  const struct lu_rdpg *rdpg)
2144 {
2145         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2146         int rc;
2147         ENTRY;
2148
2149         LASSERT(mdd_object_exists(mdd_obj));
2150
2151         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2152         rc = mdd_readpage_sanity_check(env, mdd_obj);
2153         if (rc)
2154                 GOTO(out_unlock, rc);
2155
2156         if (mdd_is_dead_obj(mdd_obj)) {
2157                 struct page *pg;
2158                 struct lu_dirpage *dp;
2159
2160                 /*
2161                  * According to POSIX, please do not return any entry to client:
2162                  * even dot and dotdot should not be returned.
2163                  */
2164                 CWARN("readdir from dead object: "DFID"\n",
2165                         PFID(mdd_object_fid(mdd_obj)));
2166
2167                 if (rdpg->rp_count <= 0)
2168                         GOTO(out_unlock, rc = -EFAULT);
2169                 LASSERT(rdpg->rp_pages != NULL);
2170
2171                 pg = rdpg->rp_pages[0];
2172                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2173                 memset(dp, 0 , sizeof(struct lu_dirpage));
2174                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2175                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2176                 dp->ldp_flags |= LDF_EMPTY;
2177                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2178                 cfs_kunmap(pg);
2179                 GOTO(out_unlock, rc = 0);
2180         }
2181
2182         rc = __mdd_readpage(env, mdd_obj, rdpg);
2183
2184         EXIT;
2185 out_unlock:
2186         mdd_read_unlock(env, mdd_obj);
2187         return rc;
2188 }
2189
2190 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2191 {
2192         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2193         struct dt_object *next;
2194
2195         LASSERT(mdd_object_exists(mdd_obj));
2196         next = mdd_object_child(mdd_obj);
2197         return next->do_ops->do_object_sync(env, next);
2198 }
2199
2200 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2201                                         struct md_object *obj)
2202 {
2203         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2204
2205         LASSERT(mdd_object_exists(mdd_obj));
2206         return do_version_get(env, mdd_object_child(mdd_obj));
2207 }
2208
2209 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2210                             dt_obj_version_t version)
2211 {
2212         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2213
2214         LASSERT(mdd_object_exists(mdd_obj));
2215         return do_version_set(env, mdd_object_child(mdd_obj), version);
2216 }
2217
2218 const struct md_object_operations mdd_obj_ops = {
2219         .moo_permission    = mdd_permission,
2220         .moo_attr_get      = mdd_attr_get,
2221         .moo_attr_set      = mdd_attr_set,
2222         .moo_xattr_get     = mdd_xattr_get,
2223         .moo_xattr_set     = mdd_xattr_set,
2224         .moo_xattr_list    = mdd_xattr_list,
2225         .moo_xattr_del     = mdd_xattr_del,
2226         .moo_object_create = mdd_object_create,
2227         .moo_ref_add       = mdd_ref_add,
2228         .moo_ref_del       = mdd_ref_del,
2229         .moo_open          = mdd_open,
2230         .moo_close         = mdd_close,
2231         .moo_readpage      = mdd_readpage,
2232         .moo_readlink      = mdd_readlink,
2233         .moo_capa_get      = mdd_capa_get,
2234         .moo_object_sync   = mdd_object_sync,
2235         .moo_version_get   = mdd_version_get,
2236         .moo_version_set   = mdd_version_set,
2237         .moo_path          = mdd_path,
2238 };