Whamcloud - gitweb
4470ff727705c16daecc77c12e82fefea554f6d1
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
149 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 {
151         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152
153         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
154                 if (buf->lb_vmalloc)
155                         OBD_VFREE(buf->lb_buf, buf->lb_len);
156                 else
157                         OBD_FREE(buf->lb_buf, buf->lb_len);
158                 buf->lb_buf = NULL;
159         }
160         if (buf->lb_buf == NULL) {
161                 buf->lb_len = len;
162                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
163                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
164                         buf->lb_vmalloc = 0;
165                 }
166                 if (buf->lb_buf == NULL) {
167                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
168                         buf->lb_vmalloc = 1;
169                 }
170                 if (buf->lb_buf == NULL)
171                         buf->lb_len = 0;
172         }
173         return buf;
174 }
175
176 /** Increase the size of the \a mti_big_buf.
177  * preserves old data in buffer
178  * old buffer remains unchanged on error
179  * \retval 0 or -ENOMEM
180  */
181 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
182 {
183         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
184         struct lu_buf buf;
185
186         LASSERT(len >= oldbuf->lb_len);
187         if (len > BUF_VMALLOC_SIZE) {
188                 OBD_VMALLOC(buf.lb_buf, len);
189                 buf.lb_vmalloc = 1;
190         } else {
191                 OBD_ALLOC(buf.lb_buf, len);
192                 buf.lb_vmalloc = 0;
193         }
194         if (buf.lb_buf == NULL)
195                 return -ENOMEM;
196
197         buf.lb_len = len;
198         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
199
200         if (oldbuf->lb_vmalloc)
201                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
202         else
203                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
204
205         memcpy(oldbuf, &buf, sizeof(buf));
206
207         return 0;
208 }
209
210 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
211                                        struct mdd_device *mdd)
212 {
213         struct mdd_thread_info *mti = mdd_env_info(env);
214         int                     max_cookie_size;
215
216         max_cookie_size = mdd_lov_cookiesize(env, mdd);
217         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
218                 if (mti->mti_max_cookie)
219                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
220                 mti->mti_max_cookie = NULL;
221                 mti->mti_max_cookie_size = 0;
222         }
223         if (unlikely(mti->mti_max_cookie == NULL)) {
224                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
225                 if (likely(mti->mti_max_cookie != NULL))
226                         mti->mti_max_cookie_size = max_cookie_size;
227         }
228         if (likely(mti->mti_max_cookie != NULL))
229                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
230         return mti->mti_max_cookie;
231 }
232
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234                                    struct mdd_device *mdd)
235 {
236         struct mdd_thread_info *mti = mdd_env_info(env);
237         int                     max_lmm_size;
238
239         max_lmm_size = mdd_lov_mdsize(env, mdd);
240         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
241                 if (mti->mti_max_lmm)
242                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
243                 mti->mti_max_lmm = NULL;
244                 mti->mti_max_lmm_size = 0;
245         }
246         if (unlikely(mti->mti_max_lmm == NULL)) {
247                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
248                 if (likely(mti->mti_max_lmm != NULL))
249                         mti->mti_max_lmm_size = max_lmm_size;
250         }
251         return mti->mti_max_lmm;
252 }
253
254 struct lu_object *mdd_object_alloc(const struct lu_env *env,
255                                    const struct lu_object_header *hdr,
256                                    struct lu_device *d)
257 {
258         struct mdd_object *mdd_obj;
259
260         OBD_ALLOC_PTR(mdd_obj);
261         if (mdd_obj != NULL) {
262                 struct lu_object *o;
263
264                 o = mdd2lu_obj(mdd_obj);
265                 lu_object_init(o, NULL, d);
266                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
267                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
268                 mdd_obj->mod_count = 0;
269                 o->lo_ops = &mdd_lu_obj_ops;
270                 return o;
271         } else {
272                 return NULL;
273         }
274 }
275
276 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
277                            const struct lu_object_conf *unused)
278 {
279         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
280         struct mdd_object *mdd_obj = lu2mdd_obj(o);
281         struct lu_object  *below;
282         struct lu_device  *under;
283         ENTRY;
284
285         mdd_obj->mod_cltime = 0;
286         under = &d->mdd_child->dd_lu_dev;
287         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
288         mdd_pdlock_init(mdd_obj);
289         if (below == NULL)
290                 RETURN(-ENOMEM);
291
292         lu_object_add(o, below);
293
294         RETURN(0);
295 }
296
297 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
298 {
299         if (lu_object_exists(o))
300                 return mdd_get_flags(env, lu2mdd_obj(o));
301         else
302                 return 0;
303 }
304
305 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
306 {
307         struct mdd_object *mdd = lu2mdd_obj(o);
308
309         lu_object_fini(o);
310         OBD_FREE_PTR(mdd);
311 }
312
313 static int mdd_object_print(const struct lu_env *env, void *cookie,
314                             lu_printer_t p, const struct lu_object *o)
315 {
316         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
317         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
318                     "valid=%x, cltime="LPU64", flags=%lx)",
319                     mdd, mdd->mod_count, mdd->mod_valid,
320                     mdd->mod_cltime, mdd->mod_flags);
321 }
322
323 static const struct lu_object_operations mdd_lu_obj_ops = {
324         .loo_object_init    = mdd_object_init,
325         .loo_object_start   = mdd_object_start,
326         .loo_object_free    = mdd_object_free,
327         .loo_object_print   = mdd_object_print,
328 };
329
330 struct mdd_object *mdd_object_find(const struct lu_env *env,
331                                    struct mdd_device *d,
332                                    const struct lu_fid *f)
333 {
334         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
335 }
336
337 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
338                         const char *path, struct lu_fid *fid)
339 {
340         struct lu_buf *buf;
341         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
342         struct mdd_object *obj;
343         struct lu_name *lname = &mdd_env_info(env)->mti_name;
344         char *name;
345         int rc = 0;
346         ENTRY;
347
348         /* temp buffer for path element */
349         buf = mdd_buf_alloc(env, PATH_MAX);
350         if (buf->lb_buf == NULL)
351                 RETURN(-ENOMEM);
352
353         lname->ln_name = name = buf->lb_buf;
354         lname->ln_namelen = 0;
355         *f = mdd->mdd_root_fid;
356
357         while(1) {
358                 while (*path == '/')
359                         path++;
360                 if (*path == '\0')
361                         break;
362                 while (*path != '/' && *path != '\0') {
363                         *name = *path;
364                         path++;
365                         name++;
366                         lname->ln_namelen++;
367                 }
368
369                 *name = '\0';
370                 /* find obj corresponding to fid */
371                 obj = mdd_object_find(env, mdd, f);
372                 if (obj == NULL)
373                         GOTO(out, rc = -EREMOTE);
374                 if (IS_ERR(obj))
375                         GOTO(out, rc = -PTR_ERR(obj));
376                 /* get child fid from parent and name */
377                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
378                 mdd_object_put(env, obj);
379                 if (rc)
380                         break;
381
382                 name = buf->lb_buf;
383                 lname->ln_namelen = 0;
384         }
385
386         if (!rc)
387                 *fid = *f;
388 out:
389         RETURN(rc);
390 }
391
392 /** The maximum depth that fid2path() will search.
393  * This is limited only because we want to store the fids for
394  * historical path lookup purposes.
395  */
396 #define MAX_PATH_DEPTH 100
397
398 /** mdd_path() lookup structure. */
399 struct path_lookup_info {
400         __u64                pli_recno;        /**< history point */
401         __u64                pli_currec;       /**< current record */
402         struct lu_fid        pli_fid;
403         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
404         struct mdd_object   *pli_mdd_obj;
405         char                *pli_path;         /**< full path */
406         int                  pli_pathlen;
407         int                  pli_linkno;       /**< which hardlink to follow */
408         int                  pli_fidcount;     /**< number of \a pli_fids */
409 };
410
411 static int mdd_path_current(const struct lu_env *env,
412                             struct path_lookup_info *pli)
413 {
414         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
415         struct mdd_object *mdd_obj;
416         struct lu_buf     *buf = NULL;
417         struct link_ea_header *leh;
418         struct link_ea_entry  *lee;
419         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
420         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
421         char *ptr;
422         int reclen;
423         int rc;
424         ENTRY;
425
426         ptr = pli->pli_path + pli->pli_pathlen - 1;
427         *ptr = 0;
428         --ptr;
429         pli->pli_fidcount = 0;
430         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
431
432         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
433                 mdd_obj = mdd_object_find(env, mdd,
434                                           &pli->pli_fids[pli->pli_fidcount]);
435                 if (mdd_obj == NULL)
436                         GOTO(out, rc = -EREMOTE);
437                 if (IS_ERR(mdd_obj))
438                         GOTO(out, rc = -PTR_ERR(mdd_obj));
439                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440                 if (rc <= 0) {
441                         mdd_object_put(env, mdd_obj);
442                         if (rc == -1)
443                                 rc = -EREMOTE;
444                         else if (rc == 0)
445                                 /* Do I need to error out here? */
446                                 rc = -ENOENT;
447                         GOTO(out, rc);
448                 }
449
450                 /* Get parent fid and object name */
451                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
452                 buf = mdd_links_get(env, mdd_obj);
453                 mdd_read_unlock(env, mdd_obj);
454                 mdd_object_put(env, mdd_obj);
455                 if (IS_ERR(buf))
456                         GOTO(out, rc = PTR_ERR(buf));
457
458                 leh = buf->lb_buf;
459                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
460                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461
462                 /* If set, use link #linkno for path lookup, otherwise use
463                    link #0.  Only do this for the final path element. */
464                 if ((pli->pli_fidcount == 0) &&
465                     (pli->pli_linkno < leh->leh_reccount)) {
466                         int count;
467                         for (count = 0; count < pli->pli_linkno; count++) {
468                                 lee = (struct link_ea_entry *)
469                                      ((char *)lee + reclen);
470                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471                         }
472                         if (pli->pli_linkno < leh->leh_reccount - 1)
473                                 /* indicate to user there are more links */
474                                 pli->pli_linkno++;
475                 }
476
477                 /* Pack the name in the end of the buffer */
478                 ptr -= tmpname->ln_namelen;
479                 if (ptr - 1 <= pli->pli_path)
480                         GOTO(out, rc = -EOVERFLOW);
481                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
482                 *(--ptr) = '/';
483
484                 /* Store the parent fid for historic lookup */
485                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
486                         GOTO(out, rc = -EOVERFLOW);
487                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
488         }
489
490         /* Verify that our path hasn't changed since we started the lookup.
491            Record the current index, and verify the path resolves to the
492            same fid. If it does, then the path is correct as of this index. */
493         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
494         pli->pli_currec = mdd->mdd_cl.mc_index;
495         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
496         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497         if (rc) {
498                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
499                 GOTO (out, rc = -EAGAIN);
500         }
501         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
502                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
503                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
504                        PFID(&pli->pli_fid));
505                 GOTO(out, rc = -EAGAIN);
506         }
507         ptr++; /* skip leading / */
508         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
509
510         EXIT;
511 out:
512         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
513                 /* if we vmalloced a large buffer drop it */
514                 mdd_buf_put(buf);
515
516         return rc;
517 }
518
519 static int mdd_path_historic(const struct lu_env *env,
520                              struct path_lookup_info *pli)
521 {
522         return 0;
523 }
524
525 /* Returns the full path to this fid, as of changelog record recno. */
526 static int mdd_path(const struct lu_env *env, struct md_object *obj,
527                     char *path, int pathlen, __u64 *recno, int *linkno)
528 {
529         struct path_lookup_info *pli;
530         int tries = 3;
531         int rc = -EAGAIN;
532         ENTRY;
533
534         if (pathlen < 3)
535                 RETURN(-EOVERFLOW);
536
537         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
538                 path[0] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
610 {
611         struct lov_desc *ldesc;
612         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
613         struct lov_user_md *lum = (struct lov_user_md*)lmm;
614         ENTRY;
615
616         if (!lum)
617                 RETURN(0);
618
619         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
620         LASSERT(ldesc != NULL);
621
622         lum->lmm_magic = LOV_MAGIC_V1;
623         lum->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
624         lum->lmm_pattern = ldesc->ld_pattern;
625         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
626         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
627         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
628
629         RETURN(sizeof(*lum));
630 }
631
632 /* get lov EA only */
633 static int __mdd_lmm_get(const struct lu_env *env,
634                          struct mdd_object *mdd_obj, struct md_attr *ma)
635 {
636         int rc;
637         ENTRY;
638
639         if (ma->ma_valid & MA_LOV)
640                 RETURN(0);
641
642         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
643                         XATTR_NAME_LOV);
644         if (rc == 0 && (ma->ma_need & MA_LOV_DEF))
645                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
646         if (rc > 0) {
647                 ma->ma_lmm_size = rc;
648                 ma->ma_valid |= MA_LOV;
649                 rc = 0;
650         }
651         RETURN(rc);
652 }
653
654 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
655                        struct md_attr *ma)
656 {
657         int rc;
658         ENTRY;
659
660         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
661         rc = __mdd_lmm_get(env, mdd_obj, ma);
662         mdd_read_unlock(env, mdd_obj);
663         RETURN(rc);
664 }
665
666 /* get lmv EA only*/
667 static int __mdd_lmv_get(const struct lu_env *env,
668                          struct mdd_object *mdd_obj, struct md_attr *ma)
669 {
670         int rc;
671         ENTRY;
672
673         if (ma->ma_valid & MA_LMV)
674                 RETURN(0);
675
676         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
677                         XATTR_NAME_LMV);
678         if (rc > 0) {
679                 ma->ma_valid |= MA_LMV;
680                 rc = 0;
681         }
682         RETURN(rc);
683 }
684
685 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
686                          struct md_attr *ma)
687 {
688         struct mdd_thread_info *info = mdd_env_info(env);
689         struct lustre_mdt_attrs *lma =
690                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
691         int lma_size;
692         int rc;
693         ENTRY;
694
695         /* If all needed data are already valid, nothing to do */
696         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
697             (ma->ma_need & (MA_HSM | MA_SOM)))
698                 RETURN(0);
699
700         /* Read LMA from disk EA */
701         lma_size = sizeof(info->mti_xattr_buf);
702         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
703         if (rc <= 0)
704                 RETURN(rc);
705
706         /* Useless to check LMA incompatibility because this is already done in
707          * osd_ea_fid_get(), and this will fail long before this code is
708          * called.
709          * So, if we are here, LMA is compatible.
710          */
711
712         lustre_lma_swab(lma);
713
714         /* Swab and copy LMA */
715         if (ma->ma_need & MA_HSM) {
716                 if (lma->lma_compat & LMAC_HSM)
717                         ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
718                 else
719                         ma->ma_hsm_flags = 0;
720                 ma->ma_valid |= MA_HSM;
721         }
722
723         /* Copy SOM */
724         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
725                 LASSERT(ma->ma_som != NULL);
726                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
727                 ma->ma_som->msd_size    = lma->lma_som_size;
728                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
729                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
730                 ma->ma_valid |= MA_SOM;
731         }
732
733         RETURN(0);
734 }
735
736 static int mdd_attr_get_internal(const struct lu_env *env,
737                                  struct mdd_object *mdd_obj,
738                                  struct md_attr *ma)
739 {
740         int rc = 0;
741         ENTRY;
742
743         if (ma->ma_need & MA_INODE)
744                 rc = mdd_iattr_get(env, mdd_obj, ma);
745
746         if (rc == 0 && ma->ma_need & MA_LOV) {
747                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
748                     S_ISDIR(mdd_object_type(mdd_obj)))
749                         rc = __mdd_lmm_get(env, mdd_obj, ma);
750         }
751         if (rc == 0 && ma->ma_need & MA_LMV) {
752                 if (S_ISDIR(mdd_object_type(mdd_obj)))
753                         rc = __mdd_lmv_get(env, mdd_obj, ma);
754         }
755         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
756                 if (S_ISREG(mdd_object_type(mdd_obj)))
757                         rc = __mdd_lma_get(env, mdd_obj, ma);
758         }
759 #ifdef CONFIG_FS_POSIX_ACL
760         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
761                 if (S_ISDIR(mdd_object_type(mdd_obj)))
762                         rc = mdd_def_acl_get(env, mdd_obj, ma);
763         }
764 #endif
765         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
766                rc, ma->ma_valid);
767         RETURN(rc);
768 }
769
770 int mdd_attr_get_internal_locked(const struct lu_env *env,
771                                  struct mdd_object *mdd_obj, struct md_attr *ma)
772 {
773         int rc;
774         int needlock = ma->ma_need &
775                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
776
777         if (needlock)
778                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
779         rc = mdd_attr_get_internal(env, mdd_obj, ma);
780         if (needlock)
781                 mdd_read_unlock(env, mdd_obj);
782         return rc;
783 }
784
785 /*
786  * No permission check is needed.
787  */
788 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
789                         struct md_attr *ma)
790 {
791         struct mdd_object *mdd_obj = md2mdd_obj(obj);
792         int                rc;
793
794         ENTRY;
795         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
796         RETURN(rc);
797 }
798
799 /*
800  * No permission check is needed.
801  */
802 static int mdd_xattr_get(const struct lu_env *env,
803                          struct md_object *obj, struct lu_buf *buf,
804                          const char *name)
805 {
806         struct mdd_object *mdd_obj = md2mdd_obj(obj);
807         int rc;
808
809         ENTRY;
810
811         LASSERT(mdd_object_exists(mdd_obj));
812
813         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
814         rc = mdo_xattr_get(env, mdd_obj, buf, name,
815                            mdd_object_capa(env, mdd_obj));
816         mdd_read_unlock(env, mdd_obj);
817
818         RETURN(rc);
819 }
820
821 /*
822  * Permission check is done when open,
823  * no need check again.
824  */
825 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
826                         struct lu_buf *buf)
827 {
828         struct mdd_object *mdd_obj = md2mdd_obj(obj);
829         struct dt_object  *next;
830         loff_t             pos = 0;
831         int                rc;
832         ENTRY;
833
834         LASSERT(mdd_object_exists(mdd_obj));
835
836         next = mdd_object_child(mdd_obj);
837         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
838         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
839                                          mdd_object_capa(env, mdd_obj));
840         mdd_read_unlock(env, mdd_obj);
841         RETURN(rc);
842 }
843
844 /*
845  * No permission check is needed.
846  */
847 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
848                           struct lu_buf *buf)
849 {
850         struct mdd_object *mdd_obj = md2mdd_obj(obj);
851         int rc;
852
853         ENTRY;
854
855         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
856         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
857         mdd_read_unlock(env, mdd_obj);
858
859         RETURN(rc);
860 }
861
862 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
863                                struct mdd_object *c, struct md_attr *ma,
864                                struct thandle *handle,
865                                const struct md_op_spec *spec)
866 {
867         struct lu_attr *attr = &ma->ma_attr;
868         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
869         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
870         const struct dt_index_features *feat = spec->sp_feat;
871         int rc;
872         ENTRY;
873
874         if (!mdd_object_exists(c)) {
875                 struct dt_object *next = mdd_object_child(c);
876                 LASSERT(next);
877
878                 if (feat != &dt_directory_features && feat != NULL)
879                         dof->dof_type = DFT_INDEX;
880                 else
881                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
882
883                 dof->u.dof_idx.di_feat = feat;
884
885                 /* @hint will be initialized by underlying device. */
886                 next->do_ops->do_ah_init(env, hint,
887                                          p ? mdd_object_child(p) : NULL,
888                                          attr->la_mode & S_IFMT);
889
890                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
891                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
892         } else
893                 rc = -EEXIST;
894
895         RETURN(rc);
896 }
897
898 /**
899  * Make sure the ctime is increased only.
900  */
901 static inline int mdd_attr_check(const struct lu_env *env,
902                                  struct mdd_object *obj,
903                                  struct lu_attr *attr)
904 {
905         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
906         int rc;
907         ENTRY;
908
909         if (attr->la_valid & LA_CTIME) {
910                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
911                 if (rc)
912                         RETURN(rc);
913
914                 if (attr->la_ctime < tmp_la->la_ctime)
915                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
916                 else if (attr->la_valid == LA_CTIME &&
917                          attr->la_ctime == tmp_la->la_ctime)
918                         attr->la_valid &= ~LA_CTIME;
919         }
920         RETURN(0);
921 }
922
923 int mdd_attr_set_internal(const struct lu_env *env,
924                           struct mdd_object *obj,
925                           struct lu_attr *attr,
926                           struct thandle *handle,
927                           int needacl)
928 {
929         int rc;
930         ENTRY;
931
932         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
933 #ifdef CONFIG_FS_POSIX_ACL
934         if (!rc && (attr->la_valid & LA_MODE) && needacl)
935                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
936 #endif
937         RETURN(rc);
938 }
939
940 int mdd_attr_check_set_internal(const struct lu_env *env,
941                                 struct mdd_object *obj,
942                                 struct lu_attr *attr,
943                                 struct thandle *handle,
944                                 int needacl)
945 {
946         int rc;
947         ENTRY;
948
949         rc = mdd_attr_check(env, obj, attr);
950         if (rc)
951                 RETURN(rc);
952
953         if (attr->la_valid)
954                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
955         RETURN(rc);
956 }
957
958 static int mdd_attr_set_internal_locked(const struct lu_env *env,
959                                         struct mdd_object *obj,
960                                         struct lu_attr *attr,
961                                         struct thandle *handle,
962                                         int needacl)
963 {
964         int rc;
965         ENTRY;
966
967         needacl = needacl && (attr->la_valid & LA_MODE);
968         if (needacl)
969                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
970         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
971         if (needacl)
972                 mdd_write_unlock(env, obj);
973         RETURN(rc);
974 }
975
976 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
977                                        struct mdd_object *obj,
978                                        struct lu_attr *attr,
979                                        struct thandle *handle,
980                                        int needacl)
981 {
982         int rc;
983         ENTRY;
984
985         needacl = needacl && (attr->la_valid & LA_MODE);
986         if (needacl)
987                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
988         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
989         if (needacl)
990                 mdd_write_unlock(env, obj);
991         RETURN(rc);
992 }
993
994 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
995                     const struct lu_buf *buf, const char *name,
996                     int fl, struct thandle *handle)
997 {
998         struct lustre_capa *capa = mdd_object_capa(env, obj);
999         int rc = -EINVAL;
1000         ENTRY;
1001
1002         if (buf->lb_buf && buf->lb_len > 0)
1003                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1004         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1005                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1006
1007         RETURN(rc);
1008 }
1009
1010 /*
1011  * This gives the same functionality as the code between
1012  * sys_chmod and inode_setattr
1013  * chown_common and inode_setattr
1014  * utimes and inode_setattr
1015  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1016  */
1017 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1018                         struct lu_attr *la, const struct md_attr *ma)
1019 {
1020         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1021         struct md_ucred  *uc;
1022         int               rc;
1023         ENTRY;
1024
1025         if (!la->la_valid)
1026                 RETURN(0);
1027
1028         /* Do not permit change file type */
1029         if (la->la_valid & LA_TYPE)
1030                 RETURN(-EPERM);
1031
1032         /* They should not be processed by setattr */
1033         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1034                 RETURN(-EPERM);
1035
1036         /* export destroy does not have ->le_ses, but we may want
1037          * to drop LUSTRE_SOM_FL. */
1038         if (!env->le_ses)
1039                 RETURN(0);
1040
1041         uc = md_ucred(env);
1042
1043         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1044         if (rc)
1045                 RETURN(rc);
1046
1047         if (la->la_valid == LA_CTIME) {
1048                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1049                         /* This is only for set ctime when rename's source is
1050                          * on remote MDS. */
1051                         rc = mdd_may_delete(env, NULL, obj,
1052                                             (struct md_attr *)ma, 1, 0);
1053                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1054                         la->la_valid &= ~LA_CTIME;
1055                 RETURN(rc);
1056         }
1057
1058         if (la->la_valid == LA_ATIME) {
1059                 /* This is atime only set for read atime update on close. */
1060                 if (la->la_atime > tmp_la->la_atime &&
1061                     la->la_atime <= (tmp_la->la_atime +
1062                                      mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1063                         la->la_valid &= ~LA_ATIME;
1064                 RETURN(0);
1065         }
1066
1067         /* Check if flags change. */
1068         if (la->la_valid & LA_FLAGS) {
1069                 unsigned int oldflags = 0;
1070                 unsigned int newflags = la->la_flags &
1071                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1072
1073                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1074                     !mdd_capable(uc, CFS_CAP_FOWNER))
1075                         RETURN(-EPERM);
1076
1077                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1078                  * only be changed by the relevant capability. */
1079                 if (mdd_is_immutable(obj))
1080                         oldflags |= LUSTRE_IMMUTABLE_FL;
1081                 if (mdd_is_append(obj))
1082                         oldflags |= LUSTRE_APPEND_FL;
1083                 if ((oldflags ^ newflags) &&
1084                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1085                         RETURN(-EPERM);
1086
1087                 if (!S_ISDIR(tmp_la->la_mode))
1088                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1089         }
1090
1091         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1092             (la->la_valid & ~LA_FLAGS) &&
1093             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1094                 RETURN(-EPERM);
1095
1096         /* Check for setting the obj time. */
1097         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1098             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1099                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1100                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1101                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1102                                                             MAY_WRITE,
1103                                                             MOR_TGT_CHILD);
1104                         if (rc)
1105                                 RETURN(rc);
1106                 }
1107         }
1108
1109         /* Make sure a caller can chmod. */
1110         if (la->la_valid & LA_MODE) {
1111                 /* Bypass la_vaild == LA_MODE,
1112                  * this is for changing file with SUID or SGID. */
1113                 if ((la->la_valid & ~LA_MODE) &&
1114                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1115                     (uc->mu_fsuid != tmp_la->la_uid) &&
1116                     !mdd_capable(uc, CFS_CAP_FOWNER))
1117                         RETURN(-EPERM);
1118
1119                 if (la->la_mode == (cfs_umode_t) -1)
1120                         la->la_mode = tmp_la->la_mode;
1121                 else
1122                         la->la_mode = (la->la_mode & S_IALLUGO) |
1123                                       (tmp_la->la_mode & ~S_IALLUGO);
1124
1125                 /* Also check the setgid bit! */
1126                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1127                                        la->la_gid : tmp_la->la_gid) &&
1128                     !mdd_capable(uc, CFS_CAP_FSETID))
1129                         la->la_mode &= ~S_ISGID;
1130         } else {
1131                la->la_mode = tmp_la->la_mode;
1132         }
1133
1134         /* Make sure a caller can chown. */
1135         if (la->la_valid & LA_UID) {
1136                 if (la->la_uid == (uid_t) -1)
1137                         la->la_uid = tmp_la->la_uid;
1138                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1139                     (la->la_uid != tmp_la->la_uid)) &&
1140                     !mdd_capable(uc, CFS_CAP_CHOWN))
1141                         RETURN(-EPERM);
1142
1143                 /* If the user or group of a non-directory has been
1144                  * changed by a non-root user, remove the setuid bit.
1145                  * 19981026 David C Niemi <niemi@tux.org>
1146                  *
1147                  * Changed this to apply to all users, including root,
1148                  * to avoid some races. This is the behavior we had in
1149                  * 2.0. The check for non-root was definitely wrong
1150                  * for 2.2 anyway, as it should have been using
1151                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1152                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1153                     !S_ISDIR(tmp_la->la_mode)) {
1154                         la->la_mode &= ~S_ISUID;
1155                         la->la_valid |= LA_MODE;
1156                 }
1157         }
1158
1159         /* Make sure caller can chgrp. */
1160         if (la->la_valid & LA_GID) {
1161                 if (la->la_gid == (gid_t) -1)
1162                         la->la_gid = tmp_la->la_gid;
1163                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1164                     ((la->la_gid != tmp_la->la_gid) &&
1165                     !lustre_in_group_p(uc, la->la_gid))) &&
1166                     !mdd_capable(uc, CFS_CAP_CHOWN))
1167                         RETURN(-EPERM);
1168
1169                 /* Likewise, if the user or group of a non-directory
1170                  * has been changed by a non-root user, remove the
1171                  * setgid bit UNLESS there is no group execute bit
1172                  * (this would be a file marked for mandatory
1173                  * locking).  19981026 David C Niemi <niemi@tux.org>
1174                  *
1175                  * Removed the fsuid check (see the comment above) --
1176                  * 19990830 SD. */
1177                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1178                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1179                         la->la_mode &= ~S_ISGID;
1180                         la->la_valid |= LA_MODE;
1181                 }
1182         }
1183
1184         /* For both Size-on-MDS case and truncate case,
1185          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1186          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1187          * For SOM case, it is true, the MAY_WRITE perm has been checked
1188          * when open, no need check again. For truncate case, it is false,
1189          * the MAY_WRITE perm should be checked here. */
1190         if (ma->ma_attr_flags & MDS_SOM) {
1191                 /* For the "Size-on-MDS" setattr update, merge coming
1192                  * attributes with the set in the inode. BUG 10641 */
1193                 if ((la->la_valid & LA_ATIME) &&
1194                     (la->la_atime <= tmp_la->la_atime))
1195                         la->la_valid &= ~LA_ATIME;
1196
1197                 /* OST attributes do not have a priority over MDS attributes,
1198                  * so drop times if ctime is equal. */
1199                 if ((la->la_valid & LA_CTIME) &&
1200                     (la->la_ctime <= tmp_la->la_ctime))
1201                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1202         } else {
1203                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1204                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1205                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1206                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1207                                 rc = mdd_permission_internal_locked(env, obj,
1208                                                             tmp_la, MAY_WRITE,
1209                                                             MOR_TGT_CHILD);
1210                                 if (rc)
1211                                         RETURN(rc);
1212                         }
1213                 }
1214                 if (la->la_valid & LA_CTIME) {
1215                         /* The pure setattr, it has the priority over what is
1216                          * already set, do not drop it if ctime is equal. */
1217                         if (la->la_ctime < tmp_la->la_ctime)
1218                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1219                                                   LA_CTIME);
1220                 }
1221         }
1222
1223         RETURN(0);
1224 }
1225
1226 /** Store a data change changelog record
1227  * If this fails, we must fail the whole transaction; we don't
1228  * want the change to commit without the log entry.
1229  * \param mdd_obj - mdd_object of change
1230  * \param handle - transacion handle
1231  */
1232 static int mdd_changelog_data_store(const struct lu_env     *env,
1233                                     struct mdd_device       *mdd,
1234                                     enum changelog_rec_type type,
1235                                     int                     flags,
1236                                     struct mdd_object       *mdd_obj,
1237                                     struct thandle          *handle)
1238 {
1239         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1240         struct llog_changelog_rec *rec;
1241         struct lu_buf *buf;
1242         int reclen;
1243         int rc;
1244
1245         /* Not recording */
1246         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1247                 RETURN(0);
1248         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1249                 RETURN(0);
1250
1251         LASSERT(handle != NULL);
1252         LASSERT(mdd_obj != NULL);
1253
1254         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1255             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1256                 /* Don't need multiple updates in this log */
1257                 /* Don't check under lock - no big deal if we get an extra
1258                    entry */
1259                 RETURN(0);
1260         }
1261
1262         reclen = llog_data_len(sizeof(*rec));
1263         buf = mdd_buf_alloc(env, reclen);
1264         if (buf->lb_buf == NULL)
1265                 RETURN(-ENOMEM);
1266         rec = (struct llog_changelog_rec *)buf->lb_buf;
1267
1268         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1269         rec->cr.cr_type = (__u32)type;
1270         rec->cr.cr_tfid = *tfid;
1271         rec->cr.cr_namelen = 0;
1272         mdd_obj->mod_cltime = cfs_time_current_64();
1273
1274         rc = mdd_changelog_llog_write(mdd, rec, handle);
1275         if (rc < 0) {
1276                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1277                        rc, type, PFID(tfid));
1278                 return -EFAULT;
1279         }
1280
1281         return 0;
1282 }
1283
1284 /**
1285  * Should be called with write lock held.
1286  *
1287  * \see mdd_lma_set_locked().
1288  */
1289 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1290                        const struct md_attr *ma, struct thandle *handle)
1291 {
1292         struct mdd_thread_info *info = mdd_env_info(env);
1293         struct lu_buf *buf;
1294         struct lustre_mdt_attrs *lma =
1295                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1296         int lmasize = sizeof(struct lustre_mdt_attrs);
1297         int rc = 0;
1298
1299         ENTRY;
1300
1301         /* Either HSM or SOM part is not valid, we need to read it before */
1302         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1303                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1304                 if (rc <= 0)
1305                         RETURN(rc);
1306
1307                 lustre_lma_swab(lma);
1308         } else {
1309                 memset(lma, 0, lmasize);
1310         }
1311
1312         /* Copy HSM data */
1313         if (ma->ma_valid & MA_HSM) {
1314                 lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1315                 lma->lma_compat |= LMAC_HSM;
1316         }
1317
1318         /* Copy SOM data */
1319         if (ma->ma_valid & MA_SOM) {
1320                 LASSERT(ma->ma_som != NULL);
1321                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1322                         lma->lma_compat     &= ~LMAC_SOM;
1323                 } else {
1324                         lma->lma_compat     |= LMAC_SOM;
1325                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1326                         lma->lma_som_size    = ma->ma_som->msd_size;
1327                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1328                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1329                 }
1330         }
1331
1332         /* Copy FID */
1333         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1334
1335         lustre_lma_swab(lma);
1336         buf = mdd_buf_get(env, lma, lmasize);
1337         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1338
1339         RETURN(rc);
1340 }
1341
1342 /**
1343  * Save LMA extended attributes with data from \a ma.
1344  *
1345  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1346  * not, LMA EA will be first read from disk, modified and write back.
1347  *
1348  */
1349 static int mdd_lma_set_locked(const struct lu_env *env,
1350                               struct mdd_object *mdd_obj,
1351                               const struct md_attr *ma, struct thandle *handle)
1352 {
1353         int rc;
1354
1355         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1356         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1357         mdd_write_unlock(env, mdd_obj);
1358         return rc;
1359 }
1360
1361 /* Precedence for choosing record type when multiple
1362  * attributes change: setattr > mtime > ctime > atime
1363  * (ctime changes when mtime does, plus chmod/chown.
1364  * atime and ctime are independent.) */
1365 static int mdd_attr_set_changelog(const struct lu_env *env,
1366                                   struct md_object *obj, struct thandle *handle,
1367                                   __u64 valid)
1368 {
1369         struct mdd_device *mdd = mdo2mdd(obj);
1370         int bits, type = 0;
1371
1372         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1373         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1374         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1375         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1376         bits = bits & mdd->mdd_cl.mc_mask;
1377         if (bits == 0)
1378                 return 0;
1379
1380         /* The record type is the lowest non-masked set bit */
1381         while (bits && ((bits & 1) == 0)) {
1382                 bits = bits >> 1;
1383                 type++;
1384         }
1385
1386         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1387         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1388                                         md2mdd_obj(obj), handle);
1389 }
1390
1391 /* set attr and LOV EA at once, return updated attr */
1392 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1393                         const struct md_attr *ma)
1394 {
1395         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1396         struct mdd_device *mdd = mdo2mdd(obj);
1397         struct thandle *handle;
1398         struct lov_mds_md *lmm = NULL;
1399         struct llog_cookie *logcookies = NULL;
1400         int  rc, lmm_size = 0, cookie_size = 0;
1401         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1402 #ifdef HAVE_QUOTA_SUPPORT
1403         struct obd_device *obd = mdd->mdd_obd_dev;
1404         struct mds_obd *mds = &obd->u.mds;
1405         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1406         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1407         int quota_opc = 0, block_count = 0;
1408         int inode_pending[MAXQUOTAS] = { 0, 0 };
1409         int block_pending[MAXQUOTAS] = { 0, 0 };
1410 #endif
1411         ENTRY;
1412
1413         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1414                                     MDD_TXN_ATTR_SET_OP);
1415         handle = mdd_trans_start(env, mdd);
1416         if (IS_ERR(handle))
1417                 RETURN(PTR_ERR(handle));
1418         /*TODO: add lock here*/
1419         /* start a log jounal handle if needed */
1420         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1421             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1422                 lmm_size = mdd_lov_mdsize(env, mdd);
1423                 lmm = mdd_max_lmm_get(env, mdd);
1424                 if (lmm == NULL)
1425                         GOTO(cleanup, rc = -ENOMEM);
1426
1427                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1428                                 XATTR_NAME_LOV);
1429
1430                 if (rc < 0)
1431                         GOTO(cleanup, rc);
1432         }
1433
1434         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1435                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1436                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1437
1438         *la_copy = ma->ma_attr;
1439         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1440         if (rc)
1441                 GOTO(cleanup, rc);
1442
1443 #ifdef HAVE_QUOTA_SUPPORT
1444         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1445                 struct obd_export *exp = md_quota(env)->mq_exp;
1446                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1447
1448                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1449                 if (!rc) {
1450                         quota_opc = FSFILT_OP_SETATTR;
1451                         mdd_quota_wrapper(la_copy, qnids);
1452                         mdd_quota_wrapper(la_tmp, qoids);
1453                         /* get file quota for new owner */
1454                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1455                                         qnids, inode_pending, 1, NULL, 0,
1456                                         NULL, 0);
1457                         block_count = (la_tmp->la_blocks + 7) >> 3;
1458                         if (block_count) {
1459                                 void *data = NULL;
1460                                 mdd_data_get(env, mdd_obj, &data);
1461                                 /* get block quota for new owner */
1462                                 lquota_chkquota(mds_quota_interface_ref, obd,
1463                                                 exp, qnids, block_pending,
1464                                                 block_count, NULL,
1465                                                 LQUOTA_FLAGS_BLK, data, 1);
1466                         }
1467                 }
1468         }
1469 #endif
1470
1471         if (la_copy->la_valid & LA_FLAGS) {
1472                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1473                                                   handle, 1);
1474                 if (rc == 0)
1475                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1476         } else if (la_copy->la_valid) {            /* setattr */
1477                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1478                                                   handle, 1);
1479                 /* journal chown/chgrp in llog, just like unlink */
1480                 if (rc == 0 && lmm_size){
1481                         cookie_size = mdd_lov_cookiesize(env, mdd);
1482                         logcookies = mdd_max_cookie_get(env, mdd);
1483                         if (logcookies == NULL)
1484                                 GOTO(cleanup, rc = -ENOMEM);
1485
1486                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1487                                             logcookies, cookie_size) <= 0)
1488                                 logcookies = NULL;
1489                 }
1490         }
1491
1492         if (rc == 0 && ma->ma_valid & MA_LOV) {
1493                 cfs_umode_t mode;
1494
1495                 mode = mdd_object_type(mdd_obj);
1496                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1497                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1498                         if (rc)
1499                                 GOTO(cleanup, rc);
1500
1501                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1502                                             ma->ma_lmm_size, handle, 1);
1503                 }
1504
1505         }
1506         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1507                 cfs_umode_t mode;
1508
1509                 mode = mdd_object_type(mdd_obj);
1510                 if (S_ISREG(mode))
1511                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1512
1513         }
1514 cleanup:
1515         if (rc == 0)
1516                 rc = mdd_attr_set_changelog(env, obj, handle,
1517                                             ma->ma_attr.la_valid);
1518         mdd_trans_stop(env, mdd, rc, handle);
1519         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1520                 /*set obd attr, if needed*/
1521                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1522                                            logcookies);
1523         }
1524 #ifdef HAVE_QUOTA_SUPPORT
1525         if (quota_opc) {
1526                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1527                                       inode_pending, 0);
1528                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1529                                       block_pending, 1);
1530                 /* Trigger dqrel/dqacq for original owner and new owner.
1531                  * If failed, the next call for lquota_chkquota will
1532                  * process it. */
1533                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1534                               quota_opc);
1535         }
1536 #endif
1537         RETURN(rc);
1538 }
1539
1540 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1541                       const struct lu_buf *buf, const char *name, int fl,
1542                       struct thandle *handle)
1543 {
1544         int  rc;
1545         ENTRY;
1546
1547         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1548         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1549         mdd_write_unlock(env, obj);
1550
1551         RETURN(rc);
1552 }
1553
1554 static int mdd_xattr_sanity_check(const struct lu_env *env,
1555                                   struct mdd_object *obj)
1556 {
1557         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1558         struct md_ucred *uc     = md_ucred(env);
1559         int rc;
1560         ENTRY;
1561
1562         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1563                 RETURN(-EPERM);
1564
1565         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1566         if (rc)
1567                 RETURN(rc);
1568
1569         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1570             !mdd_capable(uc, CFS_CAP_FOWNER))
1571                 RETURN(-EPERM);
1572
1573         RETURN(rc);
1574 }
1575
1576 /**
1577  * The caller should guarantee to update the object ctime
1578  * after xattr_set if needed.
1579  */
1580 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1581                          const struct lu_buf *buf, const char *name,
1582                          int fl)
1583 {
1584         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1585         struct mdd_device *mdd = mdo2mdd(obj);
1586         struct thandle *handle;
1587         int  rc;
1588         ENTRY;
1589
1590         rc = mdd_xattr_sanity_check(env, mdd_obj);
1591         if (rc)
1592                 RETURN(rc);
1593
1594         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1595         /* security-replated changes may require sync */
1596         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1597             mdd->mdd_sync_permission == 1)
1598                 txn_param_sync(&mdd_env_info(env)->mti_param);
1599
1600         handle = mdd_trans_start(env, mdd);
1601         if (IS_ERR(handle))
1602                 RETURN(PTR_ERR(handle));
1603
1604         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1605
1606         /* Only record user xattr changes */
1607         if ((rc == 0) && (strncmp("user.", name, 5) == 0))
1608                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1609                                               handle);
1610         mdd_trans_stop(env, mdd, rc, handle);
1611
1612         RETURN(rc);
1613 }
1614
1615 /**
1616  * The caller should guarantee to update the object ctime
1617  * after xattr_set if needed.
1618  */
1619 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1620                   const char *name)
1621 {
1622         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1623         struct mdd_device *mdd = mdo2mdd(obj);
1624         struct thandle *handle;
1625         int  rc;
1626         ENTRY;
1627
1628         rc = mdd_xattr_sanity_check(env, mdd_obj);
1629         if (rc)
1630                 RETURN(rc);
1631
1632         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1633         handle = mdd_trans_start(env, mdd);
1634         if (IS_ERR(handle))
1635                 RETURN(PTR_ERR(handle));
1636
1637         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1638         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1639                            mdd_object_capa(env, mdd_obj));
1640         mdd_write_unlock(env, mdd_obj);
1641
1642         /* Only record user xattr changes */
1643         if ((rc == 0) && (strncmp("user.", name, 5) != 0))
1644                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1645                                               handle);
1646
1647         mdd_trans_stop(env, mdd, rc, handle);
1648
1649         RETURN(rc);
1650 }
1651
1652 /* partial unlink */
1653 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1654                        struct md_attr *ma)
1655 {
1656         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1657         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1658         struct mdd_device *mdd = mdo2mdd(obj);
1659         struct thandle *handle;
1660 #ifdef HAVE_QUOTA_SUPPORT
1661         struct obd_device *obd = mdd->mdd_obd_dev;
1662         struct mds_obd *mds = &obd->u.mds;
1663         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1664         int quota_opc = 0;
1665 #endif
1666         int rc;
1667         ENTRY;
1668
1669         /*
1670          * Check -ENOENT early here because we need to get object type
1671          * to calculate credits before transaction start
1672          */
1673         if (!mdd_object_exists(mdd_obj))
1674                 RETURN(-ENOENT);
1675
1676         LASSERT(mdd_object_exists(mdd_obj) > 0);
1677
1678         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1679         if (rc)
1680                 RETURN(rc);
1681
1682         handle = mdd_trans_start(env, mdd);
1683         if (IS_ERR(handle))
1684                 RETURN(-ENOMEM);
1685
1686         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1687
1688         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1689         if (rc)
1690                 GOTO(cleanup, rc);
1691
1692         __mdd_ref_del(env, mdd_obj, handle, 0);
1693
1694         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1695                 /* unlink dot */
1696                 __mdd_ref_del(env, mdd_obj, handle, 1);
1697         }
1698
1699         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1700         la_copy->la_ctime = ma->ma_attr.la_ctime;
1701
1702         la_copy->la_valid = LA_CTIME;
1703         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1704         if (rc)
1705                 GOTO(cleanup, rc);
1706
1707         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1708 #ifdef HAVE_QUOTA_SUPPORT
1709         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1710             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1711                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1712                 mdd_quota_wrapper(&ma->ma_attr, qids);
1713         }
1714 #endif
1715
1716
1717         EXIT;
1718 cleanup:
1719         mdd_write_unlock(env, mdd_obj);
1720         mdd_trans_stop(env, mdd, rc, handle);
1721 #ifdef HAVE_QUOTA_SUPPORT
1722         if (quota_opc)
1723                 /* Trigger dqrel on the owner of child. If failed,
1724                  * the next call for lquota_chkquota will process it */
1725                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1726                               quota_opc);
1727 #endif
1728         return rc;
1729 }
1730
1731 /* partial operation */
1732 static int mdd_oc_sanity_check(const struct lu_env *env,
1733                                struct mdd_object *obj,
1734                                struct md_attr *ma)
1735 {
1736         int rc;
1737         ENTRY;
1738
1739         switch (ma->ma_attr.la_mode & S_IFMT) {
1740         case S_IFREG:
1741         case S_IFDIR:
1742         case S_IFLNK:
1743         case S_IFCHR:
1744         case S_IFBLK:
1745         case S_IFIFO:
1746         case S_IFSOCK:
1747                 rc = 0;
1748                 break;
1749         default:
1750                 rc = -EINVAL;
1751                 break;
1752         }
1753         RETURN(rc);
1754 }
1755
1756 static int mdd_object_create(const struct lu_env *env,
1757                              struct md_object *obj,
1758                              const struct md_op_spec *spec,
1759                              struct md_attr *ma)
1760 {
1761
1762         struct mdd_device *mdd = mdo2mdd(obj);
1763         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1764         const struct lu_fid *pfid = spec->u.sp_pfid;
1765         struct thandle *handle;
1766 #ifdef HAVE_QUOTA_SUPPORT
1767         struct obd_device *obd = mdd->mdd_obd_dev;
1768         struct obd_export *exp = md_quota(env)->mq_exp;
1769         struct mds_obd *mds = &obd->u.mds;
1770         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1771         int quota_opc = 0, block_count = 0;
1772         int inode_pending[MAXQUOTAS] = { 0, 0 };
1773         int block_pending[MAXQUOTAS] = { 0, 0 };
1774 #endif
1775         int rc = 0;
1776         ENTRY;
1777
1778 #ifdef HAVE_QUOTA_SUPPORT
1779         if (mds->mds_quota) {
1780                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1781                 mdd_quota_wrapper(&ma->ma_attr, qids);
1782                 /* get file quota for child */
1783                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1784                                 qids, inode_pending, 1, NULL, 0,
1785                                 NULL, 0);
1786                 switch (ma->ma_attr.la_mode & S_IFMT) {
1787                 case S_IFLNK:
1788                 case S_IFDIR:
1789                         block_count = 2;
1790                         break;
1791                 case S_IFREG:
1792                         block_count = 1;
1793                         break;
1794                 }
1795                 /* get block quota for child */
1796                 if (block_count)
1797                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1798                                         qids, block_pending, block_count,
1799                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1800         }
1801 #endif
1802
1803         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1804         handle = mdd_trans_start(env, mdd);
1805         if (IS_ERR(handle))
1806                 GOTO(out_pending, rc = PTR_ERR(handle));
1807
1808         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1809         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1810         if (rc)
1811                 GOTO(unlock, rc);
1812
1813         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1814         if (rc)
1815                 GOTO(unlock, rc);
1816
1817         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1818                 /* If creating the slave object, set slave EA here. */
1819                 int lmv_size = spec->u.sp_ea.eadatalen;
1820                 struct lmv_stripe_md *lmv;
1821
1822                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1823                 LASSERT(lmv != NULL && lmv_size > 0);
1824
1825                 rc = __mdd_xattr_set(env, mdd_obj,
1826                                      mdd_buf_get_const(env, lmv, lmv_size),
1827                                      XATTR_NAME_LMV, 0, handle);
1828                 if (rc)
1829                         GOTO(unlock, rc);
1830
1831                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1832                                            handle, 0);
1833         } else {
1834 #ifdef CONFIG_FS_POSIX_ACL
1835                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1836                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1837
1838                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1839                         buf->lb_len = spec->u.sp_ea.eadatalen;
1840                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1841                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1842                                                     &ma->ma_attr.la_mode,
1843                                                     handle);
1844                                 if (rc)
1845                                         GOTO(unlock, rc);
1846                                 else
1847                                         ma->ma_attr.la_valid |= LA_MODE;
1848                         }
1849
1850                         pfid = spec->u.sp_ea.fid;
1851                 }
1852 #endif
1853                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1854                                            spec);
1855         }
1856         EXIT;
1857 unlock:
1858         if (rc == 0)
1859                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1860         mdd_write_unlock(env, mdd_obj);
1861
1862         mdd_trans_stop(env, mdd, rc, handle);
1863 out_pending:
1864 #ifdef HAVE_QUOTA_SUPPORT
1865         if (quota_opc) {
1866                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1867                                       inode_pending, 0);
1868                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1869                                       block_pending, 1);
1870                 /* Trigger dqacq on the owner of child. If failed,
1871                  * the next call for lquota_chkquota will process it. */
1872                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1873                               quota_opc);
1874         }
1875 #endif
1876         return rc;
1877 }
1878
1879 /* partial link */
1880 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1881                        const struct md_attr *ma)
1882 {
1883         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1884         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1885         struct mdd_device *mdd = mdo2mdd(obj);
1886         struct thandle *handle;
1887         int rc;
1888         ENTRY;
1889
1890         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1891         handle = mdd_trans_start(env, mdd);
1892         if (IS_ERR(handle))
1893                 RETURN(-ENOMEM);
1894
1895         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1896         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1897         if (rc == 0)
1898                 __mdd_ref_add(env, mdd_obj, handle);
1899         mdd_write_unlock(env, mdd_obj);
1900         if (rc == 0) {
1901                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1902                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1903
1904                 la_copy->la_valid = LA_CTIME;
1905                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1906                                                         handle, 0);
1907         }
1908         mdd_trans_stop(env, mdd, 0, handle);
1909
1910         RETURN(rc);
1911 }
1912
1913 /*
1914  * do NOT or the MAY_*'s, you'll get the weakest
1915  */
1916 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1917 {
1918         int res = 0;
1919
1920         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1921          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1922          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1923          * owner can write to a file even if it is marked readonly to hide
1924          * its brokenness. (bug 5781) */
1925         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1926                 struct md_ucred *uc = md_ucred(env);
1927
1928                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1929                     (la->la_uid == uc->mu_fsuid))
1930                         return 0;
1931         }
1932
1933         if (flags & FMODE_READ)
1934                 res |= MAY_READ;
1935         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1936                 res |= MAY_WRITE;
1937         if (flags & MDS_FMODE_EXEC)
1938                 res |= MAY_EXEC;
1939         return res;
1940 }
1941
1942 static int mdd_open_sanity_check(const struct lu_env *env,
1943                                  struct mdd_object *obj, int flag)
1944 {
1945         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1946         int mode, rc;
1947         ENTRY;
1948
1949         /* EEXIST check */
1950         if (mdd_is_dead_obj(obj))
1951                 RETURN(-ENOENT);
1952
1953         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1954         if (rc)
1955                RETURN(rc);
1956
1957         if (S_ISLNK(tmp_la->la_mode))
1958                 RETURN(-ELOOP);
1959
1960         mode = accmode(env, tmp_la, flag);
1961
1962         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1963                 RETURN(-EISDIR);
1964
1965         if (!(flag & MDS_OPEN_CREATED)) {
1966                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1967                 if (rc)
1968                         RETURN(rc);
1969         }
1970
1971         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1972             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1973                 flag &= ~MDS_OPEN_TRUNC;
1974
1975         /* For writing append-only file must open it with append mode. */
1976         if (mdd_is_append(obj)) {
1977                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1978                         RETURN(-EPERM);
1979                 if (flag & MDS_OPEN_TRUNC)
1980                         RETURN(-EPERM);
1981         }
1982
1983 #if 0
1984         /*
1985          * Now, flag -- O_NOATIME does not be packed by client.
1986          */
1987         if (flag & O_NOATIME) {
1988                 struct md_ucred *uc = md_ucred(env);
1989
1990                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1991                     (uc->mu_valid == UCRED_NEW)) &&
1992                     (uc->mu_fsuid != tmp_la->la_uid) &&
1993                     !mdd_capable(uc, CFS_CAP_FOWNER))
1994                         RETURN(-EPERM);
1995         }
1996 #endif
1997
1998         RETURN(0);
1999 }
2000
2001 static int mdd_open(const struct lu_env *env, struct md_object *obj,
2002                     int flags)
2003 {
2004         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2005         int rc = 0;
2006
2007         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2008
2009         rc = mdd_open_sanity_check(env, mdd_obj, flags);
2010         if (rc == 0)
2011                 mdd_obj->mod_count++;
2012
2013         mdd_write_unlock(env, mdd_obj);
2014         return rc;
2015 }
2016
2017 /* return md_attr back,
2018  * if it is last unlink then return lov ea + llog cookie*/
2019 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
2020                     struct md_attr *ma)
2021 {
2022         int rc = 0;
2023         ENTRY;
2024
2025         if (S_ISREG(mdd_object_type(obj))) {
2026                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2027                  * Caller must be ready for that. */
2028
2029                 rc = __mdd_lmm_get(env, obj, ma);
2030                 if ((ma->ma_valid & MA_LOV))
2031                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2032                                             obj, ma);
2033         }
2034         RETURN(rc);
2035 }
2036
2037 /*
2038  * No permission check is needed.
2039  */
2040 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2041                      struct md_attr *ma)
2042 {
2043         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2044         struct mdd_device *mdd = mdo2mdd(obj);
2045         struct thandle    *handle;
2046         int rc;
2047         int reset = 1;
2048
2049 #ifdef HAVE_QUOTA_SUPPORT
2050         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2051         struct mds_obd *mds = &obd->u.mds;
2052         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2053         int quota_opc = 0;
2054 #endif
2055         ENTRY;
2056
2057         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2058         if (rc)
2059                 RETURN(rc);
2060         handle = mdd_trans_start(env, mdo2mdd(obj));
2061         if (IS_ERR(handle))
2062                 RETURN(PTR_ERR(handle));
2063
2064         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2065         /* release open count */
2066         mdd_obj->mod_count --;
2067
2068         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2069                 /* remove link to object from orphan index */
2070                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2071                 if (rc == 0) {
2072                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2073                                "list, OSS objects to be destroyed.\n",
2074                                PFID(mdd_object_fid(mdd_obj)));
2075                 } else {
2076                         CERROR("Object "DFID" can not be deleted from orphan "
2077                                 "list, maybe cause OST objects can not be "
2078                                 "destroyed (err: %d).\n",
2079                                 PFID(mdd_object_fid(mdd_obj)), rc);
2080                         /* If object was not deleted from orphan list, do not
2081                          * destroy OSS objects, which will be done when next
2082                          * recovery. */
2083                         GOTO(out, rc);
2084                 }
2085         }
2086
2087         rc = mdd_iattr_get(env, mdd_obj, ma);
2088         /* Object maybe not in orphan list originally, it is rare case for
2089          * mdd_finish_unlink() failure. */
2090         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2091 #ifdef HAVE_QUOTA_SUPPORT
2092                 if (mds->mds_quota) {
2093                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2094                         mdd_quota_wrapper(&ma->ma_attr, qids);
2095                 }
2096 #endif
2097                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2098                 if (ma->ma_valid & MA_FLAGS &&
2099                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2100                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2101                 } else {
2102                         rc = mdd_object_kill(env, mdd_obj, ma);
2103                                 if (rc == 0)
2104                                         reset = 0;
2105                 }
2106
2107                 if (rc != 0)
2108                         CERROR("Error when prepare to delete Object "DFID" , "
2109                                "which will cause OST objects can not be "
2110                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2111         }
2112         EXIT;
2113
2114 out:
2115         if (reset)
2116                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2117
2118         mdd_write_unlock(env, mdd_obj);
2119         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2120 #ifdef HAVE_QUOTA_SUPPORT
2121         if (quota_opc)
2122                 /* Trigger dqrel on the owner of child. If failed,
2123                  * the next call for lquota_chkquota will process it */
2124                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2125                               quota_opc);
2126 #endif
2127         return rc;
2128 }
2129
2130 /*
2131  * Permission check is done when open,
2132  * no need check again.
2133  */
2134 static int mdd_readpage_sanity_check(const struct lu_env *env,
2135                                      struct mdd_object *obj)
2136 {
2137         struct dt_object *next = mdd_object_child(obj);
2138         int rc;
2139         ENTRY;
2140
2141         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2142                 rc = 0;
2143         else
2144                 rc = -ENOTDIR;
2145
2146         RETURN(rc);
2147 }
2148
2149 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2150                               int first, void *area, int nob,
2151                               const struct dt_it_ops *iops, struct dt_it *it,
2152                               __u64 *start, __u64 *end,
2153                               struct lu_dirent **last, __u32 attr)
2154 {
2155         int                     result;
2156         __u64                   hash = 0;
2157         struct lu_dirent       *ent;
2158
2159         if (first) {
2160                 memset(area, 0, sizeof (struct lu_dirpage));
2161                 area += sizeof (struct lu_dirpage);
2162                 nob  -= sizeof (struct lu_dirpage);
2163         }
2164
2165         ent  = area;
2166         do {
2167                 int    len;
2168                 int    recsize;
2169
2170                 len  = iops->key_size(env, it);
2171
2172                 /* IAM iterator can return record with zero len. */
2173                 if (len == 0)
2174                         goto next;
2175
2176                 hash = iops->store(env, it);
2177                 if (unlikely(first)) {
2178                         first = 0;
2179                         *start = hash;
2180                 }
2181
2182                 /* calculate max space required for lu_dirent */
2183                 recsize = lu_dirent_calc_size(len, attr);
2184
2185                 if (nob >= recsize) {
2186                         result = iops->rec(env, it, ent, attr);
2187                         if (result == -ESTALE)
2188                                 goto next;
2189                         if (result != 0)
2190                                 goto out;
2191
2192                         /* osd might not able to pack all attributes,
2193                          * so recheck rec length */
2194                         recsize = le16_to_cpu(ent->lde_reclen);
2195                 } else {
2196                         /*
2197                          * record doesn't fit into page, enlarge previous one.
2198                          */
2199                         if (*last) {
2200                                 (*last)->lde_reclen =
2201                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2202                                                         nob);
2203                                 result = 0;
2204                         } else
2205                                 result = -EINVAL;
2206
2207                         goto out;
2208                 }
2209                 *last = ent;
2210                 ent = (void *)ent + recsize;
2211                 nob -= recsize;
2212
2213 next:
2214                 result = iops->next(env, it);
2215                 if (result == -ESTALE)
2216                         goto next;
2217         } while (result == 0);
2218
2219 out:
2220         *end = hash;
2221         return result;
2222 }
2223
2224 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2225                           const struct lu_rdpg *rdpg)
2226 {
2227         struct dt_it      *it;
2228         struct dt_object  *next = mdd_object_child(obj);
2229         const struct dt_it_ops  *iops;
2230         struct page       *pg;
2231         struct lu_dirent  *last = NULL;
2232         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2233         int i;
2234         int rc;
2235         int nob;
2236         __u64 hash_start;
2237         __u64 hash_end = 0;
2238
2239         LASSERT(rdpg->rp_pages != NULL);
2240         LASSERT(next->do_index_ops != NULL);
2241
2242         if (rdpg->rp_count <= 0)
2243                 return -EFAULT;
2244
2245         /*
2246          * iterate through directory and fill pages from @rdpg
2247          */
2248         iops = &next->do_index_ops->dio_it;
2249         it = iops->init(env, next, mdd_object_capa(env, obj));
2250         if (IS_ERR(it))
2251                 return PTR_ERR(it);
2252
2253         rc = iops->load(env, it, rdpg->rp_hash);
2254
2255         if (rc == 0){
2256                 /*
2257                  * Iterator didn't find record with exactly the key requested.
2258                  *
2259                  * It is currently either
2260                  *
2261                  *     - positioned above record with key less than
2262                  *     requested---skip it.
2263                  *
2264                  *     - or not positioned at all (is in IAM_IT_SKEWED
2265                  *     state)---position it on the next item.
2266                  */
2267                 rc = iops->next(env, it);
2268         } else if (rc > 0)
2269                 rc = 0;
2270
2271         /*
2272          * At this point and across for-loop:
2273          *
2274          *  rc == 0 -> ok, proceed.
2275          *  rc >  0 -> end of directory.
2276          *  rc <  0 -> error.
2277          */
2278         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2279              i++, nob -= CFS_PAGE_SIZE) {
2280                 LASSERT(i < rdpg->rp_npages);
2281                 pg = rdpg->rp_pages[i];
2282                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2283                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2284                                         it, &hash_start, &hash_end, &last,
2285                                         rdpg->rp_attrs);
2286                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2287                         if (last)
2288                                 last->lde_reclen = 0;
2289                 }
2290                 cfs_kunmap(pg);
2291         }
2292         if (rc > 0) {
2293                 /*
2294                  * end of directory.
2295                  */
2296                 hash_end = DIR_END_OFF;
2297                 rc = 0;
2298         }
2299         if (rc == 0) {
2300                 struct lu_dirpage *dp;
2301
2302                 dp = cfs_kmap(rdpg->rp_pages[0]);
2303                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2304                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2305                 if (i == 0)
2306                         /*
2307                          * No pages were processed, mark this.
2308                          */
2309                         dp->ldp_flags |= LDF_EMPTY;
2310
2311                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2312                 cfs_kunmap(rdpg->rp_pages[0]);
2313         }
2314         iops->put(env, it);
2315         iops->fini(env, it);
2316
2317         return rc;
2318 }
2319
2320 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2321                  const struct lu_rdpg *rdpg)
2322 {
2323         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2324         int rc;
2325         ENTRY;
2326
2327         LASSERT(mdd_object_exists(mdd_obj));
2328
2329         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2330         rc = mdd_readpage_sanity_check(env, mdd_obj);
2331         if (rc)
2332                 GOTO(out_unlock, rc);
2333
2334         if (mdd_is_dead_obj(mdd_obj)) {
2335                 struct page *pg;
2336                 struct lu_dirpage *dp;
2337
2338                 /*
2339                  * According to POSIX, please do not return any entry to client:
2340                  * even dot and dotdot should not be returned.
2341                  */
2342                 CWARN("readdir from dead object: "DFID"\n",
2343                         PFID(mdd_object_fid(mdd_obj)));
2344
2345                 if (rdpg->rp_count <= 0)
2346                         GOTO(out_unlock, rc = -EFAULT);
2347                 LASSERT(rdpg->rp_pages != NULL);
2348
2349                 pg = rdpg->rp_pages[0];
2350                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2351                 memset(dp, 0 , sizeof(struct lu_dirpage));
2352                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2353                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2354                 dp->ldp_flags |= LDF_EMPTY;
2355                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2356                 cfs_kunmap(pg);
2357                 GOTO(out_unlock, rc = 0);
2358         }
2359
2360         rc = __mdd_readpage(env, mdd_obj, rdpg);
2361
2362         EXIT;
2363 out_unlock:
2364         mdd_read_unlock(env, mdd_obj);
2365         return rc;
2366 }
2367
2368 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2369 {
2370         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2371         struct dt_object *next;
2372
2373         LASSERT(mdd_object_exists(mdd_obj));
2374         next = mdd_object_child(mdd_obj);
2375         return next->do_ops->do_object_sync(env, next);
2376 }
2377
2378 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2379                                         struct md_object *obj)
2380 {
2381         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2382
2383         LASSERT(mdd_object_exists(mdd_obj));
2384         return do_version_get(env, mdd_object_child(mdd_obj));
2385 }
2386
2387 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2388                             dt_obj_version_t version)
2389 {
2390         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2391
2392         LASSERT(mdd_object_exists(mdd_obj));
2393         do_version_set(env, mdd_object_child(mdd_obj), version);
2394 }
2395
2396 const struct md_object_operations mdd_obj_ops = {
2397         .moo_permission    = mdd_permission,
2398         .moo_attr_get      = mdd_attr_get,
2399         .moo_attr_set      = mdd_attr_set,
2400         .moo_xattr_get     = mdd_xattr_get,
2401         .moo_xattr_set     = mdd_xattr_set,
2402         .moo_xattr_list    = mdd_xattr_list,
2403         .moo_xattr_del     = mdd_xattr_del,
2404         .moo_object_create = mdd_object_create,
2405         .moo_ref_add       = mdd_ref_add,
2406         .moo_ref_del       = mdd_ref_del,
2407         .moo_open          = mdd_open,
2408         .moo_close         = mdd_close,
2409         .moo_readpage      = mdd_readpage,
2410         .moo_readlink      = mdd_readlink,
2411         .moo_capa_get      = mdd_capa_get,
2412         .moo_object_sync   = mdd_object_sync,
2413         .moo_version_get   = mdd_version_get,
2414         .moo_version_set   = mdd_version_set,
2415         .moo_path          = mdd_path,
2416 };