Whamcloud - gitweb
e04f1c817bf18cb261274f04f34005ce172ae3cd
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134 }
135
136 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
137                                        const void *area, ssize_t len)
138 {
139         struct lu_buf *buf;
140
141         buf = &mdd_env_info(env)->mti_buf;
142         buf->lb_buf = (void *)area;
143         buf->lb_len = len;
144         return buf;
145 }
146
147 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 if (buf->lb_vmalloc)
154                         OBD_VFREE(buf->lb_buf, buf->lb_len);
155                 else
156                         OBD_FREE(buf->lb_buf, buf->lb_len);
157                 buf->lb_buf = NULL;
158         }
159         if (buf->lb_buf == NULL) {
160                 buf->lb_len = len;
161                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
162                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
163                         buf->lb_vmalloc = 0;
164                 }
165                 if (buf->lb_buf == NULL) {
166                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
167                         buf->lb_vmalloc = 1;
168                 }
169                 if (buf->lb_buf == NULL)
170                         buf->lb_len = 0;
171         }
172         return buf;
173 }
174
175 /** Increase the size of the \a mti_big_buf.
176  * preserves old data in buffer
177  * old buffer remains unchanged on error
178  * \retval 0 or -ENOMEM
179  */
180 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
181 {
182         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
183         struct lu_buf buf;
184
185         LASSERT(len >= oldbuf->lb_len);
186         if (len > BUF_VMALLOC_SIZE) {
187                 OBD_VMALLOC(buf.lb_buf, len);
188                 buf.lb_vmalloc = 1;
189         } else {
190                 OBD_ALLOC(buf.lb_buf, len);
191                 buf.lb_vmalloc = 0;
192         }
193         if (buf.lb_buf == NULL)
194                 return -ENOMEM;
195
196         buf.lb_len = len;
197         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
198
199         if (oldbuf->lb_vmalloc)
200                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
201         else
202                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
203
204         memcpy(oldbuf, &buf, sizeof(buf));
205
206         return 0;
207 }
208
209 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
210                                        struct mdd_device *mdd)
211 {
212         struct mdd_thread_info *mti = mdd_env_info(env);
213         int                     max_cookie_size;
214
215         max_cookie_size = mdd_lov_cookiesize(env, mdd);
216         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
217                 if (mti->mti_max_cookie)
218                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
219                 mti->mti_max_cookie = NULL;
220                 mti->mti_max_cookie_size = 0;
221         }
222         if (unlikely(mti->mti_max_cookie == NULL)) {
223                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
224                 if (likely(mti->mti_max_cookie != NULL))
225                         mti->mti_max_cookie_size = max_cookie_size;
226         }
227         if (likely(mti->mti_max_cookie != NULL))
228                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
229         return mti->mti_max_cookie;
230 }
231
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233                                    struct mdd_device *mdd)
234 {
235         struct mdd_thread_info *mti = mdd_env_info(env);
236         int                     max_lmm_size;
237
238         max_lmm_size = mdd_lov_mdsize(env, mdd);
239         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240                 if (mti->mti_max_lmm)
241                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242                 mti->mti_max_lmm = NULL;
243                 mti->mti_max_lmm_size = 0;
244         }
245         if (unlikely(mti->mti_max_lmm == NULL)) {
246                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
247                 if (unlikely(mti->mti_max_lmm != NULL))
248                         mti->mti_max_lmm_size = max_lmm_size;
249         }
250         return mti->mti_max_lmm;
251 }
252
253 struct lu_object *mdd_object_alloc(const struct lu_env *env,
254                                    const struct lu_object_header *hdr,
255                                    struct lu_device *d)
256 {
257         struct mdd_object *mdd_obj;
258
259         OBD_ALLOC_PTR(mdd_obj);
260         if (mdd_obj != NULL) {
261                 struct lu_object *o;
262
263                 o = mdd2lu_obj(mdd_obj);
264                 lu_object_init(o, NULL, d);
265                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
266                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
267                 mdd_obj->mod_count = 0;
268                 o->lo_ops = &mdd_lu_obj_ops;
269                 return o;
270         } else {
271                 return NULL;
272         }
273 }
274
275 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
276                            const struct lu_object_conf *unused)
277 {
278         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
279         struct mdd_object *mdd_obj = lu2mdd_obj(o);
280         struct lu_object  *below;
281         struct lu_device  *under;
282         ENTRY;
283
284         mdd_obj->mod_cltime = 0;
285         under = &d->mdd_child->dd_lu_dev;
286         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
287         mdd_pdlock_init(mdd_obj);
288         if (below == NULL)
289                 RETURN(-ENOMEM);
290
291         lu_object_add(o, below);
292
293         RETURN(0);
294 }
295
296 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
297 {
298         if (lu_object_exists(o))
299                 return mdd_get_flags(env, lu2mdd_obj(o));
300         else
301                 return 0;
302 }
303
304 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
305 {
306         struct mdd_object *mdd = lu2mdd_obj(o);
307
308         lu_object_fini(o);
309         OBD_FREE_PTR(mdd);
310 }
311
312 static int mdd_object_print(const struct lu_env *env, void *cookie,
313                             lu_printer_t p, const struct lu_object *o)
314 {
315         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
316         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
317                     "valid=%x, cltime=%llu, flags=%lx)",
318                     mdd, mdd->mod_count, mdd->mod_valid,
319                     mdd->mod_cltime, mdd->mod_flags);
320 }
321
322 static const struct lu_object_operations mdd_lu_obj_ops = {
323         .loo_object_init    = mdd_object_init,
324         .loo_object_start   = mdd_object_start,
325         .loo_object_free    = mdd_object_free,
326         .loo_object_print   = mdd_object_print,
327 };
328
329 struct mdd_object *mdd_object_find(const struct lu_env *env,
330                                    struct mdd_device *d,
331                                    const struct lu_fid *f)
332 {
333         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
334 }
335
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337                         const char *path, struct lu_fid *fid)
338 {
339         struct lu_buf *buf;
340         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341         struct mdd_object *obj;
342         struct lu_name *lname = &mdd_env_info(env)->mti_name;
343         char *name;
344         int rc = 0;
345         ENTRY;
346
347         /* temp buffer for path element */
348         buf = mdd_buf_alloc(env, PATH_MAX);
349         if (buf->lb_buf == NULL)
350                 RETURN(-ENOMEM);
351
352         lname->ln_name = name = buf->lb_buf;
353         lname->ln_namelen = 0;
354         *f = mdd->mdd_root_fid;
355
356         while(1) {
357                 while (*path == '/')
358                         path++;
359                 if (*path == '\0')
360                         break;
361                 while (*path != '/' && *path != '\0') {
362                         *name = *path;
363                         path++;
364                         name++;
365                         lname->ln_namelen++;
366                 }
367
368                 *name = '\0';
369                 /* find obj corresponding to fid */
370                 obj = mdd_object_find(env, mdd, f);
371                 if (obj == NULL)
372                         GOTO(out, rc = -EREMOTE);
373                 if (IS_ERR(obj))
374                         GOTO(out, rc = -PTR_ERR(obj));
375                 /* get child fid from parent and name */
376                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377                 mdd_object_put(env, obj);
378                 if (rc)
379                         break;
380
381                 name = buf->lb_buf;
382                 lname->ln_namelen = 0;
383         }
384
385         if (!rc)
386                 *fid = *f;
387 out:
388         RETURN(rc);
389 }
390
391 /** The maximum depth that fid2path() will search.
392  * This is limited only because we want to store the fids for
393  * historical path lookup purposes.
394  */
395 #define MAX_PATH_DEPTH 100
396
397 /** mdd_path() lookup structure. */
398 struct path_lookup_info {
399         __u64                pli_recno;        /**< history point */
400         __u64                pli_currec;       /**< current record */
401         struct lu_fid        pli_fid;
402         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
403         struct mdd_object   *pli_mdd_obj;
404         char                *pli_path;         /**< full path */
405         int                  pli_pathlen;
406         int                  pli_linkno;       /**< which hardlink to follow */
407         int                  pli_fidcount;     /**< number of \a pli_fids */
408 };
409
410 static int mdd_path_current(const struct lu_env *env,
411                             struct path_lookup_info *pli)
412 {
413         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414         struct mdd_object *mdd_obj;
415         struct lu_buf     *buf = NULL;
416         struct link_ea_header *leh;
417         struct link_ea_entry  *lee;
418         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
420         char *ptr;
421         int reclen;
422         int rc;
423         ENTRY;
424
425         ptr = pli->pli_path + pli->pli_pathlen - 1;
426         *ptr = 0;
427         --ptr;
428         pli->pli_fidcount = 0;
429         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
430
431         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432                 mdd_obj = mdd_object_find(env, mdd,
433                                           &pli->pli_fids[pli->pli_fidcount]);
434                 if (mdd_obj == NULL)
435                         GOTO(out, rc = -EREMOTE);
436                 if (IS_ERR(mdd_obj))
437                         GOTO(out, rc = -PTR_ERR(mdd_obj));
438                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
439                 if (rc <= 0) {
440                         mdd_object_put(env, mdd_obj);
441                         if (rc == -1)
442                                 rc = -EREMOTE;
443                         else if (rc == 0)
444                                 /* Do I need to error out here? */
445                                 rc = -ENOENT;
446                         GOTO(out, rc);
447                 }
448
449                 /* Get parent fid and object name */
450                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451                 buf = mdd_links_get(env, mdd_obj);
452                 mdd_read_unlock(env, mdd_obj);
453                 mdd_object_put(env, mdd_obj);
454                 if (IS_ERR(buf))
455                         GOTO(out, rc = PTR_ERR(buf));
456
457                 leh = buf->lb_buf;
458                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
460
461                 /* If set, use link #linkno for path lookup, otherwise use
462                    link #0.  Only do this for the final path element. */
463                 if ((pli->pli_fidcount == 0) &&
464                     (pli->pli_linkno < leh->leh_reccount)) {
465                         int count;
466                         for (count = 0; count < pli->pli_linkno; count++) {
467                                 lee = (struct link_ea_entry *)
468                                      ((char *)lee + reclen);
469                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
470                         }
471                         if (pli->pli_linkno < leh->leh_reccount - 1)
472                                 /* indicate to user there are more links */
473                                 pli->pli_linkno++;
474                 }
475
476                 /* Pack the name in the end of the buffer */
477                 ptr -= tmpname->ln_namelen;
478                 if (ptr - 1 <= pli->pli_path)
479                         GOTO(out, rc = -EOVERFLOW);
480                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
481                 *(--ptr) = '/';
482
483                 /* Store the parent fid for historic lookup */
484                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485                         GOTO(out, rc = -EOVERFLOW);
486                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
487         }
488
489         /* Verify that our path hasn't changed since we started the lookup.
490            Record the current index, and verify the path resolves to the
491            same fid. If it does, then the path is correct as of this index. */
492         spin_lock(&mdd->mdd_cl.mc_lock);
493         pli->pli_currec = mdd->mdd_cl.mc_index;
494         spin_unlock(&mdd->mdd_cl.mc_lock);
495         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
496         if (rc) {
497                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498                 GOTO (out, rc = -EAGAIN);
499         }
500         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503                        PFID(&pli->pli_fid));
504                 GOTO(out, rc = -EAGAIN);
505         }
506
507         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
508
509         EXIT;
510 out:
511         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512                 /* if we vmalloced a large buffer drop it */
513                 mdd_buf_put(buf);
514
515         return rc;
516 }
517
518 static int mdd_path_historic(const struct lu_env *env,
519                              struct path_lookup_info *pli)
520 {
521         return 0;
522 }
523
524 /* Returns the full path to this fid, as of changelog record recno. */
525 static int mdd_path(const struct lu_env *env, struct md_object *obj,
526                     char *path, int pathlen, __u64 *recno, int *linkno)
527 {
528         struct path_lookup_info *pli;
529         int tries = 3;
530         int rc = -EAGAIN;
531         ENTRY;
532
533         if (pathlen < 3)
534                 RETURN(-EOVERFLOW);
535
536         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
537                 path[0] = '/';
538                 path[1] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
610                        int *size)
611 {
612         struct lov_desc *ldesc;
613         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
614         ENTRY;
615
616         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617         LASSERT(ldesc != NULL);
618
619         if (!lmm)
620                 RETURN(0);
621
622         lmm->lmm_magic = LOV_MAGIC_V1;
623         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
624         lmm->lmm_pattern = ldesc->ld_pattern;
625         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
626         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
627         *size = sizeof(struct lov_mds_md);
628
629         RETURN(sizeof(struct lov_mds_md));
630 }
631
632 /* get lov EA only */
633 static int __mdd_lmm_get(const struct lu_env *env,
634                          struct mdd_object *mdd_obj, struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         if (ma->ma_valid & MA_LOV)
640                 RETURN(0);
641
642         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
643                         XATTR_NAME_LOV);
644
645         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
646                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
647                                 &ma->ma_lmm_size);
648         }
649
650         if (rc > 0) {
651                 ma->ma_valid |= MA_LOV;
652                 rc = 0;
653         }
654         RETURN(rc);
655 }
656
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
658                        struct md_attr *ma)
659 {
660         int rc;
661         ENTRY;
662
663         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664         rc = __mdd_lmm_get(env, mdd_obj, ma);
665         mdd_read_unlock(env, mdd_obj);
666         RETURN(rc);
667 }
668
669 /* get lmv EA only*/
670 static int __mdd_lmv_get(const struct lu_env *env,
671                          struct mdd_object *mdd_obj, struct md_attr *ma)
672 {
673         int rc;
674         ENTRY;
675
676         if (ma->ma_valid & MA_LMV)
677                 RETURN(0);
678
679         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
680                         XATTR_NAME_LMV);
681         if (rc > 0) {
682                 ma->ma_valid |= MA_LMV;
683                 rc = 0;
684         }
685         RETURN(rc);
686 }
687
688 static int mdd_attr_get_internal(const struct lu_env *env,
689                                  struct mdd_object *mdd_obj,
690                                  struct md_attr *ma)
691 {
692         int rc = 0;
693         ENTRY;
694
695         if (ma->ma_need & MA_INODE)
696                 rc = mdd_iattr_get(env, mdd_obj, ma);
697
698         if (rc == 0 && ma->ma_need & MA_LOV) {
699                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
700                     S_ISDIR(mdd_object_type(mdd_obj)))
701                         rc = __mdd_lmm_get(env, mdd_obj, ma);
702         }
703         if (rc == 0 && ma->ma_need & MA_LMV) {
704                 if (S_ISDIR(mdd_object_type(mdd_obj)))
705                         rc = __mdd_lmv_get(env, mdd_obj, ma);
706         }
707 #ifdef CONFIG_FS_POSIX_ACL
708         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
709                 if (S_ISDIR(mdd_object_type(mdd_obj)))
710                         rc = mdd_def_acl_get(env, mdd_obj, ma);
711         }
712 #endif
713         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
714                rc, ma->ma_valid);
715         RETURN(rc);
716 }
717
718 int mdd_attr_get_internal_locked(const struct lu_env *env,
719                                  struct mdd_object *mdd_obj, struct md_attr *ma)
720 {
721         int rc;
722         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
723
724         if (needlock)
725                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
726         rc = mdd_attr_get_internal(env, mdd_obj, ma);
727         if (needlock)
728                 mdd_read_unlock(env, mdd_obj);
729         return rc;
730 }
731
732 /*
733  * No permission check is needed.
734  */
735 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
736                         struct md_attr *ma)
737 {
738         struct mdd_object *mdd_obj = md2mdd_obj(obj);
739         int                rc;
740
741         ENTRY;
742         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
743         RETURN(rc);
744 }
745
746 /*
747  * No permission check is needed.
748  */
749 static int mdd_xattr_get(const struct lu_env *env,
750                          struct md_object *obj, struct lu_buf *buf,
751                          const char *name)
752 {
753         struct mdd_object *mdd_obj = md2mdd_obj(obj);
754         int rc;
755
756         ENTRY;
757
758         LASSERT(mdd_object_exists(mdd_obj));
759
760         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
761         rc = mdo_xattr_get(env, mdd_obj, buf, name,
762                            mdd_object_capa(env, mdd_obj));
763         mdd_read_unlock(env, mdd_obj);
764
765         RETURN(rc);
766 }
767
768 /*
769  * Permission check is done when open,
770  * no need check again.
771  */
772 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
773                         struct lu_buf *buf)
774 {
775         struct mdd_object *mdd_obj = md2mdd_obj(obj);
776         struct dt_object  *next;
777         loff_t             pos = 0;
778         int                rc;
779         ENTRY;
780
781         LASSERT(mdd_object_exists(mdd_obj));
782
783         next = mdd_object_child(mdd_obj);
784         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
785         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
786                                          mdd_object_capa(env, mdd_obj));
787         mdd_read_unlock(env, mdd_obj);
788         RETURN(rc);
789 }
790
791 /*
792  * No permission check is needed.
793  */
794 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
795                           struct lu_buf *buf)
796 {
797         struct mdd_object *mdd_obj = md2mdd_obj(obj);
798         int rc;
799
800         ENTRY;
801
802         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
803         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
804         mdd_read_unlock(env, mdd_obj);
805
806         RETURN(rc);
807 }
808
809 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
810                                struct mdd_object *c, struct md_attr *ma,
811                                struct thandle *handle,
812                                const struct md_op_spec *spec)
813 {
814         struct lu_attr *attr = &ma->ma_attr;
815         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
816         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
817         const struct dt_index_features *feat = spec->sp_feat;
818         int rc;
819         ENTRY;
820
821         if (!mdd_object_exists(c)) {
822                 struct dt_object *next = mdd_object_child(c);
823                 LASSERT(next);
824
825                 if (feat != &dt_directory_features && feat != NULL)
826                         dof->dof_type = DFT_INDEX;
827                 else
828                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
829
830                 dof->u.dof_idx.di_feat = feat;
831
832                 /* @hint will be initialized by underlying device. */
833                 next->do_ops->do_ah_init(env, hint,
834                                          p ? mdd_object_child(p) : NULL,
835                                          attr->la_mode & S_IFMT);
836
837                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
838                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
839         } else
840                 rc = -EEXIST;
841
842         RETURN(rc);
843 }
844
845 /**
846  * Make sure the ctime is increased only.
847  */
848 static inline int mdd_attr_check(const struct lu_env *env,
849                                  struct mdd_object *obj,
850                                  struct lu_attr *attr)
851 {
852         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
853         int rc;
854         ENTRY;
855
856         if (attr->la_valid & LA_CTIME) {
857                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
858                 if (rc)
859                         RETURN(rc);
860
861                 if (attr->la_ctime < tmp_la->la_ctime)
862                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
863                 else if (attr->la_valid == LA_CTIME &&
864                          attr->la_ctime == tmp_la->la_ctime)
865                         attr->la_valid &= ~LA_CTIME;
866         }
867         RETURN(0);
868 }
869
870 int mdd_attr_set_internal(const struct lu_env *env,
871                           struct mdd_object *obj,
872                           struct lu_attr *attr,
873                           struct thandle *handle,
874                           int needacl)
875 {
876         int rc;
877         ENTRY;
878
879         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
880 #ifdef CONFIG_FS_POSIX_ACL
881         if (!rc && (attr->la_valid & LA_MODE) && needacl)
882                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
883 #endif
884         RETURN(rc);
885 }
886
887 int mdd_attr_check_set_internal(const struct lu_env *env,
888                                 struct mdd_object *obj,
889                                 struct lu_attr *attr,
890                                 struct thandle *handle,
891                                 int needacl)
892 {
893         int rc;
894         ENTRY;
895
896         rc = mdd_attr_check(env, obj, attr);
897         if (rc)
898                 RETURN(rc);
899
900         if (attr->la_valid)
901                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
902         RETURN(rc);
903 }
904
905 static int mdd_attr_set_internal_locked(const struct lu_env *env,
906                                         struct mdd_object *obj,
907                                         struct lu_attr *attr,
908                                         struct thandle *handle,
909                                         int needacl)
910 {
911         int rc;
912         ENTRY;
913
914         needacl = needacl && (attr->la_valid & LA_MODE);
915         if (needacl)
916                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
917         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
918         if (needacl)
919                 mdd_write_unlock(env, obj);
920         RETURN(rc);
921 }
922
923 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
924                                        struct mdd_object *obj,
925                                        struct lu_attr *attr,
926                                        struct thandle *handle,
927                                        int needacl)
928 {
929         int rc;
930         ENTRY;
931
932         needacl = needacl && (attr->la_valid & LA_MODE);
933         if (needacl)
934                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
935         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
936         if (needacl)
937                 mdd_write_unlock(env, obj);
938         RETURN(rc);
939 }
940
941 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
942                     const struct lu_buf *buf, const char *name,
943                     int fl, struct thandle *handle)
944 {
945         struct lustre_capa *capa = mdd_object_capa(env, obj);
946         int rc = -EINVAL;
947         ENTRY;
948
949         if (buf->lb_buf && buf->lb_len > 0)
950                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
951         else if (buf->lb_buf == NULL && buf->lb_len == 0)
952                 rc = mdo_xattr_del(env, obj, name, handle, capa);
953
954         RETURN(rc);
955 }
956
957 /*
958  * This gives the same functionality as the code between
959  * sys_chmod and inode_setattr
960  * chown_common and inode_setattr
961  * utimes and inode_setattr
962  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
963  */
964 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
965                         struct lu_attr *la, const struct md_attr *ma)
966 {
967         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
968         struct md_ucred  *uc         = md_ucred(env);
969         int               rc;
970         ENTRY;
971
972         if (!la->la_valid)
973                 RETURN(0);
974
975         /* Do not permit change file type */
976         if (la->la_valid & LA_TYPE)
977                 RETURN(-EPERM);
978
979         /* They should not be processed by setattr */
980         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
981                 RETURN(-EPERM);
982
983         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
984         if (rc)
985                 RETURN(rc);
986
987         if (la->la_valid == LA_CTIME) {
988                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
989                         /* This is only for set ctime when rename's source is
990                          * on remote MDS. */
991                         rc = mdd_may_delete(env, NULL, obj,
992                                             (struct md_attr *)ma, 1, 0);
993                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
994                         la->la_valid &= ~LA_CTIME;
995                 RETURN(rc);
996         }
997
998         if (la->la_valid == LA_ATIME) {
999                 /* This is atime only set for read atime update on close. */
1000                 if (la->la_atime <= tmp_la->la_atime +
1001                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1002                         la->la_valid &= ~LA_ATIME;
1003                 RETURN(0);
1004         }
1005
1006         /* Check if flags change. */
1007         if (la->la_valid & LA_FLAGS) {
1008                 unsigned int oldflags = 0;
1009                 unsigned int newflags = la->la_flags &
1010                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1011
1012                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1013                     !mdd_capable(uc, CFS_CAP_FOWNER))
1014                         RETURN(-EPERM);
1015
1016                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1017                  * only be changed by the relevant capability. */
1018                 if (mdd_is_immutable(obj))
1019                         oldflags |= LUSTRE_IMMUTABLE_FL;
1020                 if (mdd_is_append(obj))
1021                         oldflags |= LUSTRE_APPEND_FL;
1022                 if ((oldflags ^ newflags) &&
1023                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1024                         RETURN(-EPERM);
1025
1026                 if (!S_ISDIR(tmp_la->la_mode))
1027                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1028         }
1029
1030         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1031             (la->la_valid & ~LA_FLAGS) &&
1032             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1033                 RETURN(-EPERM);
1034
1035         /* Check for setting the obj time. */
1036         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1037             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1038                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1039                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1040                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1041                                                             MAY_WRITE,
1042                                                             MOR_TGT_CHILD);
1043                         if (rc)
1044                                 RETURN(rc);
1045                 }
1046         }
1047
1048         /* Make sure a caller can chmod. */
1049         if (la->la_valid & LA_MODE) {
1050                 /* Bypass la_vaild == LA_MODE,
1051                  * this is for changing file with SUID or SGID. */
1052                 if ((la->la_valid & ~LA_MODE) &&
1053                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1054                     (uc->mu_fsuid != tmp_la->la_uid) &&
1055                     !mdd_capable(uc, CFS_CAP_FOWNER))
1056                         RETURN(-EPERM);
1057
1058                 if (la->la_mode == (umode_t) -1)
1059                         la->la_mode = tmp_la->la_mode;
1060                 else
1061                         la->la_mode = (la->la_mode & S_IALLUGO) |
1062                                       (tmp_la->la_mode & ~S_IALLUGO);
1063
1064                 /* Also check the setgid bit! */
1065                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1066                                        la->la_gid : tmp_la->la_gid) &&
1067                     !mdd_capable(uc, CFS_CAP_FSETID))
1068                         la->la_mode &= ~S_ISGID;
1069         } else {
1070                la->la_mode = tmp_la->la_mode;
1071         }
1072
1073         /* Make sure a caller can chown. */
1074         if (la->la_valid & LA_UID) {
1075                 if (la->la_uid == (uid_t) -1)
1076                         la->la_uid = tmp_la->la_uid;
1077                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1078                     (la->la_uid != tmp_la->la_uid)) &&
1079                     !mdd_capable(uc, CFS_CAP_CHOWN))
1080                         RETURN(-EPERM);
1081
1082                 /* If the user or group of a non-directory has been
1083                  * changed by a non-root user, remove the setuid bit.
1084                  * 19981026 David C Niemi <niemi@tux.org>
1085                  *
1086                  * Changed this to apply to all users, including root,
1087                  * to avoid some races. This is the behavior we had in
1088                  * 2.0. The check for non-root was definitely wrong
1089                  * for 2.2 anyway, as it should have been using
1090                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1091                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1092                     !S_ISDIR(tmp_la->la_mode)) {
1093                         la->la_mode &= ~S_ISUID;
1094                         la->la_valid |= LA_MODE;
1095                 }
1096         }
1097
1098         /* Make sure caller can chgrp. */
1099         if (la->la_valid & LA_GID) {
1100                 if (la->la_gid == (gid_t) -1)
1101                         la->la_gid = tmp_la->la_gid;
1102                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1103                     ((la->la_gid != tmp_la->la_gid) &&
1104                     !lustre_in_group_p(uc, la->la_gid))) &&
1105                     !mdd_capable(uc, CFS_CAP_CHOWN))
1106                         RETURN(-EPERM);
1107
1108                 /* Likewise, if the user or group of a non-directory
1109                  * has been changed by a non-root user, remove the
1110                  * setgid bit UNLESS there is no group execute bit
1111                  * (this would be a file marked for mandatory
1112                  * locking).  19981026 David C Niemi <niemi@tux.org>
1113                  *
1114                  * Removed the fsuid check (see the comment above) --
1115                  * 19990830 SD. */
1116                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1117                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1118                         la->la_mode &= ~S_ISGID;
1119                         la->la_valid |= LA_MODE;
1120                 }
1121         }
1122
1123         /* For both Size-on-MDS case and truncate case,
1124          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1125          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1126          * For SOM case, it is true, the MAY_WRITE perm has been checked
1127          * when open, no need check again. For truncate case, it is false,
1128          * the MAY_WRITE perm should be checked here. */
1129         if (ma->ma_attr_flags & MDS_SOM) {
1130                 /* For the "Size-on-MDS" setattr update, merge coming
1131                  * attributes with the set in the inode. BUG 10641 */
1132                 if ((la->la_valid & LA_ATIME) &&
1133                     (la->la_atime <= tmp_la->la_atime))
1134                         la->la_valid &= ~LA_ATIME;
1135
1136                 /* OST attributes do not have a priority over MDS attributes,
1137                  * so drop times if ctime is equal. */
1138                 if ((la->la_valid & LA_CTIME) &&
1139                     (la->la_ctime <= tmp_la->la_ctime))
1140                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1141         } else {
1142                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1143                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1144                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1145                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1146                                 rc = mdd_permission_internal_locked(env, obj,
1147                                                             tmp_la, MAY_WRITE,
1148                                                             MOR_TGT_CHILD);
1149                                 if (rc)
1150                                         RETURN(rc);
1151                         }
1152                 }
1153                 if (la->la_valid & LA_CTIME) {
1154                         /* The pure setattr, it has the priority over what is
1155                          * already set, do not drop it if ctime is equal. */
1156                         if (la->la_ctime < tmp_la->la_ctime)
1157                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1158                                                   LA_CTIME);
1159                 }
1160         }
1161
1162         RETURN(0);
1163 }
1164
1165 /** Store a data change changelog record
1166  * If this fails, we must fail the whole transaction; we don't
1167  * want the change to commit without the log entry.
1168  * \param mdd_obj - mdd_object of change
1169  * \param handle - transacion handle
1170  */
1171 static int mdd_changelog_data_store(const struct lu_env     *env,
1172                                     struct mdd_device       *mdd,
1173                                     enum changelog_rec_type type,
1174                                     struct mdd_object       *mdd_obj,
1175                                     struct thandle          *handle)
1176 {
1177         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1178         struct llog_changelog_rec *rec;
1179         struct lu_buf *buf;
1180         int reclen;
1181         int rc;
1182
1183         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1184                 RETURN(0);
1185
1186         LASSERT(handle != NULL);
1187         LASSERT(mdd_obj != NULL);
1188
1189         if ((type == CL_SETATTR) &&
1190             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1191                 /* Don't need multiple updates in this log */
1192                 /* Don't check under lock - no big deal if we get an extra
1193                    entry */
1194                 RETURN(0);
1195         }
1196
1197         reclen = llog_data_len(sizeof(*rec));
1198         buf = mdd_buf_alloc(env, reclen);
1199         if (buf->lb_buf == NULL)
1200                 RETURN(-ENOMEM);
1201         rec = (struct llog_changelog_rec *)buf->lb_buf;
1202
1203         rec->cr_flags = CLF_VERSION;
1204         rec->cr_type = (__u32)type;
1205         rec->cr_tfid = *tfid;
1206         rec->cr_namelen = 0;
1207         mdd_obj->mod_cltime = cfs_time_current_64();
1208
1209         rc = mdd_changelog_llog_write(mdd, rec, handle);
1210         if (rc < 0) {
1211                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1212                        rc, type, PFID(tfid));
1213                 return -EFAULT;
1214         }
1215
1216         return 0;
1217 }
1218
1219 /* set attr and LOV EA at once, return updated attr */
1220 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1221                         const struct md_attr *ma)
1222 {
1223         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1224         struct mdd_device *mdd = mdo2mdd(obj);
1225         struct thandle *handle;
1226         struct lov_mds_md *lmm = NULL;
1227         struct llog_cookie *logcookies = NULL;
1228         int  rc, lmm_size = 0, cookie_size = 0;
1229         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1230 #ifdef HAVE_QUOTA_SUPPORT
1231         struct obd_device *obd = mdd->mdd_obd_dev;
1232         struct mds_obd *mds = &obd->u.mds;
1233         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1234         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1235         int quota_opc = 0, block_count = 0;
1236         int inode_pending[MAXQUOTAS] = { 0, 0 };
1237         int block_pending[MAXQUOTAS] = { 0, 0 };
1238 #endif
1239         ENTRY;
1240
1241         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1242                                     MDD_TXN_ATTR_SET_OP);
1243         handle = mdd_trans_start(env, mdd);
1244         if (IS_ERR(handle))
1245                 RETURN(PTR_ERR(handle));
1246         /*TODO: add lock here*/
1247         /* start a log jounal handle if needed */
1248         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1249             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1250                 lmm_size = mdd_lov_mdsize(env, mdd);
1251                 lmm = mdd_max_lmm_get(env, mdd);
1252                 if (lmm == NULL)
1253                         GOTO(cleanup, rc = -ENOMEM);
1254
1255                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1256                                 XATTR_NAME_LOV);
1257
1258                 if (rc < 0)
1259                         GOTO(cleanup, rc);
1260         }
1261
1262         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1263                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1264                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1265
1266         *la_copy = ma->ma_attr;
1267         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1268         if (rc)
1269                 GOTO(cleanup, rc);
1270
1271 #ifdef HAVE_QUOTA_SUPPORT
1272         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1273                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1274
1275                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1276                 if (!rc) {
1277                         quota_opc = FSFILT_OP_SETATTR;
1278                         mdd_quota_wrapper(la_copy, qnids);
1279                         mdd_quota_wrapper(la_tmp, qoids);
1280                         /* get file quota for new owner */
1281                         lquota_chkquota(mds_quota_interface_ref, obd, qnids,
1282                                         inode_pending, 1, NULL, 0, NULL, 0);
1283                         block_count = (la_tmp->la_blocks + 7) >> 3;
1284                         if (block_count) {
1285                                 void *data = NULL;
1286                                 mdd_data_get(env, mdd_obj, &data);
1287                                 /* get block quota for new owner */
1288                                 lquota_chkquota(mds_quota_interface_ref, obd,
1289                                                 qnids, block_pending,
1290                                                 block_count, NULL,
1291                                                 LQUOTA_FLAGS_BLK, data, 1);
1292                         }
1293                 }
1294         }
1295 #endif
1296
1297         if (la_copy->la_valid & LA_FLAGS) {
1298                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1299                                                   handle, 1);
1300                 if (rc == 0)
1301                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1302         } else if (la_copy->la_valid) {            /* setattr */
1303                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1304                                                   handle, 1);
1305                 /* journal chown/chgrp in llog, just like unlink */
1306                 if (rc == 0 && lmm_size){
1307                         cookie_size = mdd_lov_cookiesize(env, mdd);
1308                         logcookies = mdd_max_cookie_get(env, mdd);
1309                         if (logcookies == NULL)
1310                                 GOTO(cleanup, rc = -ENOMEM);
1311
1312                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1313                                             logcookies, cookie_size) <= 0)
1314                                 logcookies = NULL;
1315                 }
1316         }
1317
1318         if (rc == 0 && ma->ma_valid & MA_LOV) {
1319                 umode_t mode;
1320
1321                 mode = mdd_object_type(mdd_obj);
1322                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1323                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1324                         if (rc)
1325                                 GOTO(cleanup, rc);
1326
1327                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1328                                             ma->ma_lmm_size, handle, 1);
1329                 }
1330
1331         }
1332 cleanup:
1333         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1334                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1335                                               handle);
1336         mdd_trans_stop(env, mdd, rc, handle);
1337         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1338                 /*set obd attr, if needed*/
1339                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1340                                            logcookies);
1341         }
1342 #ifdef HAVE_QUOTA_SUPPORT
1343         if (quota_opc) {
1344                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1345                                       inode_pending, 0);
1346                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1347                                       block_pending, 1);
1348                 /* Trigger dqrel/dqacq for original owner and new owner.
1349                  * If failed, the next call for lquota_chkquota will
1350                  * process it. */
1351                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1352                               quota_opc);
1353         }
1354 #endif
1355         RETURN(rc);
1356 }
1357
1358 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1359                       const struct lu_buf *buf, const char *name, int fl,
1360                       struct thandle *handle)
1361 {
1362         int  rc;
1363         ENTRY;
1364
1365         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1366         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1367         mdd_write_unlock(env, obj);
1368
1369         RETURN(rc);
1370 }
1371
1372 static int mdd_xattr_sanity_check(const struct lu_env *env,
1373                                   struct mdd_object *obj)
1374 {
1375         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1376         struct md_ucred *uc     = md_ucred(env);
1377         int rc;
1378         ENTRY;
1379
1380         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1381                 RETURN(-EPERM);
1382
1383         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1384         if (rc)
1385                 RETURN(rc);
1386
1387         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1388             !mdd_capable(uc, CFS_CAP_FOWNER))
1389                 RETURN(-EPERM);
1390
1391         RETURN(rc);
1392 }
1393
1394 /**
1395  * The caller should guarantee to update the object ctime
1396  * after xattr_set if needed.
1397  */
1398 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1399                          const struct lu_buf *buf, const char *name,
1400                          int fl)
1401 {
1402         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1403         struct mdd_device *mdd = mdo2mdd(obj);
1404         struct thandle *handle;
1405         int  rc;
1406         ENTRY;
1407
1408         rc = mdd_xattr_sanity_check(env, mdd_obj);
1409         if (rc)
1410                 RETURN(rc);
1411
1412         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1413         handle = mdd_trans_start(env, mdd);
1414         if (IS_ERR(handle))
1415                 RETURN(PTR_ERR(handle));
1416
1417         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1418
1419         /* Only record user xattr changes */
1420         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1421             (strncmp("user.", name, 5) == 0))
1422                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1423                                               handle);
1424         mdd_trans_stop(env, mdd, rc, handle);
1425
1426         RETURN(rc);
1427 }
1428
1429 /**
1430  * The caller should guarantee to update the object ctime
1431  * after xattr_set if needed.
1432  */
1433 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1434                   const char *name)
1435 {
1436         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1437         struct mdd_device *mdd = mdo2mdd(obj);
1438         struct thandle *handle;
1439         int  rc;
1440         ENTRY;
1441
1442         rc = mdd_xattr_sanity_check(env, mdd_obj);
1443         if (rc)
1444                 RETURN(rc);
1445
1446         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1447         handle = mdd_trans_start(env, mdd);
1448         if (IS_ERR(handle))
1449                 RETURN(PTR_ERR(handle));
1450
1451         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1452         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1453                            mdd_object_capa(env, mdd_obj));
1454         mdd_write_unlock(env, mdd_obj);
1455
1456         /* Only record user xattr changes */
1457         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1458             (strncmp("user.", name, 5) != 0))
1459                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1460                                               handle);
1461
1462         mdd_trans_stop(env, mdd, rc, handle);
1463
1464         RETURN(rc);
1465 }
1466
1467 /* partial unlink */
1468 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1469                        struct md_attr *ma)
1470 {
1471         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1472         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1473         struct mdd_device *mdd = mdo2mdd(obj);
1474         struct thandle *handle;
1475 #ifdef HAVE_QUOTA_SUPPORT
1476         struct obd_device *obd = mdd->mdd_obd_dev;
1477         struct mds_obd *mds = &obd->u.mds;
1478         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1479         int quota_opc = 0;
1480 #endif
1481         int rc;
1482         ENTRY;
1483
1484         /*
1485          * Check -ENOENT early here because we need to get object type
1486          * to calculate credits before transaction start
1487          */
1488         if (!mdd_object_exists(mdd_obj))
1489                 RETURN(-ENOENT);
1490
1491         LASSERT(mdd_object_exists(mdd_obj) > 0);
1492
1493         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1494         if (rc)
1495                 RETURN(rc);
1496
1497         handle = mdd_trans_start(env, mdd);
1498         if (IS_ERR(handle))
1499                 RETURN(-ENOMEM);
1500
1501         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1502
1503         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1504         if (rc)
1505                 GOTO(cleanup, rc);
1506
1507         __mdd_ref_del(env, mdd_obj, handle, 0);
1508
1509         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1510                 /* unlink dot */
1511                 __mdd_ref_del(env, mdd_obj, handle, 1);
1512         }
1513
1514         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1515         la_copy->la_ctime = ma->ma_attr.la_ctime;
1516
1517         la_copy->la_valid = LA_CTIME;
1518         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1519         if (rc)
1520                 GOTO(cleanup, rc);
1521
1522         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1523 #ifdef HAVE_QUOTA_SUPPORT
1524         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1525             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1526                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1527                 mdd_quota_wrapper(&ma->ma_attr, qids);
1528         }
1529 #endif
1530
1531
1532         EXIT;
1533 cleanup:
1534         mdd_write_unlock(env, mdd_obj);
1535         mdd_trans_stop(env, mdd, rc, handle);
1536 #ifdef HAVE_QUOTA_SUPPORT
1537         if (quota_opc)
1538                 /* Trigger dqrel on the owner of child. If failed,
1539                  * the next call for lquota_chkquota will process it */
1540                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1541                               quota_opc);
1542 #endif
1543         return rc;
1544 }
1545
1546 /* partial operation */
1547 static int mdd_oc_sanity_check(const struct lu_env *env,
1548                                struct mdd_object *obj,
1549                                struct md_attr *ma)
1550 {
1551         int rc;
1552         ENTRY;
1553
1554         switch (ma->ma_attr.la_mode & S_IFMT) {
1555         case S_IFREG:
1556         case S_IFDIR:
1557         case S_IFLNK:
1558         case S_IFCHR:
1559         case S_IFBLK:
1560         case S_IFIFO:
1561         case S_IFSOCK:
1562                 rc = 0;
1563                 break;
1564         default:
1565                 rc = -EINVAL;
1566                 break;
1567         }
1568         RETURN(rc);
1569 }
1570
1571 static int mdd_object_create(const struct lu_env *env,
1572                              struct md_object *obj,
1573                              const struct md_op_spec *spec,
1574                              struct md_attr *ma)
1575 {
1576
1577         struct mdd_device *mdd = mdo2mdd(obj);
1578         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1579         const struct lu_fid *pfid = spec->u.sp_pfid;
1580         struct thandle *handle;
1581 #ifdef HAVE_QUOTA_SUPPORT
1582         struct obd_device *obd = mdd->mdd_obd_dev;
1583         struct mds_obd *mds = &obd->u.mds;
1584         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1585         int quota_opc = 0, block_count = 0;
1586         int inode_pending[MAXQUOTAS] = { 0, 0 };
1587         int block_pending[MAXQUOTAS] = { 0, 0 };
1588 #endif
1589         int rc = 0;
1590         ENTRY;
1591
1592 #ifdef HAVE_QUOTA_SUPPORT
1593         if (mds->mds_quota) {
1594                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1595                 mdd_quota_wrapper(&ma->ma_attr, qids);
1596                 /* get file quota for child */
1597                 lquota_chkquota(mds_quota_interface_ref, obd, qids,
1598                                 inode_pending, 1, NULL, 0, NULL, 0);
1599                 switch (ma->ma_attr.la_mode & S_IFMT) {
1600                 case S_IFLNK:
1601                 case S_IFDIR:
1602                         block_count = 2;
1603                         break;
1604                 case S_IFREG:
1605                         block_count = 1;
1606                         break;
1607                 }
1608                 /* get block quota for child */
1609                 if (block_count)
1610                         lquota_chkquota(mds_quota_interface_ref, obd, qids,
1611                                         block_pending, block_count, NULL,
1612                                         LQUOTA_FLAGS_BLK, NULL, 0);
1613         }
1614 #endif
1615
1616         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1617         handle = mdd_trans_start(env, mdd);
1618         if (IS_ERR(handle))
1619                 GOTO(out_pending, rc = PTR_ERR(handle));
1620
1621         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1622         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1623         if (rc)
1624                 GOTO(unlock, rc);
1625
1626         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1627         if (rc)
1628                 GOTO(unlock, rc);
1629
1630         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1631                 /* If creating the slave object, set slave EA here. */
1632                 int lmv_size = spec->u.sp_ea.eadatalen;
1633                 struct lmv_stripe_md *lmv;
1634
1635                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1636                 LASSERT(lmv != NULL && lmv_size > 0);
1637
1638                 rc = __mdd_xattr_set(env, mdd_obj,
1639                                      mdd_buf_get_const(env, lmv, lmv_size),
1640                                      XATTR_NAME_LMV, 0, handle);
1641                 if (rc)
1642                         GOTO(unlock, rc);
1643
1644                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1645                                            handle, 0);
1646         } else {
1647 #ifdef CONFIG_FS_POSIX_ACL
1648                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1649                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1650
1651                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1652                         buf->lb_len = spec->u.sp_ea.eadatalen;
1653                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1654                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1655                                                     &ma->ma_attr.la_mode,
1656                                                     handle);
1657                                 if (rc)
1658                                         GOTO(unlock, rc);
1659                                 else
1660                                         ma->ma_attr.la_valid |= LA_MODE;
1661                         }
1662
1663                         pfid = spec->u.sp_ea.fid;
1664                 }
1665 #endif
1666                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1667                                            spec);
1668         }
1669         EXIT;
1670 unlock:
1671         if (rc == 0)
1672                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1673         mdd_write_unlock(env, mdd_obj);
1674
1675         mdd_trans_stop(env, mdd, rc, handle);
1676 out_pending:
1677 #ifdef HAVE_QUOTA_SUPPORT
1678         if (quota_opc) {
1679                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1680                                       inode_pending, 0);
1681                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1682                                       block_pending, 1);
1683                 /* Trigger dqacq on the owner of child. If failed,
1684                  * the next call for lquota_chkquota will process it. */
1685                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1686                               quota_opc);
1687         }
1688 #endif
1689         return rc;
1690 }
1691
1692 /* partial link */
1693 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1694                        const struct md_attr *ma)
1695 {
1696         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1697         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1698         struct mdd_device *mdd = mdo2mdd(obj);
1699         struct thandle *handle;
1700         int rc;
1701         ENTRY;
1702
1703         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1704         handle = mdd_trans_start(env, mdd);
1705         if (IS_ERR(handle))
1706                 RETURN(-ENOMEM);
1707
1708         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1709         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1710         if (rc == 0)
1711                 __mdd_ref_add(env, mdd_obj, handle);
1712         mdd_write_unlock(env, mdd_obj);
1713         if (rc == 0) {
1714                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1715                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1716
1717                 la_copy->la_valid = LA_CTIME;
1718                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1719                                                         handle, 0);
1720         }
1721         mdd_trans_stop(env, mdd, 0, handle);
1722
1723         RETURN(rc);
1724 }
1725
1726 /*
1727  * do NOT or the MAY_*'s, you'll get the weakest
1728  */
1729 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1730 {
1731         int res = 0;
1732
1733         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1734          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1735          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1736          * owner can write to a file even if it is marked readonly to hide
1737          * its brokenness. (bug 5781) */
1738         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1739                 struct md_ucred *uc = md_ucred(env);
1740
1741                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1742                     (la->la_uid == uc->mu_fsuid))
1743                         return 0;
1744         }
1745
1746         if (flags & FMODE_READ)
1747                 res |= MAY_READ;
1748         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1749                 res |= MAY_WRITE;
1750         if (flags & MDS_FMODE_EXEC)
1751                 res |= MAY_EXEC;
1752         return res;
1753 }
1754
1755 static int mdd_open_sanity_check(const struct lu_env *env,
1756                                  struct mdd_object *obj, int flag)
1757 {
1758         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1759         int mode, rc;
1760         ENTRY;
1761
1762         /* EEXIST check */
1763         if (mdd_is_dead_obj(obj))
1764                 RETURN(-ENOENT);
1765
1766         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1767         if (rc)
1768                RETURN(rc);
1769
1770         if (S_ISLNK(tmp_la->la_mode))
1771                 RETURN(-ELOOP);
1772
1773         mode = accmode(env, tmp_la, flag);
1774
1775         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1776                 RETURN(-EISDIR);
1777
1778         if (!(flag & MDS_OPEN_CREATED)) {
1779                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1780                 if (rc)
1781                         RETURN(rc);
1782         }
1783
1784         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1785             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1786                 flag &= ~MDS_OPEN_TRUNC;
1787
1788         /* For writing append-only file must open it with append mode. */
1789         if (mdd_is_append(obj)) {
1790                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1791                         RETURN(-EPERM);
1792                 if (flag & MDS_OPEN_TRUNC)
1793                         RETURN(-EPERM);
1794         }
1795
1796 #if 0
1797         /*
1798          * Now, flag -- O_NOATIME does not be packed by client.
1799          */
1800         if (flag & O_NOATIME) {
1801                 struct md_ucred *uc = md_ucred(env);
1802
1803                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1804                     (uc->mu_valid == UCRED_NEW)) &&
1805                     (uc->mu_fsuid != tmp_la->la_uid) &&
1806                     !mdd_capable(uc, CFS_CAP_FOWNER))
1807                         RETURN(-EPERM);
1808         }
1809 #endif
1810
1811         RETURN(0);
1812 }
1813
1814 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1815                     int flags)
1816 {
1817         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1818         int rc = 0;
1819
1820         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1821
1822         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1823         if (rc == 0)
1824                 mdd_obj->mod_count++;
1825
1826         mdd_write_unlock(env, mdd_obj);
1827         return rc;
1828 }
1829
1830 /* return md_attr back,
1831  * if it is last unlink then return lov ea + llog cookie*/
1832 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1833                     struct md_attr *ma)
1834 {
1835         int rc = 0;
1836         ENTRY;
1837
1838         if (S_ISREG(mdd_object_type(obj))) {
1839                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1840                  * Caller must be ready for that. */
1841
1842                 rc = __mdd_lmm_get(env, obj, ma);
1843                 if ((ma->ma_valid & MA_LOV))
1844                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1845                                             obj, ma);
1846         }
1847         RETURN(rc);
1848 }
1849
1850 /*
1851  * No permission check is needed.
1852  */
1853 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1854                      struct md_attr *ma)
1855 {
1856         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1857         struct mdd_device *mdd = mdo2mdd(obj);
1858         struct thandle    *handle;
1859         int rc;
1860         int reset = 1;
1861
1862 #ifdef HAVE_QUOTA_SUPPORT
1863         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1864         struct mds_obd *mds = &obd->u.mds;
1865         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1866         int quota_opc = 0;
1867 #endif
1868         ENTRY;
1869
1870         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1871         if (rc)
1872                 RETURN(rc);
1873         handle = mdd_trans_start(env, mdo2mdd(obj));
1874         if (IS_ERR(handle))
1875                 RETURN(PTR_ERR(handle));
1876
1877         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1878         /* release open count */
1879         mdd_obj->mod_count --;
1880
1881         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1882                 /* remove link to object from orphan index */
1883                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1884                 if (rc == 0) {
1885                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1886                                "list, OSS objects to be destroyed.\n",
1887                                PFID(mdd_object_fid(mdd_obj)));
1888                 } else {
1889                         CERROR("Object "DFID" can not be deleted from orphan "
1890                                 "list, maybe cause OST objects can not be "
1891                                 "destroyed (err: %d).\n",
1892                                 PFID(mdd_object_fid(mdd_obj)), rc);
1893                         /* If object was not deleted from orphan list, do not
1894                          * destroy OSS objects, which will be done when next
1895                          * recovery. */
1896                         GOTO(out, rc);
1897                 }
1898         }
1899
1900         rc = mdd_iattr_get(env, mdd_obj, ma);
1901         /* Object maybe not in orphan list originally, it is rare case for
1902          * mdd_finish_unlink() failure. */
1903         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1904 #ifdef HAVE_QUOTA_SUPPORT
1905                 if (mds->mds_quota) {
1906                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1907                         mdd_quota_wrapper(&ma->ma_attr, qids);
1908                 }
1909 #endif
1910                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
1911                 if (ma->ma_valid & MA_FLAGS &&
1912                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
1913                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
1914                 } else {
1915                         rc = mdd_object_kill(env, mdd_obj, ma);
1916                                 if (rc == 0)
1917                                         reset = 0;
1918                 }
1919
1920                 if (rc != 0)
1921                         CERROR("Error when prepare to delete Object "DFID" , "
1922                                "which will cause OST objects can not be "
1923                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1924         }
1925         EXIT;
1926
1927 out:
1928         if (reset)
1929                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1930
1931         mdd_write_unlock(env, mdd_obj);
1932         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1933 #ifdef HAVE_QUOTA_SUPPORT
1934         if (quota_opc)
1935                 /* Trigger dqrel on the owner of child. If failed,
1936                  * the next call for lquota_chkquota will process it */
1937                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1938                               quota_opc);
1939 #endif
1940         return rc;
1941 }
1942
1943 /*
1944  * Permission check is done when open,
1945  * no need check again.
1946  */
1947 static int mdd_readpage_sanity_check(const struct lu_env *env,
1948                                      struct mdd_object *obj)
1949 {
1950         struct dt_object *next = mdd_object_child(obj);
1951         int rc;
1952         ENTRY;
1953
1954         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1955                 rc = 0;
1956         else
1957                 rc = -ENOTDIR;
1958
1959         RETURN(rc);
1960 }
1961
1962 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
1963                               int first, void *area, int nob,
1964                               const struct dt_it_ops *iops, struct dt_it *it,
1965                               __u64 *start, __u64 *end,
1966                               struct lu_dirent **last, __u32 attr)
1967 {
1968         int                     result;
1969         __u64                   hash = 0;
1970         struct lu_dirent       *ent;
1971
1972         if (first) {
1973                 memset(area, 0, sizeof (struct lu_dirpage));
1974                 area += sizeof (struct lu_dirpage);
1975                 nob  -= sizeof (struct lu_dirpage);
1976         }
1977
1978         ent  = area;
1979         do {
1980                 int    len;
1981                 int    recsize;
1982
1983                 len  = iops->key_size(env, it);
1984
1985                 /* IAM iterator can return record with zero len. */
1986                 if (len == 0)
1987                         goto next;
1988
1989                 hash = iops->store(env, it);
1990                 if (unlikely(first)) {
1991                         first = 0;
1992                         *start = hash;
1993                 }
1994
1995                 /* calculate max space required for lu_dirent */
1996                 recsize = lu_dirent_calc_size(len, attr);
1997
1998                 if (nob >= recsize) {
1999                         result = iops->rec(env, it, ent, attr);
2000                         if (result == -ESTALE)
2001                                 goto next;
2002                         if (result != 0)
2003                                 goto out;
2004
2005                         /* osd might not able to pack all attributes,
2006                          * so recheck rec length */
2007                         recsize = le16_to_cpu(ent->lde_reclen);
2008                 } else {
2009                         /*
2010                          * record doesn't fit into page, enlarge previous one.
2011                          */
2012                         if (*last) {
2013                                 (*last)->lde_reclen =
2014                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2015                                                         nob);
2016                                 result = 0;
2017                         } else
2018                                 result = -EINVAL;
2019
2020                         goto out;
2021                 }
2022                 *last = ent;
2023                 ent = (void *)ent + recsize;
2024                 nob -= recsize;
2025
2026 next:
2027                 result = iops->next(env, it);
2028                 if (result == -ESTALE)
2029                         goto next;
2030         } while (result == 0);
2031
2032 out:
2033         *end = hash;
2034         return result;
2035 }
2036
2037 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2038                           const struct lu_rdpg *rdpg)
2039 {
2040         struct dt_it      *it;
2041         struct dt_object  *next = mdd_object_child(obj);
2042         const struct dt_it_ops  *iops;
2043         struct page       *pg;
2044         struct lu_dirent  *last = NULL;
2045         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2046         int i;
2047         int rc;
2048         int nob;
2049         __u64 hash_start;
2050         __u64 hash_end = 0;
2051
2052         LASSERT(rdpg->rp_pages != NULL);
2053         LASSERT(next->do_index_ops != NULL);
2054
2055         if (rdpg->rp_count <= 0)
2056                 return -EFAULT;
2057
2058         /*
2059          * iterate through directory and fill pages from @rdpg
2060          */
2061         iops = &next->do_index_ops->dio_it;
2062         it = iops->init(env, next, mdd_object_capa(env, obj));
2063         if (IS_ERR(it))
2064                 return PTR_ERR(it);
2065
2066         rc = iops->load(env, it, rdpg->rp_hash);
2067
2068         if (rc == 0){
2069                 /*
2070                  * Iterator didn't find record with exactly the key requested.
2071                  *
2072                  * It is currently either
2073                  *
2074                  *     - positioned above record with key less than
2075                  *     requested---skip it.
2076                  *
2077                  *     - or not positioned at all (is in IAM_IT_SKEWED
2078                  *     state)---position it on the next item.
2079                  */
2080                 rc = iops->next(env, it);
2081         } else if (rc > 0)
2082                 rc = 0;
2083
2084         /*
2085          * At this point and across for-loop:
2086          *
2087          *  rc == 0 -> ok, proceed.
2088          *  rc >  0 -> end of directory.
2089          *  rc <  0 -> error.
2090          */
2091         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2092              i++, nob -= CFS_PAGE_SIZE) {
2093                 LASSERT(i < rdpg->rp_npages);
2094                 pg = rdpg->rp_pages[i];
2095                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2096                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2097                                         it, &hash_start, &hash_end, &last,
2098                                         rdpg->rp_attrs);
2099                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2100                         if (last)
2101                                 last->lde_reclen = 0;
2102                 }
2103                 cfs_kunmap(pg);
2104         }
2105         if (rc > 0) {
2106                 /*
2107                  * end of directory.
2108                  */
2109                 hash_end = DIR_END_OFF;
2110                 rc = 0;
2111         }
2112         if (rc == 0) {
2113                 struct lu_dirpage *dp;
2114
2115                 dp = cfs_kmap(rdpg->rp_pages[0]);
2116                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2117                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2118                 if (i == 0)
2119                         /*
2120                          * No pages were processed, mark this.
2121                          */
2122                         dp->ldp_flags |= LDF_EMPTY;
2123
2124                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2125                 cfs_kunmap(rdpg->rp_pages[0]);
2126         }
2127         iops->put(env, it);
2128         iops->fini(env, it);
2129
2130         return rc;
2131 }
2132
2133 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2134                  const struct lu_rdpg *rdpg)
2135 {
2136         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2137         int rc;
2138         ENTRY;
2139
2140         LASSERT(mdd_object_exists(mdd_obj));
2141
2142         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2143         rc = mdd_readpage_sanity_check(env, mdd_obj);
2144         if (rc)
2145                 GOTO(out_unlock, rc);
2146
2147         if (mdd_is_dead_obj(mdd_obj)) {
2148                 struct page *pg;
2149                 struct lu_dirpage *dp;
2150
2151                 /*
2152                  * According to POSIX, please do not return any entry to client:
2153                  * even dot and dotdot should not be returned.
2154                  */
2155                 CWARN("readdir from dead object: "DFID"\n",
2156                         PFID(mdd_object_fid(mdd_obj)));
2157
2158                 if (rdpg->rp_count <= 0)
2159                         GOTO(out_unlock, rc = -EFAULT);
2160                 LASSERT(rdpg->rp_pages != NULL);
2161
2162                 pg = rdpg->rp_pages[0];
2163                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2164                 memset(dp, 0 , sizeof(struct lu_dirpage));
2165                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2166                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2167                 dp->ldp_flags |= LDF_EMPTY;
2168                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2169                 cfs_kunmap(pg);
2170                 GOTO(out_unlock, rc = 0);
2171         }
2172
2173         rc = __mdd_readpage(env, mdd_obj, rdpg);
2174
2175         EXIT;
2176 out_unlock:
2177         mdd_read_unlock(env, mdd_obj);
2178         return rc;
2179 }
2180
2181 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2182 {
2183         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2184         struct dt_object *next;
2185
2186         LASSERT(mdd_object_exists(mdd_obj));
2187         next = mdd_object_child(mdd_obj);
2188         return next->do_ops->do_object_sync(env, next);
2189 }
2190
2191 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2192                                         struct md_object *obj)
2193 {
2194         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2195
2196         LASSERT(mdd_object_exists(mdd_obj));
2197         return do_version_get(env, mdd_object_child(mdd_obj));
2198 }
2199
2200 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2201                             dt_obj_version_t version)
2202 {
2203         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2204
2205         LASSERT(mdd_object_exists(mdd_obj));
2206         return do_version_set(env, mdd_object_child(mdd_obj), version);
2207 }
2208
2209 const struct md_object_operations mdd_obj_ops = {
2210         .moo_permission    = mdd_permission,
2211         .moo_attr_get      = mdd_attr_get,
2212         .moo_attr_set      = mdd_attr_set,
2213         .moo_xattr_get     = mdd_xattr_get,
2214         .moo_xattr_set     = mdd_xattr_set,
2215         .moo_xattr_list    = mdd_xattr_list,
2216         .moo_xattr_del     = mdd_xattr_del,
2217         .moo_object_create = mdd_object_create,
2218         .moo_ref_add       = mdd_ref_add,
2219         .moo_ref_del       = mdd_ref_del,
2220         .moo_open          = mdd_open,
2221         .moo_close         = mdd_close,
2222         .moo_readpage      = mdd_readpage,
2223         .moo_readlink      = mdd_readlink,
2224         .moo_capa_get      = mdd_capa_get,
2225         .moo_object_sync   = mdd_object_sync,
2226         .moo_version_get   = mdd_version_get,
2227         .moo_version_set   = mdd_version_set,
2228         .moo_path          = mdd_path,
2229 };