Whamcloud - gitweb
29a6d791898e6b8661d67f17e0f7c3674e1e248a
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134 }
135
136 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
137                                        const void *area, ssize_t len)
138 {
139         struct lu_buf *buf;
140
141         buf = &mdd_env_info(env)->mti_buf;
142         buf->lb_buf = (void *)area;
143         buf->lb_len = len;
144         return buf;
145 }
146
147 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 if (buf->lb_vmalloc)
154                         OBD_VFREE(buf->lb_buf, buf->lb_len);
155                 else
156                         OBD_FREE(buf->lb_buf, buf->lb_len);
157                 buf->lb_buf = NULL;
158         }
159         if (buf->lb_buf == NULL) {
160                 buf->lb_len = len;
161                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
162                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
163                         buf->lb_vmalloc = 0;
164                 }
165                 if (buf->lb_buf == NULL) {
166                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
167                         buf->lb_vmalloc = 1;
168                 }
169                 if (buf->lb_buf == NULL)
170                         buf->lb_len = 0;
171         }
172         return buf;
173 }
174
175 /** Increase the size of the \a mti_big_buf.
176  * preserves old data in buffer
177  * old buffer remains unchanged on error
178  * \retval 0 or -ENOMEM
179  */
180 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
181 {
182         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
183         struct lu_buf buf;
184
185         LASSERT(len >= oldbuf->lb_len);
186         if (len > BUF_VMALLOC_SIZE) {
187                 OBD_VMALLOC(buf.lb_buf, len);
188                 buf.lb_vmalloc = 1;
189         } else {
190                 OBD_ALLOC(buf.lb_buf, len);
191                 buf.lb_vmalloc = 0;
192         }
193         if (buf.lb_buf == NULL)
194                 return -ENOMEM;
195
196         buf.lb_len = len;
197         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
198
199         if (oldbuf->lb_vmalloc)
200                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
201         else
202                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
203
204         memcpy(oldbuf, &buf, sizeof(buf));
205
206         return 0;
207 }
208
209 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
210                                        struct mdd_device *mdd)
211 {
212         struct mdd_thread_info *mti = mdd_env_info(env);
213         int                     max_cookie_size;
214
215         max_cookie_size = mdd_lov_cookiesize(env, mdd);
216         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
217                 if (mti->mti_max_cookie)
218                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
219                 mti->mti_max_cookie = NULL;
220                 mti->mti_max_cookie_size = 0;
221         }
222         if (unlikely(mti->mti_max_cookie == NULL)) {
223                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
224                 if (likely(mti->mti_max_cookie != NULL))
225                         mti->mti_max_cookie_size = max_cookie_size;
226         }
227         if (likely(mti->mti_max_cookie != NULL))
228                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
229         return mti->mti_max_cookie;
230 }
231
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233                                    struct mdd_device *mdd)
234 {
235         struct mdd_thread_info *mti = mdd_env_info(env);
236         int                     max_lmm_size;
237
238         max_lmm_size = mdd_lov_mdsize(env, mdd);
239         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240                 if (mti->mti_max_lmm)
241                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242                 mti->mti_max_lmm = NULL;
243                 mti->mti_max_lmm_size = 0;
244         }
245         if (unlikely(mti->mti_max_lmm == NULL)) {
246                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
247                 if (unlikely(mti->mti_max_lmm != NULL))
248                         mti->mti_max_lmm_size = max_lmm_size;
249         }
250         return mti->mti_max_lmm;
251 }
252
253 struct lu_object *mdd_object_alloc(const struct lu_env *env,
254                                    const struct lu_object_header *hdr,
255                                    struct lu_device *d)
256 {
257         struct mdd_object *mdd_obj;
258
259         OBD_ALLOC_PTR(mdd_obj);
260         if (mdd_obj != NULL) {
261                 struct lu_object *o;
262
263                 o = mdd2lu_obj(mdd_obj);
264                 lu_object_init(o, NULL, d);
265                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
266                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
267                 mdd_obj->mod_count = 0;
268                 o->lo_ops = &mdd_lu_obj_ops;
269                 return o;
270         } else {
271                 return NULL;
272         }
273 }
274
275 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
276                            const struct lu_object_conf *unused)
277 {
278         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
279         struct mdd_object *mdd_obj = lu2mdd_obj(o);
280         struct lu_object  *below;
281         struct lu_device  *under;
282         ENTRY;
283
284         mdd_obj->mod_cltime = 0;
285         under = &d->mdd_child->dd_lu_dev;
286         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
287         mdd_pdlock_init(mdd_obj);
288         if (below == NULL)
289                 RETURN(-ENOMEM);
290
291         lu_object_add(o, below);
292
293         RETURN(0);
294 }
295
296 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
297 {
298         if (lu_object_exists(o))
299                 return mdd_get_flags(env, lu2mdd_obj(o));
300         else
301                 return 0;
302 }
303
304 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
305 {
306         struct mdd_object *mdd = lu2mdd_obj(o);
307
308         lu_object_fini(o);
309         OBD_FREE_PTR(mdd);
310 }
311
312 static int mdd_object_print(const struct lu_env *env, void *cookie,
313                             lu_printer_t p, const struct lu_object *o)
314 {
315         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
316         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
317                     "valid=%x, cltime=%llu, flags=%lx)",
318                     mdd, mdd->mod_count, mdd->mod_valid,
319                     mdd->mod_cltime, mdd->mod_flags);
320 }
321
322 static const struct lu_object_operations mdd_lu_obj_ops = {
323         .loo_object_init    = mdd_object_init,
324         .loo_object_start   = mdd_object_start,
325         .loo_object_free    = mdd_object_free,
326         .loo_object_print   = mdd_object_print,
327 };
328
329 struct mdd_object *mdd_object_find(const struct lu_env *env,
330                                    struct mdd_device *d,
331                                    const struct lu_fid *f)
332 {
333         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
334 }
335
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337                         const char *path, struct lu_fid *fid)
338 {
339         struct lu_buf *buf;
340         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341         struct mdd_object *obj;
342         struct lu_name *lname = &mdd_env_info(env)->mti_name;
343         char *name;
344         int rc = 0;
345         ENTRY;
346
347         /* temp buffer for path element */
348         buf = mdd_buf_alloc(env, PATH_MAX);
349         if (buf->lb_buf == NULL)
350                 RETURN(-ENOMEM);
351
352         lname->ln_name = name = buf->lb_buf;
353         lname->ln_namelen = 0;
354         *f = mdd->mdd_root_fid;
355
356         while(1) {
357                 while (*path == '/')
358                         path++;
359                 if (*path == '\0')
360                         break;
361                 while (*path != '/' && *path != '\0') {
362                         *name = *path;
363                         path++;
364                         name++;
365                         lname->ln_namelen++;
366                 }
367
368                 *name = '\0';
369                 /* find obj corresponding to fid */
370                 obj = mdd_object_find(env, mdd, f);
371                 if (obj == NULL)
372                         GOTO(out, rc = -EREMOTE);
373                 if (IS_ERR(obj))
374                         GOTO(out, rc = -PTR_ERR(obj));
375                 /* get child fid from parent and name */
376                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377                 mdd_object_put(env, obj);
378                 if (rc)
379                         break;
380
381                 name = buf->lb_buf;
382                 lname->ln_namelen = 0;
383         }
384
385         if (!rc)
386                 *fid = *f;
387 out:
388         RETURN(rc);
389 }
390
391 /** The maximum depth that fid2path() will search.
392  * This is limited only because we want to store the fids for
393  * historical path lookup purposes.
394  */
395 #define MAX_PATH_DEPTH 100
396
397 /** mdd_path() lookup structure. */
398 struct path_lookup_info {
399         __u64                pli_recno;        /**< history point */
400         __u64                pli_currec;       /**< current record */
401         struct lu_fid        pli_fid;
402         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
403         struct mdd_object   *pli_mdd_obj;
404         char                *pli_path;         /**< full path */
405         int                  pli_pathlen;
406         int                  pli_linkno;       /**< which hardlink to follow */
407         int                  pli_fidcount;     /**< number of \a pli_fids */
408 };
409
410 static int mdd_path_current(const struct lu_env *env,
411                             struct path_lookup_info *pli)
412 {
413         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414         struct mdd_object *mdd_obj;
415         struct lu_buf     *buf = NULL;
416         struct link_ea_header *leh;
417         struct link_ea_entry  *lee;
418         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
420         char *ptr;
421         int reclen;
422         int rc;
423         ENTRY;
424
425         ptr = pli->pli_path + pli->pli_pathlen - 1;
426         *ptr = 0;
427         --ptr;
428         pli->pli_fidcount = 0;
429         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
430
431         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432                 mdd_obj = mdd_object_find(env, mdd,
433                                           &pli->pli_fids[pli->pli_fidcount]);
434                 if (mdd_obj == NULL)
435                         GOTO(out, rc = -EREMOTE);
436                 if (IS_ERR(mdd_obj))
437                         GOTO(out, rc = -PTR_ERR(mdd_obj));
438                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
439                 if (rc <= 0) {
440                         mdd_object_put(env, mdd_obj);
441                         if (rc == -1)
442                                 rc = -EREMOTE;
443                         else if (rc == 0)
444                                 /* Do I need to error out here? */
445                                 rc = -ENOENT;
446                         GOTO(out, rc);
447                 }
448
449                 /* Get parent fid and object name */
450                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451                 buf = mdd_links_get(env, mdd_obj);
452                 mdd_read_unlock(env, mdd_obj);
453                 mdd_object_put(env, mdd_obj);
454                 if (IS_ERR(buf))
455                         GOTO(out, rc = PTR_ERR(buf));
456
457                 leh = buf->lb_buf;
458                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
460
461                 /* If set, use link #linkno for path lookup, otherwise use
462                    link #0.  Only do this for the final path element. */
463                 if ((pli->pli_fidcount == 0) &&
464                     (pli->pli_linkno < leh->leh_reccount)) {
465                         int count;
466                         for (count = 0; count < pli->pli_linkno; count++) {
467                                 lee = (struct link_ea_entry *)
468                                      ((char *)lee + reclen);
469                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
470                         }
471                         if (pli->pli_linkno < leh->leh_reccount - 1)
472                                 /* indicate to user there are more links */
473                                 pli->pli_linkno++;
474                 }
475
476                 /* Pack the name in the end of the buffer */
477                 ptr -= tmpname->ln_namelen;
478                 if (ptr - 1 <= pli->pli_path)
479                         GOTO(out, rc = -EOVERFLOW);
480                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
481                 *(--ptr) = '/';
482
483                 /* Store the parent fid for historic lookup */
484                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485                         GOTO(out, rc = -EOVERFLOW);
486                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
487         }
488
489         /* Verify that our path hasn't changed since we started the lookup.
490            Record the current index, and verify the path resolves to the
491            same fid. If it does, then the path is correct as of this index. */
492         spin_lock(&mdd->mdd_cl.mc_lock);
493         pli->pli_currec = mdd->mdd_cl.mc_index;
494         spin_unlock(&mdd->mdd_cl.mc_lock);
495         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
496         if (rc) {
497                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498                 GOTO (out, rc = -EAGAIN);
499         }
500         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503                        PFID(&pli->pli_fid));
504                 GOTO(out, rc = -EAGAIN);
505         }
506
507         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
508
509         EXIT;
510 out:
511         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512                 /* if we vmalloced a large buffer drop it */
513                 mdd_buf_put(buf);
514
515         return rc;
516 }
517
518 static int mdd_path_historic(const struct lu_env *env,
519                              struct path_lookup_info *pli)
520 {
521         return 0;
522 }
523
524 /* Returns the full path to this fid, as of changelog record recno. */
525 static int mdd_path(const struct lu_env *env, struct md_object *obj,
526                     char *path, int pathlen, __u64 *recno, int *linkno)
527 {
528         struct path_lookup_info *pli;
529         int tries = 3;
530         int rc = -EAGAIN;
531         ENTRY;
532
533         if (pathlen < 3)
534                 RETURN(-EOVERFLOW);
535
536         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
537                 path[0] = '/';
538                 path[1] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
610                        int *size)
611 {
612         struct lov_desc *ldesc;
613         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
614         ENTRY;
615
616         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617         LASSERT(ldesc != NULL);
618
619         if (!lmm)
620                 RETURN(0);
621
622         lmm->lmm_magic = LOV_MAGIC_V1;
623         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
624         lmm->lmm_pattern = ldesc->ld_pattern;
625         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
626         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
627         *size = sizeof(struct lov_mds_md);
628
629         RETURN(sizeof(struct lov_mds_md));
630 }
631
632 /* get lov EA only */
633 static int __mdd_lmm_get(const struct lu_env *env,
634                          struct mdd_object *mdd_obj, struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         if (ma->ma_valid & MA_LOV)
640                 RETURN(0);
641
642         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
643                         XATTR_NAME_LOV);
644
645         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
646                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
647                                 &ma->ma_lmm_size);
648         }
649
650         if (rc > 0) {
651                 ma->ma_valid |= MA_LOV;
652                 rc = 0;
653         }
654         RETURN(rc);
655 }
656
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
658                        struct md_attr *ma)
659 {
660         int rc;
661         ENTRY;
662
663         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664         rc = __mdd_lmm_get(env, mdd_obj, ma);
665         mdd_read_unlock(env, mdd_obj);
666         RETURN(rc);
667 }
668
669 /* get lmv EA only*/
670 static int __mdd_lmv_get(const struct lu_env *env,
671                          struct mdd_object *mdd_obj, struct md_attr *ma)
672 {
673         int rc;
674         ENTRY;
675
676         if (ma->ma_valid & MA_LMV)
677                 RETURN(0);
678
679         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
680                         XATTR_NAME_LMV);
681         if (rc > 0) {
682                 ma->ma_valid |= MA_LMV;
683                 rc = 0;
684         }
685         RETURN(rc);
686 }
687
688 static int mdd_attr_get_internal(const struct lu_env *env,
689                                  struct mdd_object *mdd_obj,
690                                  struct md_attr *ma)
691 {
692         int rc = 0;
693         ENTRY;
694
695         if (ma->ma_need & MA_INODE)
696                 rc = mdd_iattr_get(env, mdd_obj, ma);
697
698         if (rc == 0 && ma->ma_need & MA_LOV) {
699                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
700                     S_ISDIR(mdd_object_type(mdd_obj)))
701                         rc = __mdd_lmm_get(env, mdd_obj, ma);
702         }
703         if (rc == 0 && ma->ma_need & MA_LMV) {
704                 if (S_ISDIR(mdd_object_type(mdd_obj)))
705                         rc = __mdd_lmv_get(env, mdd_obj, ma);
706         }
707 #ifdef CONFIG_FS_POSIX_ACL
708         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
709                 if (S_ISDIR(mdd_object_type(mdd_obj)))
710                         rc = mdd_def_acl_get(env, mdd_obj, ma);
711         }
712 #endif
713         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
714                rc, ma->ma_valid);
715         RETURN(rc);
716 }
717
718 int mdd_attr_get_internal_locked(const struct lu_env *env,
719                                  struct mdd_object *mdd_obj, struct md_attr *ma)
720 {
721         int rc;
722         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
723
724         if (needlock)
725                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
726         rc = mdd_attr_get_internal(env, mdd_obj, ma);
727         if (needlock)
728                 mdd_read_unlock(env, mdd_obj);
729         return rc;
730 }
731
732 /*
733  * No permission check is needed.
734  */
735 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
736                         struct md_attr *ma)
737 {
738         struct mdd_object *mdd_obj = md2mdd_obj(obj);
739         int                rc;
740
741         ENTRY;
742         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
743         RETURN(rc);
744 }
745
746 /*
747  * No permission check is needed.
748  */
749 static int mdd_xattr_get(const struct lu_env *env,
750                          struct md_object *obj, struct lu_buf *buf,
751                          const char *name)
752 {
753         struct mdd_object *mdd_obj = md2mdd_obj(obj);
754         int rc;
755
756         ENTRY;
757
758         LASSERT(mdd_object_exists(mdd_obj));
759
760         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
761         rc = mdo_xattr_get(env, mdd_obj, buf, name,
762                            mdd_object_capa(env, mdd_obj));
763         mdd_read_unlock(env, mdd_obj);
764
765         RETURN(rc);
766 }
767
768 /*
769  * Permission check is done when open,
770  * no need check again.
771  */
772 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
773                         struct lu_buf *buf)
774 {
775         struct mdd_object *mdd_obj = md2mdd_obj(obj);
776         struct dt_object  *next;
777         loff_t             pos = 0;
778         int                rc;
779         ENTRY;
780
781         LASSERT(mdd_object_exists(mdd_obj));
782
783         next = mdd_object_child(mdd_obj);
784         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
785         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
786                                          mdd_object_capa(env, mdd_obj));
787         mdd_read_unlock(env, mdd_obj);
788         RETURN(rc);
789 }
790
791 /*
792  * No permission check is needed.
793  */
794 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
795                           struct lu_buf *buf)
796 {
797         struct mdd_object *mdd_obj = md2mdd_obj(obj);
798         int rc;
799
800         ENTRY;
801
802         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
803         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
804         mdd_read_unlock(env, mdd_obj);
805
806         RETURN(rc);
807 }
808
809 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
810                                struct mdd_object *c, struct md_attr *ma,
811                                struct thandle *handle,
812                                const struct md_op_spec *spec)
813 {
814         struct lu_attr *attr = &ma->ma_attr;
815         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
816         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
817         const struct dt_index_features *feat = spec->sp_feat;
818         int rc;
819         ENTRY;
820
821         if (!mdd_object_exists(c)) {
822                 struct dt_object *next = mdd_object_child(c);
823                 LASSERT(next);
824
825                 if (feat != &dt_directory_features && feat != NULL)
826                         dof->dof_type = DFT_INDEX;
827                 else
828                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
829
830                 dof->u.dof_idx.di_feat = feat;
831
832                 /* @hint will be initialized by underlying device. */
833                 next->do_ops->do_ah_init(env, hint,
834                                          p ? mdd_object_child(p) : NULL,
835                                          attr->la_mode & S_IFMT);
836
837                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
838                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
839         } else
840                 rc = -EEXIST;
841
842         RETURN(rc);
843 }
844
845 /**
846  * Make sure the ctime is increased only.
847  */
848 static inline int mdd_attr_check(const struct lu_env *env,
849                                  struct mdd_object *obj,
850                                  struct lu_attr *attr)
851 {
852         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
853         int rc;
854         ENTRY;
855
856         if (attr->la_valid & LA_CTIME) {
857                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
858                 if (rc)
859                         RETURN(rc);
860
861                 if (attr->la_ctime < tmp_la->la_ctime)
862                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
863                 else if (attr->la_valid == LA_CTIME &&
864                          attr->la_ctime == tmp_la->la_ctime)
865                         attr->la_valid &= ~LA_CTIME;
866         }
867         RETURN(0);
868 }
869
870 int mdd_attr_set_internal(const struct lu_env *env,
871                           struct mdd_object *obj,
872                           struct lu_attr *attr,
873                           struct thandle *handle,
874                           int needacl)
875 {
876         int rc;
877         ENTRY;
878
879         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
880 #ifdef CONFIG_FS_POSIX_ACL
881         if (!rc && (attr->la_valid & LA_MODE) && needacl)
882                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
883 #endif
884         RETURN(rc);
885 }
886
887 int mdd_attr_check_set_internal(const struct lu_env *env,
888                                 struct mdd_object *obj,
889                                 struct lu_attr *attr,
890                                 struct thandle *handle,
891                                 int needacl)
892 {
893         int rc;
894         ENTRY;
895
896         rc = mdd_attr_check(env, obj, attr);
897         if (rc)
898                 RETURN(rc);
899
900         if (attr->la_valid)
901                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
902         RETURN(rc);
903 }
904
905 static int mdd_attr_set_internal_locked(const struct lu_env *env,
906                                         struct mdd_object *obj,
907                                         struct lu_attr *attr,
908                                         struct thandle *handle,
909                                         int needacl)
910 {
911         int rc;
912         ENTRY;
913
914         needacl = needacl && (attr->la_valid & LA_MODE);
915         if (needacl)
916                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
917         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
918         if (needacl)
919                 mdd_write_unlock(env, obj);
920         RETURN(rc);
921 }
922
923 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
924                                        struct mdd_object *obj,
925                                        struct lu_attr *attr,
926                                        struct thandle *handle,
927                                        int needacl)
928 {
929         int rc;
930         ENTRY;
931
932         needacl = needacl && (attr->la_valid & LA_MODE);
933         if (needacl)
934                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
935         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
936         if (needacl)
937                 mdd_write_unlock(env, obj);
938         RETURN(rc);
939 }
940
941 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
942                     const struct lu_buf *buf, const char *name,
943                     int fl, struct thandle *handle)
944 {
945         struct lustre_capa *capa = mdd_object_capa(env, obj);
946         int rc = -EINVAL;
947         ENTRY;
948
949         if (buf->lb_buf && buf->lb_len > 0)
950                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
951         else if (buf->lb_buf == NULL && buf->lb_len == 0)
952                 rc = mdo_xattr_del(env, obj, name, handle, capa);
953
954         RETURN(rc);
955 }
956
957 /*
958  * This gives the same functionality as the code between
959  * sys_chmod and inode_setattr
960  * chown_common and inode_setattr
961  * utimes and inode_setattr
962  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
963  */
964 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
965                         struct lu_attr *la, const struct md_attr *ma)
966 {
967         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
968         struct md_ucred  *uc         = md_ucred(env);
969         int               rc;
970         ENTRY;
971
972         if (!la->la_valid)
973                 RETURN(0);
974
975         /* Do not permit change file type */
976         if (la->la_valid & LA_TYPE)
977                 RETURN(-EPERM);
978
979         /* They should not be processed by setattr */
980         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
981                 RETURN(-EPERM);
982
983         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
984         if (rc)
985                 RETURN(rc);
986
987         if (la->la_valid == LA_CTIME) {
988                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
989                         /* This is only for set ctime when rename's source is
990                          * on remote MDS. */
991                         rc = mdd_may_delete(env, NULL, obj,
992                                             (struct md_attr *)ma, 1, 0);
993                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
994                         la->la_valid &= ~LA_CTIME;
995                 RETURN(rc);
996         }
997
998         if (la->la_valid == LA_ATIME) {
999                 /* This is atime only set for read atime update on close. */
1000                 if (la->la_atime <= tmp_la->la_atime +
1001                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1002                         la->la_valid &= ~LA_ATIME;
1003                 RETURN(0);
1004         }
1005
1006         /* Check if flags change. */
1007         if (la->la_valid & LA_FLAGS) {
1008                 unsigned int oldflags = 0;
1009                 unsigned int newflags = la->la_flags &
1010                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1011
1012                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1013                     !mdd_capable(uc, CFS_CAP_FOWNER))
1014                         RETURN(-EPERM);
1015
1016                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1017                  * only be changed by the relevant capability. */
1018                 if (mdd_is_immutable(obj))
1019                         oldflags |= LUSTRE_IMMUTABLE_FL;
1020                 if (mdd_is_append(obj))
1021                         oldflags |= LUSTRE_APPEND_FL;
1022                 if ((oldflags ^ newflags) &&
1023                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1024                         RETURN(-EPERM);
1025
1026                 if (!S_ISDIR(tmp_la->la_mode))
1027                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1028         }
1029
1030         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1031             (la->la_valid & ~LA_FLAGS) &&
1032             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1033                 RETURN(-EPERM);
1034
1035         /* Check for setting the obj time. */
1036         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1037             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1038                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1039                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1040                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1041                                                             MAY_WRITE,
1042                                                             MOR_TGT_CHILD);
1043                         if (rc)
1044                                 RETURN(rc);
1045                 }
1046         }
1047
1048         /* Make sure a caller can chmod. */
1049         if (la->la_valid & LA_MODE) {
1050                 /* Bypass la_vaild == LA_MODE,
1051                  * this is for changing file with SUID or SGID. */
1052                 if ((la->la_valid & ~LA_MODE) &&
1053                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1054                     (uc->mu_fsuid != tmp_la->la_uid) &&
1055                     !mdd_capable(uc, CFS_CAP_FOWNER))
1056                         RETURN(-EPERM);
1057
1058                 if (la->la_mode == (umode_t) -1)
1059                         la->la_mode = tmp_la->la_mode;
1060                 else
1061                         la->la_mode = (la->la_mode & S_IALLUGO) |
1062                                       (tmp_la->la_mode & ~S_IALLUGO);
1063
1064                 /* Also check the setgid bit! */
1065                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1066                                        la->la_gid : tmp_la->la_gid) &&
1067                     !mdd_capable(uc, CFS_CAP_FSETID))
1068                         la->la_mode &= ~S_ISGID;
1069         } else {
1070                la->la_mode = tmp_la->la_mode;
1071         }
1072
1073         /* Make sure a caller can chown. */
1074         if (la->la_valid & LA_UID) {
1075                 if (la->la_uid == (uid_t) -1)
1076                         la->la_uid = tmp_la->la_uid;
1077                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1078                     (la->la_uid != tmp_la->la_uid)) &&
1079                     !mdd_capable(uc, CFS_CAP_CHOWN))
1080                         RETURN(-EPERM);
1081
1082                 /* If the user or group of a non-directory has been
1083                  * changed by a non-root user, remove the setuid bit.
1084                  * 19981026 David C Niemi <niemi@tux.org>
1085                  *
1086                  * Changed this to apply to all users, including root,
1087                  * to avoid some races. This is the behavior we had in
1088                  * 2.0. The check for non-root was definitely wrong
1089                  * for 2.2 anyway, as it should have been using
1090                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1091                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1092                     !S_ISDIR(tmp_la->la_mode)) {
1093                         la->la_mode &= ~S_ISUID;
1094                         la->la_valid |= LA_MODE;
1095                 }
1096         }
1097
1098         /* Make sure caller can chgrp. */
1099         if (la->la_valid & LA_GID) {
1100                 if (la->la_gid == (gid_t) -1)
1101                         la->la_gid = tmp_la->la_gid;
1102                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1103                     ((la->la_gid != tmp_la->la_gid) &&
1104                     !lustre_in_group_p(uc, la->la_gid))) &&
1105                     !mdd_capable(uc, CFS_CAP_CHOWN))
1106                         RETURN(-EPERM);
1107
1108                 /* Likewise, if the user or group of a non-directory
1109                  * has been changed by a non-root user, remove the
1110                  * setgid bit UNLESS there is no group execute bit
1111                  * (this would be a file marked for mandatory
1112                  * locking).  19981026 David C Niemi <niemi@tux.org>
1113                  *
1114                  * Removed the fsuid check (see the comment above) --
1115                  * 19990830 SD. */
1116                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1117                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1118                         la->la_mode &= ~S_ISGID;
1119                         la->la_valid |= LA_MODE;
1120                 }
1121         }
1122
1123         /* For both Size-on-MDS case and truncate case,
1124          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1125          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1126          * For SOM case, it is true, the MAY_WRITE perm has been checked
1127          * when open, no need check again. For truncate case, it is false,
1128          * the MAY_WRITE perm should be checked here. */
1129         if (ma->ma_attr_flags & MDS_SOM) {
1130                 /* For the "Size-on-MDS" setattr update, merge coming
1131                  * attributes with the set in the inode. BUG 10641 */
1132                 if ((la->la_valid & LA_ATIME) &&
1133                     (la->la_atime <= tmp_la->la_atime))
1134                         la->la_valid &= ~LA_ATIME;
1135
1136                 /* OST attributes do not have a priority over MDS attributes,
1137                  * so drop times if ctime is equal. */
1138                 if ((la->la_valid & LA_CTIME) &&
1139                     (la->la_ctime <= tmp_la->la_ctime))
1140                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1141         } else {
1142                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1143                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1144                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1145                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1146                                 rc = mdd_permission_internal_locked(env, obj,
1147                                                             tmp_la, MAY_WRITE,
1148                                                             MOR_TGT_CHILD);
1149                                 if (rc)
1150                                         RETURN(rc);
1151                         }
1152                 }
1153                 if (la->la_valid & LA_CTIME) {
1154                         /* The pure setattr, it has the priority over what is
1155                          * already set, do not drop it if ctime is equal. */
1156                         if (la->la_ctime < tmp_la->la_ctime)
1157                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1158                                                   LA_CTIME);
1159                 }
1160         }
1161
1162         RETURN(0);
1163 }
1164
1165 /** Store a data change changelog record
1166  * If this fails, we must fail the whole transaction; we don't
1167  * want the change to commit without the log entry.
1168  * \param mdd_obj - mdd_object of change
1169  * \param handle - transacion handle
1170  */
1171 static int mdd_changelog_data_store(const struct lu_env     *env,
1172                                     struct mdd_device       *mdd,
1173                                     enum changelog_rec_type type,
1174                                     struct mdd_object       *mdd_obj,
1175                                     struct thandle          *handle)
1176 {
1177         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1178         struct llog_changelog_rec *rec;
1179         struct lu_buf *buf;
1180         int reclen;
1181         int rc;
1182
1183         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1184                 RETURN(0);
1185
1186         LASSERT(handle != NULL);
1187         LASSERT(mdd_obj != NULL);
1188
1189         if ((type == CL_SETATTR) &&
1190             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1191                 /* Don't need multiple updates in this log */
1192                 /* Don't check under lock - no big deal if we get an extra
1193                    entry */
1194                 RETURN(0);
1195         }
1196
1197         reclen = llog_data_len(sizeof(*rec));
1198         buf = mdd_buf_alloc(env, reclen);
1199         if (buf->lb_buf == NULL)
1200                 RETURN(-ENOMEM);
1201         rec = (struct llog_changelog_rec *)buf->lb_buf;
1202
1203         rec->cr.cr_flags = CLF_VERSION;
1204         rec->cr.cr_type = (__u32)type;
1205         rec->cr.cr_tfid = *tfid;
1206         rec->cr.cr_namelen = 0;
1207         mdd_obj->mod_cltime = cfs_time_current_64();
1208
1209         rc = mdd_changelog_llog_write(mdd, rec, handle);
1210         if (rc < 0) {
1211                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1212                        rc, type, PFID(tfid));
1213                 return -EFAULT;
1214         }
1215
1216         return 0;
1217 }
1218
1219 /* set attr and LOV EA at once, return updated attr */
1220 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1221                         const struct md_attr *ma)
1222 {
1223         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1224         struct mdd_device *mdd = mdo2mdd(obj);
1225         struct thandle *handle;
1226         struct lov_mds_md *lmm = NULL;
1227         struct llog_cookie *logcookies = NULL;
1228         int  rc, lmm_size = 0, cookie_size = 0;
1229         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1230 #ifdef HAVE_QUOTA_SUPPORT
1231         struct obd_device *obd = mdd->mdd_obd_dev;
1232         struct mds_obd *mds = &obd->u.mds;
1233         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1234         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1235         int quota_opc = 0, block_count = 0;
1236         int inode_pending[MAXQUOTAS] = { 0, 0 };
1237         int block_pending[MAXQUOTAS] = { 0, 0 };
1238 #endif
1239         ENTRY;
1240
1241         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1242                                     MDD_TXN_ATTR_SET_OP);
1243         handle = mdd_trans_start(env, mdd);
1244         if (IS_ERR(handle))
1245                 RETURN(PTR_ERR(handle));
1246         /*TODO: add lock here*/
1247         /* start a log jounal handle if needed */
1248         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1249             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1250                 lmm_size = mdd_lov_mdsize(env, mdd);
1251                 lmm = mdd_max_lmm_get(env, mdd);
1252                 if (lmm == NULL)
1253                         GOTO(cleanup, rc = -ENOMEM);
1254
1255                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1256                                 XATTR_NAME_LOV);
1257
1258                 if (rc < 0)
1259                         GOTO(cleanup, rc);
1260         }
1261
1262         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1263                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1264                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1265
1266         *la_copy = ma->ma_attr;
1267         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1268         if (rc)
1269                 GOTO(cleanup, rc);
1270
1271 #ifdef HAVE_QUOTA_SUPPORT
1272         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1273                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1274
1275                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1276                 if (!rc) {
1277                         quota_opc = FSFILT_OP_SETATTR;
1278                         mdd_quota_wrapper(la_copy, qnids);
1279                         mdd_quota_wrapper(la_tmp, qoids);
1280                         /* get file quota for new owner */
1281                         lquota_chkquota(mds_quota_interface_ref, obd, qnids,
1282                                         inode_pending, 1, NULL, 0, NULL, 0);
1283                         block_count = (la_tmp->la_blocks + 7) >> 3;
1284                         if (block_count) {
1285                                 void *data = NULL;
1286                                 mdd_data_get(env, mdd_obj, &data);
1287                                 /* get block quota for new owner */
1288                                 lquota_chkquota(mds_quota_interface_ref, obd,
1289                                                 qnids, block_pending,
1290                                                 block_count, NULL,
1291                                                 LQUOTA_FLAGS_BLK, data, 1);
1292                         }
1293                 }
1294         }
1295 #endif
1296
1297         if (la_copy->la_valid & LA_FLAGS) {
1298                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1299                                                   handle, 1);
1300                 if (rc == 0)
1301                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1302         } else if (la_copy->la_valid) {            /* setattr */
1303                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1304                                                   handle, 1);
1305                 /* journal chown/chgrp in llog, just like unlink */
1306                 if (rc == 0 && lmm_size){
1307                         cookie_size = mdd_lov_cookiesize(env, mdd);
1308                         logcookies = mdd_max_cookie_get(env, mdd);
1309                         if (logcookies == NULL)
1310                                 GOTO(cleanup, rc = -ENOMEM);
1311
1312                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1313                                             logcookies, cookie_size) <= 0)
1314                                 logcookies = NULL;
1315                 }
1316         }
1317
1318         if (rc == 0 && ma->ma_valid & MA_LOV) {
1319                 umode_t mode;
1320
1321                 mode = mdd_object_type(mdd_obj);
1322                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1323                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1324                         if (rc)
1325                                 GOTO(cleanup, rc);
1326
1327                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1328                                             ma->ma_lmm_size, handle, 1);
1329                 }
1330
1331         }
1332 cleanup:
1333         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1334                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1335                                               handle);
1336         mdd_trans_stop(env, mdd, rc, handle);
1337         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1338                 /*set obd attr, if needed*/
1339                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1340                                            logcookies);
1341         }
1342 #ifdef HAVE_QUOTA_SUPPORT
1343         if (quota_opc) {
1344                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1345                                       inode_pending, 0);
1346                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1347                                       block_pending, 1);
1348                 /* Trigger dqrel/dqacq for original owner and new owner.
1349                  * If failed, the next call for lquota_chkquota will
1350                  * process it. */
1351                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1352                               quota_opc);
1353         }
1354 #endif
1355         RETURN(rc);
1356 }
1357
1358 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1359                       const struct lu_buf *buf, const char *name, int fl,
1360                       struct thandle *handle)
1361 {
1362         int  rc;
1363         ENTRY;
1364
1365         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1366         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1367         mdd_write_unlock(env, obj);
1368
1369         RETURN(rc);
1370 }
1371
1372 static int mdd_xattr_sanity_check(const struct lu_env *env,
1373                                   struct mdd_object *obj)
1374 {
1375         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1376         struct md_ucred *uc     = md_ucred(env);
1377         int rc;
1378         ENTRY;
1379
1380         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1381                 RETURN(-EPERM);
1382
1383         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1384         if (rc)
1385                 RETURN(rc);
1386
1387         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1388             !mdd_capable(uc, CFS_CAP_FOWNER))
1389                 RETURN(-EPERM);
1390
1391         RETURN(rc);
1392 }
1393
1394 /**
1395  * The caller should guarantee to update the object ctime
1396  * after xattr_set if needed.
1397  */
1398 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1399                          const struct lu_buf *buf, const char *name,
1400                          int fl)
1401 {
1402         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1403         struct mdd_device *mdd = mdo2mdd(obj);
1404         struct thandle *handle;
1405         int  rc;
1406         ENTRY;
1407
1408         rc = mdd_xattr_sanity_check(env, mdd_obj);
1409         if (rc)
1410                 RETURN(rc);
1411
1412         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1413         /* security-replated changes may require sync */
1414         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1415             mdd->mdd_sync_permission == 1)
1416                 txn_param_sync(&mdd_env_info(env)->mti_param);
1417
1418         handle = mdd_trans_start(env, mdd);
1419         if (IS_ERR(handle))
1420                 RETURN(PTR_ERR(handle));
1421
1422         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1423
1424         /* Only record user xattr changes */
1425         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1426             (strncmp("user.", name, 5) == 0))
1427                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1428                                               handle);
1429         mdd_trans_stop(env, mdd, rc, handle);
1430
1431         RETURN(rc);
1432 }
1433
1434 /**
1435  * The caller should guarantee to update the object ctime
1436  * after xattr_set if needed.
1437  */
1438 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1439                   const char *name)
1440 {
1441         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1442         struct mdd_device *mdd = mdo2mdd(obj);
1443         struct thandle *handle;
1444         int  rc;
1445         ENTRY;
1446
1447         rc = mdd_xattr_sanity_check(env, mdd_obj);
1448         if (rc)
1449                 RETURN(rc);
1450
1451         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1452         handle = mdd_trans_start(env, mdd);
1453         if (IS_ERR(handle))
1454                 RETURN(PTR_ERR(handle));
1455
1456         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1457         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1458                            mdd_object_capa(env, mdd_obj));
1459         mdd_write_unlock(env, mdd_obj);
1460
1461         /* Only record user xattr changes */
1462         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1463             (strncmp("user.", name, 5) != 0))
1464                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1465                                               handle);
1466
1467         mdd_trans_stop(env, mdd, rc, handle);
1468
1469         RETURN(rc);
1470 }
1471
1472 /* partial unlink */
1473 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1474                        struct md_attr *ma)
1475 {
1476         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1477         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1478         struct mdd_device *mdd = mdo2mdd(obj);
1479         struct thandle *handle;
1480 #ifdef HAVE_QUOTA_SUPPORT
1481         struct obd_device *obd = mdd->mdd_obd_dev;
1482         struct mds_obd *mds = &obd->u.mds;
1483         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1484         int quota_opc = 0;
1485 #endif
1486         int rc;
1487         ENTRY;
1488
1489         /*
1490          * Check -ENOENT early here because we need to get object type
1491          * to calculate credits before transaction start
1492          */
1493         if (!mdd_object_exists(mdd_obj))
1494                 RETURN(-ENOENT);
1495
1496         LASSERT(mdd_object_exists(mdd_obj) > 0);
1497
1498         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1499         if (rc)
1500                 RETURN(rc);
1501
1502         handle = mdd_trans_start(env, mdd);
1503         if (IS_ERR(handle))
1504                 RETURN(-ENOMEM);
1505
1506         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1507
1508         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1509         if (rc)
1510                 GOTO(cleanup, rc);
1511
1512         __mdd_ref_del(env, mdd_obj, handle, 0);
1513
1514         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1515                 /* unlink dot */
1516                 __mdd_ref_del(env, mdd_obj, handle, 1);
1517         }
1518
1519         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1520         la_copy->la_ctime = ma->ma_attr.la_ctime;
1521
1522         la_copy->la_valid = LA_CTIME;
1523         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1524         if (rc)
1525                 GOTO(cleanup, rc);
1526
1527         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1528 #ifdef HAVE_QUOTA_SUPPORT
1529         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1530             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1531                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1532                 mdd_quota_wrapper(&ma->ma_attr, qids);
1533         }
1534 #endif
1535
1536
1537         EXIT;
1538 cleanup:
1539         mdd_write_unlock(env, mdd_obj);
1540         mdd_trans_stop(env, mdd, rc, handle);
1541 #ifdef HAVE_QUOTA_SUPPORT
1542         if (quota_opc)
1543                 /* Trigger dqrel on the owner of child. If failed,
1544                  * the next call for lquota_chkquota will process it */
1545                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1546                               quota_opc);
1547 #endif
1548         return rc;
1549 }
1550
1551 /* partial operation */
1552 static int mdd_oc_sanity_check(const struct lu_env *env,
1553                                struct mdd_object *obj,
1554                                struct md_attr *ma)
1555 {
1556         int rc;
1557         ENTRY;
1558
1559         switch (ma->ma_attr.la_mode & S_IFMT) {
1560         case S_IFREG:
1561         case S_IFDIR:
1562         case S_IFLNK:
1563         case S_IFCHR:
1564         case S_IFBLK:
1565         case S_IFIFO:
1566         case S_IFSOCK:
1567                 rc = 0;
1568                 break;
1569         default:
1570                 rc = -EINVAL;
1571                 break;
1572         }
1573         RETURN(rc);
1574 }
1575
1576 static int mdd_object_create(const struct lu_env *env,
1577                              struct md_object *obj,
1578                              const struct md_op_spec *spec,
1579                              struct md_attr *ma)
1580 {
1581
1582         struct mdd_device *mdd = mdo2mdd(obj);
1583         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1584         const struct lu_fid *pfid = spec->u.sp_pfid;
1585         struct thandle *handle;
1586 #ifdef HAVE_QUOTA_SUPPORT
1587         struct obd_device *obd = mdd->mdd_obd_dev;
1588         struct mds_obd *mds = &obd->u.mds;
1589         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1590         int quota_opc = 0, block_count = 0;
1591         int inode_pending[MAXQUOTAS] = { 0, 0 };
1592         int block_pending[MAXQUOTAS] = { 0, 0 };
1593 #endif
1594         int rc = 0;
1595         ENTRY;
1596
1597 #ifdef HAVE_QUOTA_SUPPORT
1598         if (mds->mds_quota) {
1599                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1600                 mdd_quota_wrapper(&ma->ma_attr, qids);
1601                 /* get file quota for child */
1602                 lquota_chkquota(mds_quota_interface_ref, obd, qids,
1603                                 inode_pending, 1, NULL, 0, NULL, 0);
1604                 switch (ma->ma_attr.la_mode & S_IFMT) {
1605                 case S_IFLNK:
1606                 case S_IFDIR:
1607                         block_count = 2;
1608                         break;
1609                 case S_IFREG:
1610                         block_count = 1;
1611                         break;
1612                 }
1613                 /* get block quota for child */
1614                 if (block_count)
1615                         lquota_chkquota(mds_quota_interface_ref, obd, qids,
1616                                         block_pending, block_count, NULL,
1617                                         LQUOTA_FLAGS_BLK, NULL, 0);
1618         }
1619 #endif
1620
1621         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1622         handle = mdd_trans_start(env, mdd);
1623         if (IS_ERR(handle))
1624                 GOTO(out_pending, rc = PTR_ERR(handle));
1625
1626         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1627         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1628         if (rc)
1629                 GOTO(unlock, rc);
1630
1631         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1632         if (rc)
1633                 GOTO(unlock, rc);
1634
1635         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1636                 /* If creating the slave object, set slave EA here. */
1637                 int lmv_size = spec->u.sp_ea.eadatalen;
1638                 struct lmv_stripe_md *lmv;
1639
1640                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1641                 LASSERT(lmv != NULL && lmv_size > 0);
1642
1643                 rc = __mdd_xattr_set(env, mdd_obj,
1644                                      mdd_buf_get_const(env, lmv, lmv_size),
1645                                      XATTR_NAME_LMV, 0, handle);
1646                 if (rc)
1647                         GOTO(unlock, rc);
1648
1649                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1650                                            handle, 0);
1651         } else {
1652 #ifdef CONFIG_FS_POSIX_ACL
1653                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1654                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1655
1656                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1657                         buf->lb_len = spec->u.sp_ea.eadatalen;
1658                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1659                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1660                                                     &ma->ma_attr.la_mode,
1661                                                     handle);
1662                                 if (rc)
1663                                         GOTO(unlock, rc);
1664                                 else
1665                                         ma->ma_attr.la_valid |= LA_MODE;
1666                         }
1667
1668                         pfid = spec->u.sp_ea.fid;
1669                 }
1670 #endif
1671                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1672                                            spec);
1673         }
1674         EXIT;
1675 unlock:
1676         if (rc == 0)
1677                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1678         mdd_write_unlock(env, mdd_obj);
1679
1680         mdd_trans_stop(env, mdd, rc, handle);
1681 out_pending:
1682 #ifdef HAVE_QUOTA_SUPPORT
1683         if (quota_opc) {
1684                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1685                                       inode_pending, 0);
1686                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1687                                       block_pending, 1);
1688                 /* Trigger dqacq on the owner of child. If failed,
1689                  * the next call for lquota_chkquota will process it. */
1690                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1691                               quota_opc);
1692         }
1693 #endif
1694         return rc;
1695 }
1696
1697 /* partial link */
1698 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1699                        const struct md_attr *ma)
1700 {
1701         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1702         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1703         struct mdd_device *mdd = mdo2mdd(obj);
1704         struct thandle *handle;
1705         int rc;
1706         ENTRY;
1707
1708         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1709         handle = mdd_trans_start(env, mdd);
1710         if (IS_ERR(handle))
1711                 RETURN(-ENOMEM);
1712
1713         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1714         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1715         if (rc == 0)
1716                 __mdd_ref_add(env, mdd_obj, handle);
1717         mdd_write_unlock(env, mdd_obj);
1718         if (rc == 0) {
1719                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1720                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1721
1722                 la_copy->la_valid = LA_CTIME;
1723                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1724                                                         handle, 0);
1725         }
1726         mdd_trans_stop(env, mdd, 0, handle);
1727
1728         RETURN(rc);
1729 }
1730
1731 /*
1732  * do NOT or the MAY_*'s, you'll get the weakest
1733  */
1734 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1735 {
1736         int res = 0;
1737
1738         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1739          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1740          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1741          * owner can write to a file even if it is marked readonly to hide
1742          * its brokenness. (bug 5781) */
1743         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1744                 struct md_ucred *uc = md_ucred(env);
1745
1746                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1747                     (la->la_uid == uc->mu_fsuid))
1748                         return 0;
1749         }
1750
1751         if (flags & FMODE_READ)
1752                 res |= MAY_READ;
1753         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1754                 res |= MAY_WRITE;
1755         if (flags & MDS_FMODE_EXEC)
1756                 res |= MAY_EXEC;
1757         return res;
1758 }
1759
1760 static int mdd_open_sanity_check(const struct lu_env *env,
1761                                  struct mdd_object *obj, int flag)
1762 {
1763         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1764         int mode, rc;
1765         ENTRY;
1766
1767         /* EEXIST check */
1768         if (mdd_is_dead_obj(obj))
1769                 RETURN(-ENOENT);
1770
1771         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1772         if (rc)
1773                RETURN(rc);
1774
1775         if (S_ISLNK(tmp_la->la_mode))
1776                 RETURN(-ELOOP);
1777
1778         mode = accmode(env, tmp_la, flag);
1779
1780         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1781                 RETURN(-EISDIR);
1782
1783         if (!(flag & MDS_OPEN_CREATED)) {
1784                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1785                 if (rc)
1786                         RETURN(rc);
1787         }
1788
1789         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1790             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1791                 flag &= ~MDS_OPEN_TRUNC;
1792
1793         /* For writing append-only file must open it with append mode. */
1794         if (mdd_is_append(obj)) {
1795                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1796                         RETURN(-EPERM);
1797                 if (flag & MDS_OPEN_TRUNC)
1798                         RETURN(-EPERM);
1799         }
1800
1801 #if 0
1802         /*
1803          * Now, flag -- O_NOATIME does not be packed by client.
1804          */
1805         if (flag & O_NOATIME) {
1806                 struct md_ucred *uc = md_ucred(env);
1807
1808                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1809                     (uc->mu_valid == UCRED_NEW)) &&
1810                     (uc->mu_fsuid != tmp_la->la_uid) &&
1811                     !mdd_capable(uc, CFS_CAP_FOWNER))
1812                         RETURN(-EPERM);
1813         }
1814 #endif
1815
1816         RETURN(0);
1817 }
1818
1819 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1820                     int flags)
1821 {
1822         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1823         int rc = 0;
1824
1825         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1826
1827         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1828         if (rc == 0)
1829                 mdd_obj->mod_count++;
1830
1831         mdd_write_unlock(env, mdd_obj);
1832         return rc;
1833 }
1834
1835 /* return md_attr back,
1836  * if it is last unlink then return lov ea + llog cookie*/
1837 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1838                     struct md_attr *ma)
1839 {
1840         int rc = 0;
1841         ENTRY;
1842
1843         if (S_ISREG(mdd_object_type(obj))) {
1844                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1845                  * Caller must be ready for that. */
1846
1847                 rc = __mdd_lmm_get(env, obj, ma);
1848                 if ((ma->ma_valid & MA_LOV))
1849                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1850                                             obj, ma);
1851         }
1852         RETURN(rc);
1853 }
1854
1855 /*
1856  * No permission check is needed.
1857  */
1858 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1859                      struct md_attr *ma)
1860 {
1861         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1862         struct mdd_device *mdd = mdo2mdd(obj);
1863         struct thandle    *handle;
1864         int rc;
1865         int reset = 1;
1866
1867 #ifdef HAVE_QUOTA_SUPPORT
1868         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
1869         struct mds_obd *mds = &obd->u.mds;
1870         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1871         int quota_opc = 0;
1872 #endif
1873         ENTRY;
1874
1875         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1876         if (rc)
1877                 RETURN(rc);
1878         handle = mdd_trans_start(env, mdo2mdd(obj));
1879         if (IS_ERR(handle))
1880                 RETURN(PTR_ERR(handle));
1881
1882         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1883         /* release open count */
1884         mdd_obj->mod_count --;
1885
1886         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1887                 /* remove link to object from orphan index */
1888                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1889                 if (rc == 0) {
1890                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1891                                "list, OSS objects to be destroyed.\n",
1892                                PFID(mdd_object_fid(mdd_obj)));
1893                 } else {
1894                         CERROR("Object "DFID" can not be deleted from orphan "
1895                                 "list, maybe cause OST objects can not be "
1896                                 "destroyed (err: %d).\n",
1897                                 PFID(mdd_object_fid(mdd_obj)), rc);
1898                         /* If object was not deleted from orphan list, do not
1899                          * destroy OSS objects, which will be done when next
1900                          * recovery. */
1901                         GOTO(out, rc);
1902                 }
1903         }
1904
1905         rc = mdd_iattr_get(env, mdd_obj, ma);
1906         /* Object maybe not in orphan list originally, it is rare case for
1907          * mdd_finish_unlink() failure. */
1908         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
1909 #ifdef HAVE_QUOTA_SUPPORT
1910                 if (mds->mds_quota) {
1911                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1912                         mdd_quota_wrapper(&ma->ma_attr, qids);
1913                 }
1914 #endif
1915                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
1916                 if (ma->ma_valid & MA_FLAGS &&
1917                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
1918                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
1919                 } else {
1920                         rc = mdd_object_kill(env, mdd_obj, ma);
1921                                 if (rc == 0)
1922                                         reset = 0;
1923                 }
1924
1925                 if (rc != 0)
1926                         CERROR("Error when prepare to delete Object "DFID" , "
1927                                "which will cause OST objects can not be "
1928                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
1929         }
1930         EXIT;
1931
1932 out:
1933         if (reset)
1934                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
1935
1936         mdd_write_unlock(env, mdd_obj);
1937         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
1938 #ifdef HAVE_QUOTA_SUPPORT
1939         if (quota_opc)
1940                 /* Trigger dqrel on the owner of child. If failed,
1941                  * the next call for lquota_chkquota will process it */
1942                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1943                               quota_opc);
1944 #endif
1945         return rc;
1946 }
1947
1948 /*
1949  * Permission check is done when open,
1950  * no need check again.
1951  */
1952 static int mdd_readpage_sanity_check(const struct lu_env *env,
1953                                      struct mdd_object *obj)
1954 {
1955         struct dt_object *next = mdd_object_child(obj);
1956         int rc;
1957         ENTRY;
1958
1959         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1960                 rc = 0;
1961         else
1962                 rc = -ENOTDIR;
1963
1964         RETURN(rc);
1965 }
1966
1967 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
1968                               int first, void *area, int nob,
1969                               const struct dt_it_ops *iops, struct dt_it *it,
1970                               __u64 *start, __u64 *end,
1971                               struct lu_dirent **last, __u32 attr)
1972 {
1973         int                     result;
1974         __u64                   hash = 0;
1975         struct lu_dirent       *ent;
1976
1977         if (first) {
1978                 memset(area, 0, sizeof (struct lu_dirpage));
1979                 area += sizeof (struct lu_dirpage);
1980                 nob  -= sizeof (struct lu_dirpage);
1981         }
1982
1983         ent  = area;
1984         do {
1985                 int    len;
1986                 int    recsize;
1987
1988                 len  = iops->key_size(env, it);
1989
1990                 /* IAM iterator can return record with zero len. */
1991                 if (len == 0)
1992                         goto next;
1993
1994                 hash = iops->store(env, it);
1995                 if (unlikely(first)) {
1996                         first = 0;
1997                         *start = hash;
1998                 }
1999
2000                 /* calculate max space required for lu_dirent */
2001                 recsize = lu_dirent_calc_size(len, attr);
2002
2003                 if (nob >= recsize) {
2004                         result = iops->rec(env, it, ent, attr);
2005                         if (result == -ESTALE)
2006                                 goto next;
2007                         if (result != 0)
2008                                 goto out;
2009
2010                         /* osd might not able to pack all attributes,
2011                          * so recheck rec length */
2012                         recsize = le16_to_cpu(ent->lde_reclen);
2013                 } else {
2014                         /*
2015                          * record doesn't fit into page, enlarge previous one.
2016                          */
2017                         if (*last) {
2018                                 (*last)->lde_reclen =
2019                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2020                                                         nob);
2021                                 result = 0;
2022                         } else
2023                                 result = -EINVAL;
2024
2025                         goto out;
2026                 }
2027                 *last = ent;
2028                 ent = (void *)ent + recsize;
2029                 nob -= recsize;
2030
2031 next:
2032                 result = iops->next(env, it);
2033                 if (result == -ESTALE)
2034                         goto next;
2035         } while (result == 0);
2036
2037 out:
2038         *end = hash;
2039         return result;
2040 }
2041
2042 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2043                           const struct lu_rdpg *rdpg)
2044 {
2045         struct dt_it      *it;
2046         struct dt_object  *next = mdd_object_child(obj);
2047         const struct dt_it_ops  *iops;
2048         struct page       *pg;
2049         struct lu_dirent  *last = NULL;
2050         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2051         int i;
2052         int rc;
2053         int nob;
2054         __u64 hash_start;
2055         __u64 hash_end = 0;
2056
2057         LASSERT(rdpg->rp_pages != NULL);
2058         LASSERT(next->do_index_ops != NULL);
2059
2060         if (rdpg->rp_count <= 0)
2061                 return -EFAULT;
2062
2063         /*
2064          * iterate through directory and fill pages from @rdpg
2065          */
2066         iops = &next->do_index_ops->dio_it;
2067         it = iops->init(env, next, mdd_object_capa(env, obj));
2068         if (IS_ERR(it))
2069                 return PTR_ERR(it);
2070
2071         rc = iops->load(env, it, rdpg->rp_hash);
2072
2073         if (rc == 0){
2074                 /*
2075                  * Iterator didn't find record with exactly the key requested.
2076                  *
2077                  * It is currently either
2078                  *
2079                  *     - positioned above record with key less than
2080                  *     requested---skip it.
2081                  *
2082                  *     - or not positioned at all (is in IAM_IT_SKEWED
2083                  *     state)---position it on the next item.
2084                  */
2085                 rc = iops->next(env, it);
2086         } else if (rc > 0)
2087                 rc = 0;
2088
2089         /*
2090          * At this point and across for-loop:
2091          *
2092          *  rc == 0 -> ok, proceed.
2093          *  rc >  0 -> end of directory.
2094          *  rc <  0 -> error.
2095          */
2096         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2097              i++, nob -= CFS_PAGE_SIZE) {
2098                 LASSERT(i < rdpg->rp_npages);
2099                 pg = rdpg->rp_pages[i];
2100                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2101                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2102                                         it, &hash_start, &hash_end, &last,
2103                                         rdpg->rp_attrs);
2104                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2105                         if (last)
2106                                 last->lde_reclen = 0;
2107                 }
2108                 cfs_kunmap(pg);
2109         }
2110         if (rc > 0) {
2111                 /*
2112                  * end of directory.
2113                  */
2114                 hash_end = DIR_END_OFF;
2115                 rc = 0;
2116         }
2117         if (rc == 0) {
2118                 struct lu_dirpage *dp;
2119
2120                 dp = cfs_kmap(rdpg->rp_pages[0]);
2121                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2122                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2123                 if (i == 0)
2124                         /*
2125                          * No pages were processed, mark this.
2126                          */
2127                         dp->ldp_flags |= LDF_EMPTY;
2128
2129                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2130                 cfs_kunmap(rdpg->rp_pages[0]);
2131         }
2132         iops->put(env, it);
2133         iops->fini(env, it);
2134
2135         return rc;
2136 }
2137
2138 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2139                  const struct lu_rdpg *rdpg)
2140 {
2141         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2142         int rc;
2143         ENTRY;
2144
2145         LASSERT(mdd_object_exists(mdd_obj));
2146
2147         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2148         rc = mdd_readpage_sanity_check(env, mdd_obj);
2149         if (rc)
2150                 GOTO(out_unlock, rc);
2151
2152         if (mdd_is_dead_obj(mdd_obj)) {
2153                 struct page *pg;
2154                 struct lu_dirpage *dp;
2155
2156                 /*
2157                  * According to POSIX, please do not return any entry to client:
2158                  * even dot and dotdot should not be returned.
2159                  */
2160                 CWARN("readdir from dead object: "DFID"\n",
2161                         PFID(mdd_object_fid(mdd_obj)));
2162
2163                 if (rdpg->rp_count <= 0)
2164                         GOTO(out_unlock, rc = -EFAULT);
2165                 LASSERT(rdpg->rp_pages != NULL);
2166
2167                 pg = rdpg->rp_pages[0];
2168                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2169                 memset(dp, 0 , sizeof(struct lu_dirpage));
2170                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2171                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2172                 dp->ldp_flags |= LDF_EMPTY;
2173                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2174                 cfs_kunmap(pg);
2175                 GOTO(out_unlock, rc = 0);
2176         }
2177
2178         rc = __mdd_readpage(env, mdd_obj, rdpg);
2179
2180         EXIT;
2181 out_unlock:
2182         mdd_read_unlock(env, mdd_obj);
2183         return rc;
2184 }
2185
2186 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2187 {
2188         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2189         struct dt_object *next;
2190
2191         LASSERT(mdd_object_exists(mdd_obj));
2192         next = mdd_object_child(mdd_obj);
2193         return next->do_ops->do_object_sync(env, next);
2194 }
2195
2196 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2197                                         struct md_object *obj)
2198 {
2199         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2200
2201         LASSERT(mdd_object_exists(mdd_obj));
2202         return do_version_get(env, mdd_object_child(mdd_obj));
2203 }
2204
2205 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2206                             dt_obj_version_t version)
2207 {
2208         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2209
2210         LASSERT(mdd_object_exists(mdd_obj));
2211         return do_version_set(env, mdd_object_child(mdd_obj), version);
2212 }
2213
2214 const struct md_object_operations mdd_obj_ops = {
2215         .moo_permission    = mdd_permission,
2216         .moo_attr_get      = mdd_attr_get,
2217         .moo_attr_set      = mdd_attr_set,
2218         .moo_xattr_get     = mdd_xattr_get,
2219         .moo_xattr_set     = mdd_xattr_set,
2220         .moo_xattr_list    = mdd_xattr_list,
2221         .moo_xattr_del     = mdd_xattr_del,
2222         .moo_object_create = mdd_object_create,
2223         .moo_ref_add       = mdd_ref_add,
2224         .moo_ref_del       = mdd_ref_del,
2225         .moo_open          = mdd_open,
2226         .moo_close         = mdd_close,
2227         .moo_readpage      = mdd_readpage,
2228         .moo_readlink      = mdd_readlink,
2229         .moo_capa_get      = mdd_capa_get,
2230         .moo_object_sync   = mdd_object_sync,
2231         .moo_version_get   = mdd_version_get,
2232         .moo_version_set   = mdd_version_set,
2233         .moo_path          = mdd_path,
2234 };