Whamcloud - gitweb
b=19964 SOM EA
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134 }
135
136 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
137                                        const void *area, ssize_t len)
138 {
139         struct lu_buf *buf;
140
141         buf = &mdd_env_info(env)->mti_buf;
142         buf->lb_buf = (void *)area;
143         buf->lb_len = len;
144         return buf;
145 }
146
147 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
148 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
149 {
150         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
151
152         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
153                 if (buf->lb_vmalloc)
154                         OBD_VFREE(buf->lb_buf, buf->lb_len);
155                 else
156                         OBD_FREE(buf->lb_buf, buf->lb_len);
157                 buf->lb_buf = NULL;
158         }
159         if (buf->lb_buf == NULL) {
160                 buf->lb_len = len;
161                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
162                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
163                         buf->lb_vmalloc = 0;
164                 }
165                 if (buf->lb_buf == NULL) {
166                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
167                         buf->lb_vmalloc = 1;
168                 }
169                 if (buf->lb_buf == NULL)
170                         buf->lb_len = 0;
171         }
172         return buf;
173 }
174
175 /** Increase the size of the \a mti_big_buf.
176  * preserves old data in buffer
177  * old buffer remains unchanged on error
178  * \retval 0 or -ENOMEM
179  */
180 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
181 {
182         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
183         struct lu_buf buf;
184
185         LASSERT(len >= oldbuf->lb_len);
186         if (len > BUF_VMALLOC_SIZE) {
187                 OBD_VMALLOC(buf.lb_buf, len);
188                 buf.lb_vmalloc = 1;
189         } else {
190                 OBD_ALLOC(buf.lb_buf, len);
191                 buf.lb_vmalloc = 0;
192         }
193         if (buf.lb_buf == NULL)
194                 return -ENOMEM;
195
196         buf.lb_len = len;
197         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
198
199         if (oldbuf->lb_vmalloc)
200                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
201         else
202                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
203
204         memcpy(oldbuf, &buf, sizeof(buf));
205
206         return 0;
207 }
208
209 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
210                                        struct mdd_device *mdd)
211 {
212         struct mdd_thread_info *mti = mdd_env_info(env);
213         int                     max_cookie_size;
214
215         max_cookie_size = mdd_lov_cookiesize(env, mdd);
216         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
217                 if (mti->mti_max_cookie)
218                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
219                 mti->mti_max_cookie = NULL;
220                 mti->mti_max_cookie_size = 0;
221         }
222         if (unlikely(mti->mti_max_cookie == NULL)) {
223                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
224                 if (likely(mti->mti_max_cookie != NULL))
225                         mti->mti_max_cookie_size = max_cookie_size;
226         }
227         if (likely(mti->mti_max_cookie != NULL))
228                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
229         return mti->mti_max_cookie;
230 }
231
232 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
233                                    struct mdd_device *mdd)
234 {
235         struct mdd_thread_info *mti = mdd_env_info(env);
236         int                     max_lmm_size;
237
238         max_lmm_size = mdd_lov_mdsize(env, mdd);
239         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
240                 if (mti->mti_max_lmm)
241                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
242                 mti->mti_max_lmm = NULL;
243                 mti->mti_max_lmm_size = 0;
244         }
245         if (unlikely(mti->mti_max_lmm == NULL)) {
246                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
247                 if (unlikely(mti->mti_max_lmm != NULL))
248                         mti->mti_max_lmm_size = max_lmm_size;
249         }
250         return mti->mti_max_lmm;
251 }
252
253 struct lu_object *mdd_object_alloc(const struct lu_env *env,
254                                    const struct lu_object_header *hdr,
255                                    struct lu_device *d)
256 {
257         struct mdd_object *mdd_obj;
258
259         OBD_ALLOC_PTR(mdd_obj);
260         if (mdd_obj != NULL) {
261                 struct lu_object *o;
262
263                 o = mdd2lu_obj(mdd_obj);
264                 lu_object_init(o, NULL, d);
265                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
266                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
267                 mdd_obj->mod_count = 0;
268                 o->lo_ops = &mdd_lu_obj_ops;
269                 return o;
270         } else {
271                 return NULL;
272         }
273 }
274
275 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
276                            const struct lu_object_conf *unused)
277 {
278         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
279         struct mdd_object *mdd_obj = lu2mdd_obj(o);
280         struct lu_object  *below;
281         struct lu_device  *under;
282         ENTRY;
283
284         mdd_obj->mod_cltime = 0;
285         under = &d->mdd_child->dd_lu_dev;
286         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
287         mdd_pdlock_init(mdd_obj);
288         if (below == NULL)
289                 RETURN(-ENOMEM);
290
291         lu_object_add(o, below);
292
293         RETURN(0);
294 }
295
296 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
297 {
298         if (lu_object_exists(o))
299                 return mdd_get_flags(env, lu2mdd_obj(o));
300         else
301                 return 0;
302 }
303
304 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
305 {
306         struct mdd_object *mdd = lu2mdd_obj(o);
307
308         lu_object_fini(o);
309         OBD_FREE_PTR(mdd);
310 }
311
312 static int mdd_object_print(const struct lu_env *env, void *cookie,
313                             lu_printer_t p, const struct lu_object *o)
314 {
315         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
316         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
317                     "valid=%x, cltime=%llu, flags=%lx)",
318                     mdd, mdd->mod_count, mdd->mod_valid,
319                     mdd->mod_cltime, mdd->mod_flags);
320 }
321
322 static const struct lu_object_operations mdd_lu_obj_ops = {
323         .loo_object_init    = mdd_object_init,
324         .loo_object_start   = mdd_object_start,
325         .loo_object_free    = mdd_object_free,
326         .loo_object_print   = mdd_object_print,
327 };
328
329 struct mdd_object *mdd_object_find(const struct lu_env *env,
330                                    struct mdd_device *d,
331                                    const struct lu_fid *f)
332 {
333         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
334 }
335
336 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
337                         const char *path, struct lu_fid *fid)
338 {
339         struct lu_buf *buf;
340         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
341         struct mdd_object *obj;
342         struct lu_name *lname = &mdd_env_info(env)->mti_name;
343         char *name;
344         int rc = 0;
345         ENTRY;
346
347         /* temp buffer for path element */
348         buf = mdd_buf_alloc(env, PATH_MAX);
349         if (buf->lb_buf == NULL)
350                 RETURN(-ENOMEM);
351
352         lname->ln_name = name = buf->lb_buf;
353         lname->ln_namelen = 0;
354         *f = mdd->mdd_root_fid;
355
356         while(1) {
357                 while (*path == '/')
358                         path++;
359                 if (*path == '\0')
360                         break;
361                 while (*path != '/' && *path != '\0') {
362                         *name = *path;
363                         path++;
364                         name++;
365                         lname->ln_namelen++;
366                 }
367
368                 *name = '\0';
369                 /* find obj corresponding to fid */
370                 obj = mdd_object_find(env, mdd, f);
371                 if (obj == NULL)
372                         GOTO(out, rc = -EREMOTE);
373                 if (IS_ERR(obj))
374                         GOTO(out, rc = -PTR_ERR(obj));
375                 /* get child fid from parent and name */
376                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
377                 mdd_object_put(env, obj);
378                 if (rc)
379                         break;
380
381                 name = buf->lb_buf;
382                 lname->ln_namelen = 0;
383         }
384
385         if (!rc)
386                 *fid = *f;
387 out:
388         RETURN(rc);
389 }
390
391 /** The maximum depth that fid2path() will search.
392  * This is limited only because we want to store the fids for
393  * historical path lookup purposes.
394  */
395 #define MAX_PATH_DEPTH 100
396
397 /** mdd_path() lookup structure. */
398 struct path_lookup_info {
399         __u64                pli_recno;        /**< history point */
400         __u64                pli_currec;       /**< current record */
401         struct lu_fid        pli_fid;
402         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
403         struct mdd_object   *pli_mdd_obj;
404         char                *pli_path;         /**< full path */
405         int                  pli_pathlen;
406         int                  pli_linkno;       /**< which hardlink to follow */
407         int                  pli_fidcount;     /**< number of \a pli_fids */
408 };
409
410 static int mdd_path_current(const struct lu_env *env,
411                             struct path_lookup_info *pli)
412 {
413         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
414         struct mdd_object *mdd_obj;
415         struct lu_buf     *buf = NULL;
416         struct link_ea_header *leh;
417         struct link_ea_entry  *lee;
418         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
419         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
420         char *ptr;
421         int reclen;
422         int rc;
423         ENTRY;
424
425         ptr = pli->pli_path + pli->pli_pathlen - 1;
426         *ptr = 0;
427         --ptr;
428         pli->pli_fidcount = 0;
429         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
430
431         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
432                 mdd_obj = mdd_object_find(env, mdd,
433                                           &pli->pli_fids[pli->pli_fidcount]);
434                 if (mdd_obj == NULL)
435                         GOTO(out, rc = -EREMOTE);
436                 if (IS_ERR(mdd_obj))
437                         GOTO(out, rc = -PTR_ERR(mdd_obj));
438                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
439                 if (rc <= 0) {
440                         mdd_object_put(env, mdd_obj);
441                         if (rc == -1)
442                                 rc = -EREMOTE;
443                         else if (rc == 0)
444                                 /* Do I need to error out here? */
445                                 rc = -ENOENT;
446                         GOTO(out, rc);
447                 }
448
449                 /* Get parent fid and object name */
450                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
451                 buf = mdd_links_get(env, mdd_obj);
452                 mdd_read_unlock(env, mdd_obj);
453                 mdd_object_put(env, mdd_obj);
454                 if (IS_ERR(buf))
455                         GOTO(out, rc = PTR_ERR(buf));
456
457                 leh = buf->lb_buf;
458                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
459                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
460
461                 /* If set, use link #linkno for path lookup, otherwise use
462                    link #0.  Only do this for the final path element. */
463                 if ((pli->pli_fidcount == 0) &&
464                     (pli->pli_linkno < leh->leh_reccount)) {
465                         int count;
466                         for (count = 0; count < pli->pli_linkno; count++) {
467                                 lee = (struct link_ea_entry *)
468                                      ((char *)lee + reclen);
469                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
470                         }
471                         if (pli->pli_linkno < leh->leh_reccount - 1)
472                                 /* indicate to user there are more links */
473                                 pli->pli_linkno++;
474                 }
475
476                 /* Pack the name in the end of the buffer */
477                 ptr -= tmpname->ln_namelen;
478                 if (ptr - 1 <= pli->pli_path)
479                         GOTO(out, rc = -EOVERFLOW);
480                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
481                 *(--ptr) = '/';
482
483                 /* Store the parent fid for historic lookup */
484                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
485                         GOTO(out, rc = -EOVERFLOW);
486                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
487         }
488
489         /* Verify that our path hasn't changed since we started the lookup.
490            Record the current index, and verify the path resolves to the
491            same fid. If it does, then the path is correct as of this index. */
492         spin_lock(&mdd->mdd_cl.mc_lock);
493         pli->pli_currec = mdd->mdd_cl.mc_index;
494         spin_unlock(&mdd->mdd_cl.mc_lock);
495         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
496         if (rc) {
497                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
498                 GOTO (out, rc = -EAGAIN);
499         }
500         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
501                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
502                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
503                        PFID(&pli->pli_fid));
504                 GOTO(out, rc = -EAGAIN);
505         }
506
507         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
508
509         EXIT;
510 out:
511         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
512                 /* if we vmalloced a large buffer drop it */
513                 mdd_buf_put(buf);
514
515         return rc;
516 }
517
518 static int mdd_path_historic(const struct lu_env *env,
519                              struct path_lookup_info *pli)
520 {
521         return 0;
522 }
523
524 /* Returns the full path to this fid, as of changelog record recno. */
525 static int mdd_path(const struct lu_env *env, struct md_object *obj,
526                     char *path, int pathlen, __u64 *recno, int *linkno)
527 {
528         struct path_lookup_info *pli;
529         int tries = 3;
530         int rc = -EAGAIN;
531         ENTRY;
532
533         if (pathlen < 3)
534                 RETURN(-EOVERFLOW);
535
536         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
537                 path[0] = '/';
538                 path[1] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
610                        int *size)
611 {
612         struct lov_desc *ldesc;
613         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
614         ENTRY;
615
616         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617         LASSERT(ldesc != NULL);
618
619         if (!lmm)
620                 RETURN(0);
621
622         lmm->lmm_magic = LOV_MAGIC_V1;
623         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
624         lmm->lmm_pattern = ldesc->ld_pattern;
625         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
626         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
627         *size = sizeof(struct lov_mds_md);
628
629         RETURN(sizeof(struct lov_mds_md));
630 }
631
632 /* get lov EA only */
633 static int __mdd_lmm_get(const struct lu_env *env,
634                          struct mdd_object *mdd_obj, struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         if (ma->ma_valid & MA_LOV)
640                 RETURN(0);
641
642         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
643                         XATTR_NAME_LOV);
644
645         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
646                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
647                                 &ma->ma_lmm_size);
648         }
649
650         if (rc > 0) {
651                 ma->ma_valid |= MA_LOV;
652                 rc = 0;
653         }
654         RETURN(rc);
655 }
656
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
658                        struct md_attr *ma)
659 {
660         int rc;
661         ENTRY;
662
663         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664         rc = __mdd_lmm_get(env, mdd_obj, ma);
665         mdd_read_unlock(env, mdd_obj);
666         RETURN(rc);
667 }
668
669 /* get lmv EA only*/
670 static int __mdd_lmv_get(const struct lu_env *env,
671                          struct mdd_object *mdd_obj, struct md_attr *ma)
672 {
673         int rc;
674         ENTRY;
675
676         if (ma->ma_valid & MA_LMV)
677                 RETURN(0);
678
679         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
680                         XATTR_NAME_LMV);
681         if (rc > 0) {
682                 ma->ma_valid |= MA_LMV;
683                 rc = 0;
684         }
685         RETURN(rc);
686 }
687
688 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
689                          struct md_attr *ma)
690 {
691         struct mdd_thread_info *info = mdd_env_info(env);
692         struct lustre_mdt_attrs *lma =
693                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
694         int lma_size;
695         int rc;
696         ENTRY;
697
698         /* If all needed data are already valid, nothing to do */
699         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
700             (ma->ma_need & (MA_HSM | MA_SOM)))
701                 RETURN(0);
702
703         /* Read LMA from disk EA */
704         lma_size = sizeof(info->mti_xattr_buf);
705         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
706         if (rc <= 0)
707                 RETURN(rc);
708
709         /* Useless to check LMA incompatibility because this is already done in
710          * osd_ea_fid_get(), and this will fail long before this code is
711          * called.
712          * So, if we are here, LMA is compatible.
713          */
714
715         lustre_lma_swab(lma);
716
717         /* Swab and copy LMA */
718         if (ma->ma_need & MA_HSM) {
719                 if (lma->lma_compat & LMAC_HSM)
720                         ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
721                 else
722                         ma->ma_hsm_flags = 0;
723                 ma->ma_valid |= MA_HSM;
724         }
725
726         /* Copy SOM */
727         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
728                 LASSERT(ma->ma_som != NULL);
729                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
730                 ma->ma_som->msd_size    = lma->lma_som_size;
731                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
732                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
733                 ma->ma_valid |= MA_SOM;
734         }
735
736         RETURN(0);
737 }
738
739 static int mdd_attr_get_internal(const struct lu_env *env,
740                                  struct mdd_object *mdd_obj,
741                                  struct md_attr *ma)
742 {
743         int rc = 0;
744         ENTRY;
745
746         if (ma->ma_need & MA_INODE)
747                 rc = mdd_iattr_get(env, mdd_obj, ma);
748
749         if (rc == 0 && ma->ma_need & MA_LOV) {
750                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
751                     S_ISDIR(mdd_object_type(mdd_obj)))
752                         rc = __mdd_lmm_get(env, mdd_obj, ma);
753         }
754         if (rc == 0 && ma->ma_need & MA_LMV) {
755                 if (S_ISDIR(mdd_object_type(mdd_obj)))
756                         rc = __mdd_lmv_get(env, mdd_obj, ma);
757         }
758         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
759                 if (S_ISREG(mdd_object_type(mdd_obj)))
760                         rc = __mdd_lma_get(env, mdd_obj, ma);
761         }
762 #ifdef CONFIG_FS_POSIX_ACL
763         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
764                 if (S_ISDIR(mdd_object_type(mdd_obj)))
765                         rc = mdd_def_acl_get(env, mdd_obj, ma);
766         }
767 #endif
768         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
769                rc, ma->ma_valid);
770         RETURN(rc);
771 }
772
773 int mdd_attr_get_internal_locked(const struct lu_env *env,
774                                  struct mdd_object *mdd_obj, struct md_attr *ma)
775 {
776         int rc;
777         int needlock = ma->ma_need &
778                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
779
780         if (needlock)
781                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
782         rc = mdd_attr_get_internal(env, mdd_obj, ma);
783         if (needlock)
784                 mdd_read_unlock(env, mdd_obj);
785         return rc;
786 }
787
788 /*
789  * No permission check is needed.
790  */
791 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
792                         struct md_attr *ma)
793 {
794         struct mdd_object *mdd_obj = md2mdd_obj(obj);
795         int                rc;
796
797         ENTRY;
798         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
799         RETURN(rc);
800 }
801
802 /*
803  * No permission check is needed.
804  */
805 static int mdd_xattr_get(const struct lu_env *env,
806                          struct md_object *obj, struct lu_buf *buf,
807                          const char *name)
808 {
809         struct mdd_object *mdd_obj = md2mdd_obj(obj);
810         int rc;
811
812         ENTRY;
813
814         LASSERT(mdd_object_exists(mdd_obj));
815
816         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
817         rc = mdo_xattr_get(env, mdd_obj, buf, name,
818                            mdd_object_capa(env, mdd_obj));
819         mdd_read_unlock(env, mdd_obj);
820
821         RETURN(rc);
822 }
823
824 /*
825  * Permission check is done when open,
826  * no need check again.
827  */
828 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
829                         struct lu_buf *buf)
830 {
831         struct mdd_object *mdd_obj = md2mdd_obj(obj);
832         struct dt_object  *next;
833         loff_t             pos = 0;
834         int                rc;
835         ENTRY;
836
837         LASSERT(mdd_object_exists(mdd_obj));
838
839         next = mdd_object_child(mdd_obj);
840         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
841         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
842                                          mdd_object_capa(env, mdd_obj));
843         mdd_read_unlock(env, mdd_obj);
844         RETURN(rc);
845 }
846
847 /*
848  * No permission check is needed.
849  */
850 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
851                           struct lu_buf *buf)
852 {
853         struct mdd_object *mdd_obj = md2mdd_obj(obj);
854         int rc;
855
856         ENTRY;
857
858         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
860         mdd_read_unlock(env, mdd_obj);
861
862         RETURN(rc);
863 }
864
865 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
866                                struct mdd_object *c, struct md_attr *ma,
867                                struct thandle *handle,
868                                const struct md_op_spec *spec)
869 {
870         struct lu_attr *attr = &ma->ma_attr;
871         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
872         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
873         const struct dt_index_features *feat = spec->sp_feat;
874         int rc;
875         ENTRY;
876
877         if (!mdd_object_exists(c)) {
878                 struct dt_object *next = mdd_object_child(c);
879                 LASSERT(next);
880
881                 if (feat != &dt_directory_features && feat != NULL)
882                         dof->dof_type = DFT_INDEX;
883                 else
884                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
885
886                 dof->u.dof_idx.di_feat = feat;
887
888                 /* @hint will be initialized by underlying device. */
889                 next->do_ops->do_ah_init(env, hint,
890                                          p ? mdd_object_child(p) : NULL,
891                                          attr->la_mode & S_IFMT);
892
893                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
894                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
895         } else
896                 rc = -EEXIST;
897
898         RETURN(rc);
899 }
900
901 /**
902  * Make sure the ctime is increased only.
903  */
904 static inline int mdd_attr_check(const struct lu_env *env,
905                                  struct mdd_object *obj,
906                                  struct lu_attr *attr)
907 {
908         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
909         int rc;
910         ENTRY;
911
912         if (attr->la_valid & LA_CTIME) {
913                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
914                 if (rc)
915                         RETURN(rc);
916
917                 if (attr->la_ctime < tmp_la->la_ctime)
918                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
919                 else if (attr->la_valid == LA_CTIME &&
920                          attr->la_ctime == tmp_la->la_ctime)
921                         attr->la_valid &= ~LA_CTIME;
922         }
923         RETURN(0);
924 }
925
926 int mdd_attr_set_internal(const struct lu_env *env,
927                           struct mdd_object *obj,
928                           struct lu_attr *attr,
929                           struct thandle *handle,
930                           int needacl)
931 {
932         int rc;
933         ENTRY;
934
935         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
936 #ifdef CONFIG_FS_POSIX_ACL
937         if (!rc && (attr->la_valid & LA_MODE) && needacl)
938                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
939 #endif
940         RETURN(rc);
941 }
942
943 int mdd_attr_check_set_internal(const struct lu_env *env,
944                                 struct mdd_object *obj,
945                                 struct lu_attr *attr,
946                                 struct thandle *handle,
947                                 int needacl)
948 {
949         int rc;
950         ENTRY;
951
952         rc = mdd_attr_check(env, obj, attr);
953         if (rc)
954                 RETURN(rc);
955
956         if (attr->la_valid)
957                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
958         RETURN(rc);
959 }
960
961 static int mdd_attr_set_internal_locked(const struct lu_env *env,
962                                         struct mdd_object *obj,
963                                         struct lu_attr *attr,
964                                         struct thandle *handle,
965                                         int needacl)
966 {
967         int rc;
968         ENTRY;
969
970         needacl = needacl && (attr->la_valid & LA_MODE);
971         if (needacl)
972                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
973         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
974         if (needacl)
975                 mdd_write_unlock(env, obj);
976         RETURN(rc);
977 }
978
979 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
980                                        struct mdd_object *obj,
981                                        struct lu_attr *attr,
982                                        struct thandle *handle,
983                                        int needacl)
984 {
985         int rc;
986         ENTRY;
987
988         needacl = needacl && (attr->la_valid & LA_MODE);
989         if (needacl)
990                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
992         if (needacl)
993                 mdd_write_unlock(env, obj);
994         RETURN(rc);
995 }
996
997 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
998                     const struct lu_buf *buf, const char *name,
999                     int fl, struct thandle *handle)
1000 {
1001         struct lustre_capa *capa = mdd_object_capa(env, obj);
1002         int rc = -EINVAL;
1003         ENTRY;
1004
1005         if (buf->lb_buf && buf->lb_len > 0)
1006                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1007         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1008                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1009
1010         RETURN(rc);
1011 }
1012
1013 /*
1014  * This gives the same functionality as the code between
1015  * sys_chmod and inode_setattr
1016  * chown_common and inode_setattr
1017  * utimes and inode_setattr
1018  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1019  */
1020 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1021                         struct lu_attr *la, const struct md_attr *ma)
1022 {
1023         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1024         struct md_ucred  *uc;
1025         int               rc;
1026         ENTRY;
1027
1028         if (!la->la_valid)
1029                 RETURN(0);
1030
1031         /* Do not permit change file type */
1032         if (la->la_valid & LA_TYPE)
1033                 RETURN(-EPERM);
1034
1035         /* They should not be processed by setattr */
1036         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1037                 RETURN(-EPERM);
1038
1039         /* export destroy does not have ->le_ses, but we may want
1040          * to drop LUSTRE_SOM_FL. */
1041         if (!env->le_ses)
1042                 RETURN(0);
1043
1044         uc = md_ucred(env);
1045
1046         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1047         if (rc)
1048                 RETURN(rc);
1049
1050         if (la->la_valid == LA_CTIME) {
1051                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1052                         /* This is only for set ctime when rename's source is
1053                          * on remote MDS. */
1054                         rc = mdd_may_delete(env, NULL, obj,
1055                                             (struct md_attr *)ma, 1, 0);
1056                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1057                         la->la_valid &= ~LA_CTIME;
1058                 RETURN(rc);
1059         }
1060
1061         if (la->la_valid == LA_ATIME) {
1062                 /* This is atime only set for read atime update on close. */
1063                 if (la->la_atime <= tmp_la->la_atime +
1064                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1065                         la->la_valid &= ~LA_ATIME;
1066                 RETURN(0);
1067         }
1068
1069         /* Check if flags change. */
1070         if (la->la_valid & LA_FLAGS) {
1071                 unsigned int oldflags = 0;
1072                 unsigned int newflags = la->la_flags &
1073                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1074
1075                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1076                     !mdd_capable(uc, CFS_CAP_FOWNER))
1077                         RETURN(-EPERM);
1078
1079                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1080                  * only be changed by the relevant capability. */
1081                 if (mdd_is_immutable(obj))
1082                         oldflags |= LUSTRE_IMMUTABLE_FL;
1083                 if (mdd_is_append(obj))
1084                         oldflags |= LUSTRE_APPEND_FL;
1085                 if ((oldflags ^ newflags) &&
1086                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1087                         RETURN(-EPERM);
1088
1089                 if (!S_ISDIR(tmp_la->la_mode))
1090                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1091         }
1092
1093         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1094             (la->la_valid & ~LA_FLAGS) &&
1095             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1096                 RETURN(-EPERM);
1097
1098         /* Check for setting the obj time. */
1099         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1100             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1101                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1102                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1103                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1104                                                             MAY_WRITE,
1105                                                             MOR_TGT_CHILD);
1106                         if (rc)
1107                                 RETURN(rc);
1108                 }
1109         }
1110
1111         /* Make sure a caller can chmod. */
1112         if (la->la_valid & LA_MODE) {
1113                 /* Bypass la_vaild == LA_MODE,
1114                  * this is for changing file with SUID or SGID. */
1115                 if ((la->la_valid & ~LA_MODE) &&
1116                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1117                     (uc->mu_fsuid != tmp_la->la_uid) &&
1118                     !mdd_capable(uc, CFS_CAP_FOWNER))
1119                         RETURN(-EPERM);
1120
1121                 if (la->la_mode == (umode_t) -1)
1122                         la->la_mode = tmp_la->la_mode;
1123                 else
1124                         la->la_mode = (la->la_mode & S_IALLUGO) |
1125                                       (tmp_la->la_mode & ~S_IALLUGO);
1126
1127                 /* Also check the setgid bit! */
1128                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1129                                        la->la_gid : tmp_la->la_gid) &&
1130                     !mdd_capable(uc, CFS_CAP_FSETID))
1131                         la->la_mode &= ~S_ISGID;
1132         } else {
1133                la->la_mode = tmp_la->la_mode;
1134         }
1135
1136         /* Make sure a caller can chown. */
1137         if (la->la_valid & LA_UID) {
1138                 if (la->la_uid == (uid_t) -1)
1139                         la->la_uid = tmp_la->la_uid;
1140                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1141                     (la->la_uid != tmp_la->la_uid)) &&
1142                     !mdd_capable(uc, CFS_CAP_CHOWN))
1143                         RETURN(-EPERM);
1144
1145                 /* If the user or group of a non-directory has been
1146                  * changed by a non-root user, remove the setuid bit.
1147                  * 19981026 David C Niemi <niemi@tux.org>
1148                  *
1149                  * Changed this to apply to all users, including root,
1150                  * to avoid some races. This is the behavior we had in
1151                  * 2.0. The check for non-root was definitely wrong
1152                  * for 2.2 anyway, as it should have been using
1153                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1154                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1155                     !S_ISDIR(tmp_la->la_mode)) {
1156                         la->la_mode &= ~S_ISUID;
1157                         la->la_valid |= LA_MODE;
1158                 }
1159         }
1160
1161         /* Make sure caller can chgrp. */
1162         if (la->la_valid & LA_GID) {
1163                 if (la->la_gid == (gid_t) -1)
1164                         la->la_gid = tmp_la->la_gid;
1165                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1166                     ((la->la_gid != tmp_la->la_gid) &&
1167                     !lustre_in_group_p(uc, la->la_gid))) &&
1168                     !mdd_capable(uc, CFS_CAP_CHOWN))
1169                         RETURN(-EPERM);
1170
1171                 /* Likewise, if the user or group of a non-directory
1172                  * has been changed by a non-root user, remove the
1173                  * setgid bit UNLESS there is no group execute bit
1174                  * (this would be a file marked for mandatory
1175                  * locking).  19981026 David C Niemi <niemi@tux.org>
1176                  *
1177                  * Removed the fsuid check (see the comment above) --
1178                  * 19990830 SD. */
1179                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1180                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1181                         la->la_mode &= ~S_ISGID;
1182                         la->la_valid |= LA_MODE;
1183                 }
1184         }
1185
1186         /* For both Size-on-MDS case and truncate case,
1187          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1188          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1189          * For SOM case, it is true, the MAY_WRITE perm has been checked
1190          * when open, no need check again. For truncate case, it is false,
1191          * the MAY_WRITE perm should be checked here. */
1192         if (ma->ma_attr_flags & MDS_SOM) {
1193                 /* For the "Size-on-MDS" setattr update, merge coming
1194                  * attributes with the set in the inode. BUG 10641 */
1195                 if ((la->la_valid & LA_ATIME) &&
1196                     (la->la_atime <= tmp_la->la_atime))
1197                         la->la_valid &= ~LA_ATIME;
1198
1199                 /* OST attributes do not have a priority over MDS attributes,
1200                  * so drop times if ctime is equal. */
1201                 if ((la->la_valid & LA_CTIME) &&
1202                     (la->la_ctime <= tmp_la->la_ctime))
1203                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1204         } else {
1205                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1206                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1207                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1208                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1209                                 rc = mdd_permission_internal_locked(env, obj,
1210                                                             tmp_la, MAY_WRITE,
1211                                                             MOR_TGT_CHILD);
1212                                 if (rc)
1213                                         RETURN(rc);
1214                         }
1215                 }
1216                 if (la->la_valid & LA_CTIME) {
1217                         /* The pure setattr, it has the priority over what is
1218                          * already set, do not drop it if ctime is equal. */
1219                         if (la->la_ctime < tmp_la->la_ctime)
1220                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1221                                                   LA_CTIME);
1222                 }
1223         }
1224
1225         RETURN(0);
1226 }
1227
1228 /** Store a data change changelog record
1229  * If this fails, we must fail the whole transaction; we don't
1230  * want the change to commit without the log entry.
1231  * \param mdd_obj - mdd_object of change
1232  * \param handle - transacion handle
1233  */
1234 static int mdd_changelog_data_store(const struct lu_env     *env,
1235                                     struct mdd_device       *mdd,
1236                                     enum changelog_rec_type type,
1237                                     struct mdd_object       *mdd_obj,
1238                                     struct thandle          *handle)
1239 {
1240         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1241         struct llog_changelog_rec *rec;
1242         struct lu_buf *buf;
1243         int reclen;
1244         int rc;
1245
1246         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1247                 RETURN(0);
1248
1249         LASSERT(handle != NULL);
1250         LASSERT(mdd_obj != NULL);
1251
1252         if ((type == CL_SETATTR) &&
1253             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1254                 /* Don't need multiple updates in this log */
1255                 /* Don't check under lock - no big deal if we get an extra
1256                    entry */
1257                 RETURN(0);
1258         }
1259
1260         reclen = llog_data_len(sizeof(*rec));
1261         buf = mdd_buf_alloc(env, reclen);
1262         if (buf->lb_buf == NULL)
1263                 RETURN(-ENOMEM);
1264         rec = (struct llog_changelog_rec *)buf->lb_buf;
1265
1266         rec->cr.cr_flags = CLF_VERSION;
1267         rec->cr.cr_type = (__u32)type;
1268         rec->cr.cr_tfid = *tfid;
1269         rec->cr.cr_namelen = 0;
1270         mdd_obj->mod_cltime = cfs_time_current_64();
1271
1272         rc = mdd_changelog_llog_write(mdd, rec, handle);
1273         if (rc < 0) {
1274                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1275                        rc, type, PFID(tfid));
1276                 return -EFAULT;
1277         }
1278
1279         return 0;
1280 }
1281
1282 /**
1283  * Should be called with write lock held.
1284  *
1285  * \see mdd_lma_set_locked().
1286  */
1287 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1288                        const struct md_attr *ma, struct thandle *handle)
1289 {
1290         struct mdd_thread_info *info = mdd_env_info(env);
1291         struct lu_buf *buf;
1292         struct lustre_mdt_attrs *lma =
1293                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1294         int lmasize = sizeof(struct lustre_mdt_attrs);
1295         int rc = 0;
1296
1297         ENTRY;
1298
1299         /* Either HSM or SOM part is not valid, we need to read it before */
1300         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1301                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1302                 if (rc <= 0)
1303                         RETURN(rc);
1304
1305                 lustre_lma_swab(lma);
1306         } else {
1307                 memset(lma, 0, lmasize);
1308         }
1309
1310         /* Copy HSM data */
1311         if (ma->ma_valid & MA_HSM) {
1312                 lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1313                 lma->lma_compat |= LMAC_HSM;
1314         }
1315
1316         /* Copy SOM data */
1317         if (ma->ma_valid & MA_SOM) {
1318                 LASSERT(ma->ma_som != NULL);
1319                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1320                         lma->lma_compat     &= ~LMAC_SOM;
1321                 } else {
1322                         lma->lma_compat     |= LMAC_SOM;
1323                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1324                         lma->lma_som_size    = ma->ma_som->msd_size;
1325                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1326                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1327                 }
1328         }
1329
1330         /* Copy FID */
1331         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1332
1333         lustre_lma_swab(lma);
1334         buf = mdd_buf_get(env, lma, lmasize);
1335         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1336
1337         RETURN(rc);
1338 }
1339
1340 /**
1341  * Save LMA extended attributes with data from \a ma.
1342  *
1343  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1344  * not, LMA EA will be first read from disk, modified and write back.
1345  *
1346  */
1347 static int mdd_lma_set_locked(const struct lu_env *env,
1348                               struct mdd_object *mdd_obj,
1349                               const struct md_attr *ma, struct thandle *handle)
1350 {
1351         int rc;
1352
1353         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1354         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1355         mdd_write_unlock(env, mdd_obj);
1356         return rc;
1357 }
1358
1359 /* set attr and LOV EA at once, return updated attr */
1360 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1361                         const struct md_attr *ma)
1362 {
1363         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1364         struct mdd_device *mdd = mdo2mdd(obj);
1365         struct thandle *handle;
1366         struct lov_mds_md *lmm = NULL;
1367         struct llog_cookie *logcookies = NULL;
1368         int  rc, lmm_size = 0, cookie_size = 0;
1369         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1370 #ifdef HAVE_QUOTA_SUPPORT
1371         struct obd_device *obd = mdd->mdd_obd_dev;
1372         struct obd_export *exp = md_quota(env)->mq_exp;
1373         struct mds_obd *mds = &obd->u.mds;
1374         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1375         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1376         int quota_opc = 0, block_count = 0;
1377         int inode_pending[MAXQUOTAS] = { 0, 0 };
1378         int block_pending[MAXQUOTAS] = { 0, 0 };
1379 #endif
1380         ENTRY;
1381
1382         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1383                                     MDD_TXN_ATTR_SET_OP);
1384         handle = mdd_trans_start(env, mdd);
1385         if (IS_ERR(handle))
1386                 RETURN(PTR_ERR(handle));
1387         /*TODO: add lock here*/
1388         /* start a log jounal handle if needed */
1389         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1390             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1391                 lmm_size = mdd_lov_mdsize(env, mdd);
1392                 lmm = mdd_max_lmm_get(env, mdd);
1393                 if (lmm == NULL)
1394                         GOTO(cleanup, rc = -ENOMEM);
1395
1396                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1397                                 XATTR_NAME_LOV);
1398
1399                 if (rc < 0)
1400                         GOTO(cleanup, rc);
1401         }
1402
1403         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1404                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1405                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1406
1407         *la_copy = ma->ma_attr;
1408         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1409         if (rc)
1410                 GOTO(cleanup, rc);
1411
1412 #ifdef HAVE_QUOTA_SUPPORT
1413         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1414                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1415
1416                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1417                 if (!rc) {
1418                         quota_opc = FSFILT_OP_SETATTR;
1419                         mdd_quota_wrapper(la_copy, qnids);
1420                         mdd_quota_wrapper(la_tmp, qoids);
1421                         /* get file quota for new owner */
1422                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1423                                         qnids, inode_pending, 1, NULL, 0,
1424                                         NULL, 0);
1425                         block_count = (la_tmp->la_blocks + 7) >> 3;
1426                         if (block_count) {
1427                                 void *data = NULL;
1428                                 mdd_data_get(env, mdd_obj, &data);
1429                                 /* get block quota for new owner */
1430                                 lquota_chkquota(mds_quota_interface_ref, obd,
1431                                                 exp, qnids, block_pending,
1432                                                 block_count, NULL,
1433                                                 LQUOTA_FLAGS_BLK, data, 1);
1434                         }
1435                 }
1436         }
1437 #endif
1438
1439         if (la_copy->la_valid & LA_FLAGS) {
1440                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1441                                                   handle, 1);
1442                 if (rc == 0)
1443                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1444         } else if (la_copy->la_valid) {            /* setattr */
1445                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1446                                                   handle, 1);
1447                 /* journal chown/chgrp in llog, just like unlink */
1448                 if (rc == 0 && lmm_size){
1449                         cookie_size = mdd_lov_cookiesize(env, mdd);
1450                         logcookies = mdd_max_cookie_get(env, mdd);
1451                         if (logcookies == NULL)
1452                                 GOTO(cleanup, rc = -ENOMEM);
1453
1454                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1455                                             logcookies, cookie_size) <= 0)
1456                                 logcookies = NULL;
1457                 }
1458         }
1459
1460         if (rc == 0 && ma->ma_valid & MA_LOV) {
1461                 umode_t mode;
1462
1463                 mode = mdd_object_type(mdd_obj);
1464                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1465                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1466                         if (rc)
1467                                 GOTO(cleanup, rc);
1468
1469                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1470                                             ma->ma_lmm_size, handle, 1);
1471                 }
1472
1473         }
1474         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1475                 umode_t mode;
1476
1477                 mode = mdd_object_type(mdd_obj);
1478                 if (S_ISREG(mode))
1479                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1480
1481         }
1482 cleanup:
1483         if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
1484                 rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
1485                                               handle);
1486         mdd_trans_stop(env, mdd, rc, handle);
1487         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1488                 /*set obd attr, if needed*/
1489                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1490                                            logcookies);
1491         }
1492 #ifdef HAVE_QUOTA_SUPPORT
1493         if (quota_opc) {
1494                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1495                                       inode_pending, 0);
1496                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1497                                       block_pending, 1);
1498                 /* Trigger dqrel/dqacq for original owner and new owner.
1499                  * If failed, the next call for lquota_chkquota will
1500                  * process it. */
1501                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1502                               quota_opc);
1503         }
1504 #endif
1505         RETURN(rc);
1506 }
1507
1508 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1509                       const struct lu_buf *buf, const char *name, int fl,
1510                       struct thandle *handle)
1511 {
1512         int  rc;
1513         ENTRY;
1514
1515         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1516         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1517         mdd_write_unlock(env, obj);
1518
1519         RETURN(rc);
1520 }
1521
1522 static int mdd_xattr_sanity_check(const struct lu_env *env,
1523                                   struct mdd_object *obj)
1524 {
1525         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1526         struct md_ucred *uc     = md_ucred(env);
1527         int rc;
1528         ENTRY;
1529
1530         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1531                 RETURN(-EPERM);
1532
1533         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1534         if (rc)
1535                 RETURN(rc);
1536
1537         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1538             !mdd_capable(uc, CFS_CAP_FOWNER))
1539                 RETURN(-EPERM);
1540
1541         RETURN(rc);
1542 }
1543
1544 /**
1545  * The caller should guarantee to update the object ctime
1546  * after xattr_set if needed.
1547  */
1548 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1549                          const struct lu_buf *buf, const char *name,
1550                          int fl)
1551 {
1552         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1553         struct mdd_device *mdd = mdo2mdd(obj);
1554         struct thandle *handle;
1555         int  rc;
1556         ENTRY;
1557
1558         rc = mdd_xattr_sanity_check(env, mdd_obj);
1559         if (rc)
1560                 RETURN(rc);
1561
1562         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1563         /* security-replated changes may require sync */
1564         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1565             mdd->mdd_sync_permission == 1)
1566                 txn_param_sync(&mdd_env_info(env)->mti_param);
1567
1568         handle = mdd_trans_start(env, mdd);
1569         if (IS_ERR(handle))
1570                 RETURN(PTR_ERR(handle));
1571
1572         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1573
1574         /* Only record user xattr changes */
1575         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1576             (strncmp("user.", name, 5) == 0))
1577                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1578                                               handle);
1579         mdd_trans_stop(env, mdd, rc, handle);
1580
1581         RETURN(rc);
1582 }
1583
1584 /**
1585  * The caller should guarantee to update the object ctime
1586  * after xattr_set if needed.
1587  */
1588 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1589                   const char *name)
1590 {
1591         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1592         struct mdd_device *mdd = mdo2mdd(obj);
1593         struct thandle *handle;
1594         int  rc;
1595         ENTRY;
1596
1597         rc = mdd_xattr_sanity_check(env, mdd_obj);
1598         if (rc)
1599                 RETURN(rc);
1600
1601         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1602         handle = mdd_trans_start(env, mdd);
1603         if (IS_ERR(handle))
1604                 RETURN(PTR_ERR(handle));
1605
1606         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1607         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1608                            mdd_object_capa(env, mdd_obj));
1609         mdd_write_unlock(env, mdd_obj);
1610
1611         /* Only record user xattr changes */
1612         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1613             (strncmp("user.", name, 5) != 0))
1614                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1615                                               handle);
1616
1617         mdd_trans_stop(env, mdd, rc, handle);
1618
1619         RETURN(rc);
1620 }
1621
1622 /* partial unlink */
1623 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1624                        struct md_attr *ma)
1625 {
1626         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1627         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1628         struct mdd_device *mdd = mdo2mdd(obj);
1629         struct thandle *handle;
1630 #ifdef HAVE_QUOTA_SUPPORT
1631         struct obd_device *obd = mdd->mdd_obd_dev;
1632         struct mds_obd *mds = &obd->u.mds;
1633         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1634         int quota_opc = 0;
1635 #endif
1636         int rc;
1637         ENTRY;
1638
1639         /*
1640          * Check -ENOENT early here because we need to get object type
1641          * to calculate credits before transaction start
1642          */
1643         if (!mdd_object_exists(mdd_obj))
1644                 RETURN(-ENOENT);
1645
1646         LASSERT(mdd_object_exists(mdd_obj) > 0);
1647
1648         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1649         if (rc)
1650                 RETURN(rc);
1651
1652         handle = mdd_trans_start(env, mdd);
1653         if (IS_ERR(handle))
1654                 RETURN(-ENOMEM);
1655
1656         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1657
1658         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1659         if (rc)
1660                 GOTO(cleanup, rc);
1661
1662         __mdd_ref_del(env, mdd_obj, handle, 0);
1663
1664         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1665                 /* unlink dot */
1666                 __mdd_ref_del(env, mdd_obj, handle, 1);
1667         }
1668
1669         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1670         la_copy->la_ctime = ma->ma_attr.la_ctime;
1671
1672         la_copy->la_valid = LA_CTIME;
1673         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1674         if (rc)
1675                 GOTO(cleanup, rc);
1676
1677         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1678 #ifdef HAVE_QUOTA_SUPPORT
1679         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1680             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1681                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1682                 mdd_quota_wrapper(&ma->ma_attr, qids);
1683         }
1684 #endif
1685
1686
1687         EXIT;
1688 cleanup:
1689         mdd_write_unlock(env, mdd_obj);
1690         mdd_trans_stop(env, mdd, rc, handle);
1691 #ifdef HAVE_QUOTA_SUPPORT
1692         if (quota_opc)
1693                 /* Trigger dqrel on the owner of child. If failed,
1694                  * the next call for lquota_chkquota will process it */
1695                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1696                               quota_opc);
1697 #endif
1698         return rc;
1699 }
1700
1701 /* partial operation */
1702 static int mdd_oc_sanity_check(const struct lu_env *env,
1703                                struct mdd_object *obj,
1704                                struct md_attr *ma)
1705 {
1706         int rc;
1707         ENTRY;
1708
1709         switch (ma->ma_attr.la_mode & S_IFMT) {
1710         case S_IFREG:
1711         case S_IFDIR:
1712         case S_IFLNK:
1713         case S_IFCHR:
1714         case S_IFBLK:
1715         case S_IFIFO:
1716         case S_IFSOCK:
1717                 rc = 0;
1718                 break;
1719         default:
1720                 rc = -EINVAL;
1721                 break;
1722         }
1723         RETURN(rc);
1724 }
1725
1726 static int mdd_object_create(const struct lu_env *env,
1727                              struct md_object *obj,
1728                              const struct md_op_spec *spec,
1729                              struct md_attr *ma)
1730 {
1731
1732         struct mdd_device *mdd = mdo2mdd(obj);
1733         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1734         const struct lu_fid *pfid = spec->u.sp_pfid;
1735         struct thandle *handle;
1736 #ifdef HAVE_QUOTA_SUPPORT
1737         struct obd_device *obd = mdd->mdd_obd_dev;
1738         struct obd_export *exp = md_quota(env)->mq_exp;
1739         struct mds_obd *mds = &obd->u.mds;
1740         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1741         int quota_opc = 0, block_count = 0;
1742         int inode_pending[MAXQUOTAS] = { 0, 0 };
1743         int block_pending[MAXQUOTAS] = { 0, 0 };
1744 #endif
1745         int rc = 0;
1746         ENTRY;
1747
1748 #ifdef HAVE_QUOTA_SUPPORT
1749         if (mds->mds_quota) {
1750                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1751                 mdd_quota_wrapper(&ma->ma_attr, qids);
1752                 /* get file quota for child */
1753                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1754                                 qids, inode_pending, 1, NULL, 0,
1755                                 NULL, 0);
1756                 switch (ma->ma_attr.la_mode & S_IFMT) {
1757                 case S_IFLNK:
1758                 case S_IFDIR:
1759                         block_count = 2;
1760                         break;
1761                 case S_IFREG:
1762                         block_count = 1;
1763                         break;
1764                 }
1765                 /* get block quota for child */
1766                 if (block_count)
1767                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1768                                         qids, block_pending, block_count,
1769                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1770         }
1771 #endif
1772
1773         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1774         handle = mdd_trans_start(env, mdd);
1775         if (IS_ERR(handle))
1776                 GOTO(out_pending, rc = PTR_ERR(handle));
1777
1778         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1779         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1780         if (rc)
1781                 GOTO(unlock, rc);
1782
1783         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1784         if (rc)
1785                 GOTO(unlock, rc);
1786
1787         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1788                 /* If creating the slave object, set slave EA here. */
1789                 int lmv_size = spec->u.sp_ea.eadatalen;
1790                 struct lmv_stripe_md *lmv;
1791
1792                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1793                 LASSERT(lmv != NULL && lmv_size > 0);
1794
1795                 rc = __mdd_xattr_set(env, mdd_obj,
1796                                      mdd_buf_get_const(env, lmv, lmv_size),
1797                                      XATTR_NAME_LMV, 0, handle);
1798                 if (rc)
1799                         GOTO(unlock, rc);
1800
1801                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1802                                            handle, 0);
1803         } else {
1804 #ifdef CONFIG_FS_POSIX_ACL
1805                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1806                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1807
1808                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1809                         buf->lb_len = spec->u.sp_ea.eadatalen;
1810                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1811                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1812                                                     &ma->ma_attr.la_mode,
1813                                                     handle);
1814                                 if (rc)
1815                                         GOTO(unlock, rc);
1816                                 else
1817                                         ma->ma_attr.la_valid |= LA_MODE;
1818                         }
1819
1820                         pfid = spec->u.sp_ea.fid;
1821                 }
1822 #endif
1823                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1824                                            spec);
1825         }
1826         EXIT;
1827 unlock:
1828         if (rc == 0)
1829                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1830         mdd_write_unlock(env, mdd_obj);
1831
1832         mdd_trans_stop(env, mdd, rc, handle);
1833 out_pending:
1834 #ifdef HAVE_QUOTA_SUPPORT
1835         if (quota_opc) {
1836                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1837                                       inode_pending, 0);
1838                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1839                                       block_pending, 1);
1840                 /* Trigger dqacq on the owner of child. If failed,
1841                  * the next call for lquota_chkquota will process it. */
1842                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1843                               quota_opc);
1844         }
1845 #endif
1846         return rc;
1847 }
1848
1849 /* partial link */
1850 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1851                        const struct md_attr *ma)
1852 {
1853         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1854         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1855         struct mdd_device *mdd = mdo2mdd(obj);
1856         struct thandle *handle;
1857         int rc;
1858         ENTRY;
1859
1860         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1861         handle = mdd_trans_start(env, mdd);
1862         if (IS_ERR(handle))
1863                 RETURN(-ENOMEM);
1864
1865         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1866         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1867         if (rc == 0)
1868                 __mdd_ref_add(env, mdd_obj, handle);
1869         mdd_write_unlock(env, mdd_obj);
1870         if (rc == 0) {
1871                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1872                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1873
1874                 la_copy->la_valid = LA_CTIME;
1875                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1876                                                         handle, 0);
1877         }
1878         mdd_trans_stop(env, mdd, 0, handle);
1879
1880         RETURN(rc);
1881 }
1882
1883 /*
1884  * do NOT or the MAY_*'s, you'll get the weakest
1885  */
1886 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1887 {
1888         int res = 0;
1889
1890         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1891          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1892          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1893          * owner can write to a file even if it is marked readonly to hide
1894          * its brokenness. (bug 5781) */
1895         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1896                 struct md_ucred *uc = md_ucred(env);
1897
1898                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1899                     (la->la_uid == uc->mu_fsuid))
1900                         return 0;
1901         }
1902
1903         if (flags & FMODE_READ)
1904                 res |= MAY_READ;
1905         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1906                 res |= MAY_WRITE;
1907         if (flags & MDS_FMODE_EXEC)
1908                 res |= MAY_EXEC;
1909         return res;
1910 }
1911
1912 static int mdd_open_sanity_check(const struct lu_env *env,
1913                                  struct mdd_object *obj, int flag)
1914 {
1915         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1916         int mode, rc;
1917         ENTRY;
1918
1919         /* EEXIST check */
1920         if (mdd_is_dead_obj(obj))
1921                 RETURN(-ENOENT);
1922
1923         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1924         if (rc)
1925                RETURN(rc);
1926
1927         if (S_ISLNK(tmp_la->la_mode))
1928                 RETURN(-ELOOP);
1929
1930         mode = accmode(env, tmp_la, flag);
1931
1932         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1933                 RETURN(-EISDIR);
1934
1935         if (!(flag & MDS_OPEN_CREATED)) {
1936                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1937                 if (rc)
1938                         RETURN(rc);
1939         }
1940
1941         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1942             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1943                 flag &= ~MDS_OPEN_TRUNC;
1944
1945         /* For writing append-only file must open it with append mode. */
1946         if (mdd_is_append(obj)) {
1947                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1948                         RETURN(-EPERM);
1949                 if (flag & MDS_OPEN_TRUNC)
1950                         RETURN(-EPERM);
1951         }
1952
1953 #if 0
1954         /*
1955          * Now, flag -- O_NOATIME does not be packed by client.
1956          */
1957         if (flag & O_NOATIME) {
1958                 struct md_ucred *uc = md_ucred(env);
1959
1960                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1961                     (uc->mu_valid == UCRED_NEW)) &&
1962                     (uc->mu_fsuid != tmp_la->la_uid) &&
1963                     !mdd_capable(uc, CFS_CAP_FOWNER))
1964                         RETURN(-EPERM);
1965         }
1966 #endif
1967
1968         RETURN(0);
1969 }
1970
1971 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1972                     int flags)
1973 {
1974         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1975         int rc = 0;
1976
1977         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1978
1979         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1980         if (rc == 0)
1981                 mdd_obj->mod_count++;
1982
1983         mdd_write_unlock(env, mdd_obj);
1984         return rc;
1985 }
1986
1987 /* return md_attr back,
1988  * if it is last unlink then return lov ea + llog cookie*/
1989 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1990                     struct md_attr *ma)
1991 {
1992         int rc = 0;
1993         ENTRY;
1994
1995         if (S_ISREG(mdd_object_type(obj))) {
1996                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1997                  * Caller must be ready for that. */
1998
1999                 rc = __mdd_lmm_get(env, obj, ma);
2000                 if ((ma->ma_valid & MA_LOV))
2001                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2002                                             obj, ma);
2003         }
2004         RETURN(rc);
2005 }
2006
2007 /*
2008  * No permission check is needed.
2009  */
2010 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2011                      struct md_attr *ma)
2012 {
2013         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2014         struct mdd_device *mdd = mdo2mdd(obj);
2015         struct thandle    *handle;
2016         int rc;
2017         int reset = 1;
2018
2019 #ifdef HAVE_QUOTA_SUPPORT
2020         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2021         struct mds_obd *mds = &obd->u.mds;
2022         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2023         int quota_opc = 0;
2024 #endif
2025         ENTRY;
2026
2027         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2028         if (rc)
2029                 RETURN(rc);
2030         handle = mdd_trans_start(env, mdo2mdd(obj));
2031         if (IS_ERR(handle))
2032                 RETURN(PTR_ERR(handle));
2033
2034         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2035         /* release open count */
2036         mdd_obj->mod_count --;
2037
2038         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2039                 /* remove link to object from orphan index */
2040                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2041                 if (rc == 0) {
2042                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2043                                "list, OSS objects to be destroyed.\n",
2044                                PFID(mdd_object_fid(mdd_obj)));
2045                 } else {
2046                         CERROR("Object "DFID" can not be deleted from orphan "
2047                                 "list, maybe cause OST objects can not be "
2048                                 "destroyed (err: %d).\n",
2049                                 PFID(mdd_object_fid(mdd_obj)), rc);
2050                         /* If object was not deleted from orphan list, do not
2051                          * destroy OSS objects, which will be done when next
2052                          * recovery. */
2053                         GOTO(out, rc);
2054                 }
2055         }
2056
2057         rc = mdd_iattr_get(env, mdd_obj, ma);
2058         /* Object maybe not in orphan list originally, it is rare case for
2059          * mdd_finish_unlink() failure. */
2060         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2061 #ifdef HAVE_QUOTA_SUPPORT
2062                 if (mds->mds_quota) {
2063                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2064                         mdd_quota_wrapper(&ma->ma_attr, qids);
2065                 }
2066 #endif
2067                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2068                 if (ma->ma_valid & MA_FLAGS &&
2069                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2070                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2071                 } else {
2072                         rc = mdd_object_kill(env, mdd_obj, ma);
2073                                 if (rc == 0)
2074                                         reset = 0;
2075                 }
2076
2077                 if (rc != 0)
2078                         CERROR("Error when prepare to delete Object "DFID" , "
2079                                "which will cause OST objects can not be "
2080                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2081         }
2082         EXIT;
2083
2084 out:
2085         if (reset)
2086                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2087
2088         mdd_write_unlock(env, mdd_obj);
2089         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2090 #ifdef HAVE_QUOTA_SUPPORT
2091         if (quota_opc)
2092                 /* Trigger dqrel on the owner of child. If failed,
2093                  * the next call for lquota_chkquota will process it */
2094                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2095                               quota_opc);
2096 #endif
2097         return rc;
2098 }
2099
2100 /*
2101  * Permission check is done when open,
2102  * no need check again.
2103  */
2104 static int mdd_readpage_sanity_check(const struct lu_env *env,
2105                                      struct mdd_object *obj)
2106 {
2107         struct dt_object *next = mdd_object_child(obj);
2108         int rc;
2109         ENTRY;
2110
2111         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2112                 rc = 0;
2113         else
2114                 rc = -ENOTDIR;
2115
2116         RETURN(rc);
2117 }
2118
2119 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2120                               int first, void *area, int nob,
2121                               const struct dt_it_ops *iops, struct dt_it *it,
2122                               __u64 *start, __u64 *end,
2123                               struct lu_dirent **last, __u32 attr)
2124 {
2125         int                     result;
2126         __u64                   hash = 0;
2127         struct lu_dirent       *ent;
2128
2129         if (first) {
2130                 memset(area, 0, sizeof (struct lu_dirpage));
2131                 area += sizeof (struct lu_dirpage);
2132                 nob  -= sizeof (struct lu_dirpage);
2133         }
2134
2135         ent  = area;
2136         do {
2137                 int    len;
2138                 int    recsize;
2139
2140                 len  = iops->key_size(env, it);
2141
2142                 /* IAM iterator can return record with zero len. */
2143                 if (len == 0)
2144                         goto next;
2145
2146                 hash = iops->store(env, it);
2147                 if (unlikely(first)) {
2148                         first = 0;
2149                         *start = hash;
2150                 }
2151
2152                 /* calculate max space required for lu_dirent */
2153                 recsize = lu_dirent_calc_size(len, attr);
2154
2155                 if (nob >= recsize) {
2156                         result = iops->rec(env, it, ent, attr);
2157                         if (result == -ESTALE)
2158                                 goto next;
2159                         if (result != 0)
2160                                 goto out;
2161
2162                         /* osd might not able to pack all attributes,
2163                          * so recheck rec length */
2164                         recsize = le16_to_cpu(ent->lde_reclen);
2165                 } else {
2166                         /*
2167                          * record doesn't fit into page, enlarge previous one.
2168                          */
2169                         if (*last) {
2170                                 (*last)->lde_reclen =
2171                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2172                                                         nob);
2173                                 result = 0;
2174                         } else
2175                                 result = -EINVAL;
2176
2177                         goto out;
2178                 }
2179                 *last = ent;
2180                 ent = (void *)ent + recsize;
2181                 nob -= recsize;
2182
2183 next:
2184                 result = iops->next(env, it);
2185                 if (result == -ESTALE)
2186                         goto next;
2187         } while (result == 0);
2188
2189 out:
2190         *end = hash;
2191         return result;
2192 }
2193
2194 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2195                           const struct lu_rdpg *rdpg)
2196 {
2197         struct dt_it      *it;
2198         struct dt_object  *next = mdd_object_child(obj);
2199         const struct dt_it_ops  *iops;
2200         struct page       *pg;
2201         struct lu_dirent  *last = NULL;
2202         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2203         int i;
2204         int rc;
2205         int nob;
2206         __u64 hash_start;
2207         __u64 hash_end = 0;
2208
2209         LASSERT(rdpg->rp_pages != NULL);
2210         LASSERT(next->do_index_ops != NULL);
2211
2212         if (rdpg->rp_count <= 0)
2213                 return -EFAULT;
2214
2215         /*
2216          * iterate through directory and fill pages from @rdpg
2217          */
2218         iops = &next->do_index_ops->dio_it;
2219         it = iops->init(env, next, mdd_object_capa(env, obj));
2220         if (IS_ERR(it))
2221                 return PTR_ERR(it);
2222
2223         rc = iops->load(env, it, rdpg->rp_hash);
2224
2225         if (rc == 0){
2226                 /*
2227                  * Iterator didn't find record with exactly the key requested.
2228                  *
2229                  * It is currently either
2230                  *
2231                  *     - positioned above record with key less than
2232                  *     requested---skip it.
2233                  *
2234                  *     - or not positioned at all (is in IAM_IT_SKEWED
2235                  *     state)---position it on the next item.
2236                  */
2237                 rc = iops->next(env, it);
2238         } else if (rc > 0)
2239                 rc = 0;
2240
2241         /*
2242          * At this point and across for-loop:
2243          *
2244          *  rc == 0 -> ok, proceed.
2245          *  rc >  0 -> end of directory.
2246          *  rc <  0 -> error.
2247          */
2248         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2249              i++, nob -= CFS_PAGE_SIZE) {
2250                 LASSERT(i < rdpg->rp_npages);
2251                 pg = rdpg->rp_pages[i];
2252                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2253                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2254                                         it, &hash_start, &hash_end, &last,
2255                                         rdpg->rp_attrs);
2256                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2257                         if (last)
2258                                 last->lde_reclen = 0;
2259                 }
2260                 cfs_kunmap(pg);
2261         }
2262         if (rc > 0) {
2263                 /*
2264                  * end of directory.
2265                  */
2266                 hash_end = DIR_END_OFF;
2267                 rc = 0;
2268         }
2269         if (rc == 0) {
2270                 struct lu_dirpage *dp;
2271
2272                 dp = cfs_kmap(rdpg->rp_pages[0]);
2273                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2274                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2275                 if (i == 0)
2276                         /*
2277                          * No pages were processed, mark this.
2278                          */
2279                         dp->ldp_flags |= LDF_EMPTY;
2280
2281                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2282                 cfs_kunmap(rdpg->rp_pages[0]);
2283         }
2284         iops->put(env, it);
2285         iops->fini(env, it);
2286
2287         return rc;
2288 }
2289
2290 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2291                  const struct lu_rdpg *rdpg)
2292 {
2293         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2294         int rc;
2295         ENTRY;
2296
2297         LASSERT(mdd_object_exists(mdd_obj));
2298
2299         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2300         rc = mdd_readpage_sanity_check(env, mdd_obj);
2301         if (rc)
2302                 GOTO(out_unlock, rc);
2303
2304         if (mdd_is_dead_obj(mdd_obj)) {
2305                 struct page *pg;
2306                 struct lu_dirpage *dp;
2307
2308                 /*
2309                  * According to POSIX, please do not return any entry to client:
2310                  * even dot and dotdot should not be returned.
2311                  */
2312                 CWARN("readdir from dead object: "DFID"\n",
2313                         PFID(mdd_object_fid(mdd_obj)));
2314
2315                 if (rdpg->rp_count <= 0)
2316                         GOTO(out_unlock, rc = -EFAULT);
2317                 LASSERT(rdpg->rp_pages != NULL);
2318
2319                 pg = rdpg->rp_pages[0];
2320                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2321                 memset(dp, 0 , sizeof(struct lu_dirpage));
2322                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2323                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2324                 dp->ldp_flags |= LDF_EMPTY;
2325                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2326                 cfs_kunmap(pg);
2327                 GOTO(out_unlock, rc = 0);
2328         }
2329
2330         rc = __mdd_readpage(env, mdd_obj, rdpg);
2331
2332         EXIT;
2333 out_unlock:
2334         mdd_read_unlock(env, mdd_obj);
2335         return rc;
2336 }
2337
2338 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2339 {
2340         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2341         struct dt_object *next;
2342
2343         LASSERT(mdd_object_exists(mdd_obj));
2344         next = mdd_object_child(mdd_obj);
2345         return next->do_ops->do_object_sync(env, next);
2346 }
2347
2348 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2349                                         struct md_object *obj)
2350 {
2351         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2352
2353         LASSERT(mdd_object_exists(mdd_obj));
2354         return do_version_get(env, mdd_object_child(mdd_obj));
2355 }
2356
2357 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2358                             dt_obj_version_t version)
2359 {
2360         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2361
2362         LASSERT(mdd_object_exists(mdd_obj));
2363         return do_version_set(env, mdd_object_child(mdd_obj), version);
2364 }
2365
2366 const struct md_object_operations mdd_obj_ops = {
2367         .moo_permission    = mdd_permission,
2368         .moo_attr_get      = mdd_attr_get,
2369         .moo_attr_set      = mdd_attr_set,
2370         .moo_xattr_get     = mdd_xattr_get,
2371         .moo_xattr_set     = mdd_xattr_set,
2372         .moo_xattr_list    = mdd_xattr_list,
2373         .moo_xattr_del     = mdd_xattr_del,
2374         .moo_object_create = mdd_object_create,
2375         .moo_ref_add       = mdd_ref_add,
2376         .moo_ref_del       = mdd_ref_del,
2377         .moo_open          = mdd_open,
2378         .moo_close         = mdd_close,
2379         .moo_readpage      = mdd_readpage,
2380         .moo_readlink      = mdd_readlink,
2381         .moo_capa_get      = mdd_capa_get,
2382         .moo_object_sync   = mdd_object_sync,
2383         .moo_version_get   = mdd_version_get,
2384         .moo_version_set   = mdd_version_set,
2385         .moo_path          = mdd_path,
2386 };