Whamcloud - gitweb
49e8f6664e50d1e7f923380bbc22049c3699116b
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #ifdef HAVE_EXT4_LDISKFS
50 #include <ldiskfs/ldiskfs_jbd2.h>
51 #else
52 #include <linux/jbd.h>
53 #endif
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lprocfs_status.h>
58 /* fid_be_cpu(), fid_cpu_to_be(). */
59 #include <lustre_fid.h>
60
61 #include <lustre_param.h>
62 #ifdef HAVE_EXT4_LDISKFS
63 #include <ldiskfs/ldiskfs.h>
64 #else
65 #include <linux/ldiskfs_fs.h>
66 #endif
67 #include <lustre_mds.h>
68 #include <lustre/lustre_idl.h>
69
70 #include "mdd_internal.h"
71
72 static const struct lu_object_operations mdd_lu_obj_ops;
73
74 static int mdd_xattr_get(const struct lu_env *env,
75                          struct md_object *obj, struct lu_buf *buf,
76                          const char *name);
77
78 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
79                  void **data)
80 {
81         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
82                  PFID(mdd_object_fid(obj)));
83         mdo_data_get(env, obj, data);
84         return 0;
85 }
86
87 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
88                struct lu_attr *la, struct lustre_capa *capa)
89 {
90         LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
91                  PFID(mdd_object_fid(obj)));
92         return mdo_attr_get(env, obj, la, capa);
93 }
94
95 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
96 {
97         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
98
99         if (flags & LUSTRE_APPEND_FL)
100                 obj->mod_flags |= APPEND_OBJ;
101
102         if (flags & LUSTRE_IMMUTABLE_FL)
103                 obj->mod_flags |= IMMUTE_OBJ;
104 }
105
106 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
107 {
108         struct mdd_thread_info *info;
109
110         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
111         LASSERT(info != NULL);
112         return info;
113 }
114
115 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
116 {
117         struct lu_buf *buf;
118
119         buf = &mdd_env_info(env)->mti_buf;
120         buf->lb_buf = area;
121         buf->lb_len = len;
122         return buf;
123 }
124
125 void mdd_buf_put(struct lu_buf *buf)
126 {
127         if (buf == NULL || buf->lb_buf == NULL)
128                 return;
129         if (buf->lb_vmalloc)
130                 OBD_VFREE(buf->lb_buf, buf->lb_len);
131         else
132                 OBD_FREE(buf->lb_buf, buf->lb_len);
133         buf->lb_buf = NULL;
134         buf->lb_len = 0;
135 }
136
137 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
138                                        const void *area, ssize_t len)
139 {
140         struct lu_buf *buf;
141
142         buf = &mdd_env_info(env)->mti_buf;
143         buf->lb_buf = (void *)area;
144         buf->lb_len = len;
145         return buf;
146 }
147
148 #define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
149 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
150 {
151         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
152
153         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
154                 if (buf->lb_vmalloc)
155                         OBD_VFREE(buf->lb_buf, buf->lb_len);
156                 else
157                         OBD_FREE(buf->lb_buf, buf->lb_len);
158                 buf->lb_buf = NULL;
159         }
160         if (buf->lb_buf == NULL) {
161                 buf->lb_len = len;
162                 if (buf->lb_len <= BUF_VMALLOC_SIZE) {
163                         OBD_ALLOC(buf->lb_buf, buf->lb_len);
164                         buf->lb_vmalloc = 0;
165                 }
166                 if (buf->lb_buf == NULL) {
167                         OBD_VMALLOC(buf->lb_buf, buf->lb_len);
168                         buf->lb_vmalloc = 1;
169                 }
170                 if (buf->lb_buf == NULL)
171                         buf->lb_len = 0;
172         }
173         return buf;
174 }
175
176 /** Increase the size of the \a mti_big_buf.
177  * preserves old data in buffer
178  * old buffer remains unchanged on error
179  * \retval 0 or -ENOMEM
180  */
181 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
182 {
183         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
184         struct lu_buf buf;
185
186         LASSERT(len >= oldbuf->lb_len);
187         if (len > BUF_VMALLOC_SIZE) {
188                 OBD_VMALLOC(buf.lb_buf, len);
189                 buf.lb_vmalloc = 1;
190         } else {
191                 OBD_ALLOC(buf.lb_buf, len);
192                 buf.lb_vmalloc = 0;
193         }
194         if (buf.lb_buf == NULL)
195                 return -ENOMEM;
196
197         buf.lb_len = len;
198         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
199
200         if (oldbuf->lb_vmalloc)
201                 OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
202         else
203                 OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
204
205         memcpy(oldbuf, &buf, sizeof(buf));
206
207         return 0;
208 }
209
210 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
211                                        struct mdd_device *mdd)
212 {
213         struct mdd_thread_info *mti = mdd_env_info(env);
214         int                     max_cookie_size;
215
216         max_cookie_size = mdd_lov_cookiesize(env, mdd);
217         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
218                 if (mti->mti_max_cookie)
219                         OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
220                 mti->mti_max_cookie = NULL;
221                 mti->mti_max_cookie_size = 0;
222         }
223         if (unlikely(mti->mti_max_cookie == NULL)) {
224                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
225                 if (likely(mti->mti_max_cookie != NULL))
226                         mti->mti_max_cookie_size = max_cookie_size;
227         }
228         if (likely(mti->mti_max_cookie != NULL))
229                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
230         return mti->mti_max_cookie;
231 }
232
233 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
234                                    struct mdd_device *mdd)
235 {
236         struct mdd_thread_info *mti = mdd_env_info(env);
237         int                     max_lmm_size;
238
239         max_lmm_size = mdd_lov_mdsize(env, mdd);
240         if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
241                 if (mti->mti_max_lmm)
242                         OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
243                 mti->mti_max_lmm = NULL;
244                 mti->mti_max_lmm_size = 0;
245         }
246         if (unlikely(mti->mti_max_lmm == NULL)) {
247                 OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
248                 if (unlikely(mti->mti_max_lmm != NULL))
249                         mti->mti_max_lmm_size = max_lmm_size;
250         }
251         return mti->mti_max_lmm;
252 }
253
254 struct lu_object *mdd_object_alloc(const struct lu_env *env,
255                                    const struct lu_object_header *hdr,
256                                    struct lu_device *d)
257 {
258         struct mdd_object *mdd_obj;
259
260         OBD_ALLOC_PTR(mdd_obj);
261         if (mdd_obj != NULL) {
262                 struct lu_object *o;
263
264                 o = mdd2lu_obj(mdd_obj);
265                 lu_object_init(o, NULL, d);
266                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
267                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
268                 mdd_obj->mod_count = 0;
269                 o->lo_ops = &mdd_lu_obj_ops;
270                 return o;
271         } else {
272                 return NULL;
273         }
274 }
275
276 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
277                            const struct lu_object_conf *unused)
278 {
279         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
280         struct mdd_object *mdd_obj = lu2mdd_obj(o);
281         struct lu_object  *below;
282         struct lu_device  *under;
283         ENTRY;
284
285         mdd_obj->mod_cltime = 0;
286         under = &d->mdd_child->dd_lu_dev;
287         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
288         mdd_pdlock_init(mdd_obj);
289         if (below == NULL)
290                 RETURN(-ENOMEM);
291
292         lu_object_add(o, below);
293
294         RETURN(0);
295 }
296
297 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
298 {
299         if (lu_object_exists(o))
300                 return mdd_get_flags(env, lu2mdd_obj(o));
301         else
302                 return 0;
303 }
304
305 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
306 {
307         struct mdd_object *mdd = lu2mdd_obj(o);
308
309         lu_object_fini(o);
310         OBD_FREE_PTR(mdd);
311 }
312
313 static int mdd_object_print(const struct lu_env *env, void *cookie,
314                             lu_printer_t p, const struct lu_object *o)
315 {
316         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
317         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
318                     "valid=%x, cltime="LPU64", flags=%lx)",
319                     mdd, mdd->mod_count, mdd->mod_valid,
320                     mdd->mod_cltime, mdd->mod_flags);
321 }
322
323 static const struct lu_object_operations mdd_lu_obj_ops = {
324         .loo_object_init    = mdd_object_init,
325         .loo_object_start   = mdd_object_start,
326         .loo_object_free    = mdd_object_free,
327         .loo_object_print   = mdd_object_print,
328 };
329
330 struct mdd_object *mdd_object_find(const struct lu_env *env,
331                                    struct mdd_device *d,
332                                    const struct lu_fid *f)
333 {
334         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
335 }
336
337 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
338                         const char *path, struct lu_fid *fid)
339 {
340         struct lu_buf *buf;
341         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
342         struct mdd_object *obj;
343         struct lu_name *lname = &mdd_env_info(env)->mti_name;
344         char *name;
345         int rc = 0;
346         ENTRY;
347
348         /* temp buffer for path element */
349         buf = mdd_buf_alloc(env, PATH_MAX);
350         if (buf->lb_buf == NULL)
351                 RETURN(-ENOMEM);
352
353         lname->ln_name = name = buf->lb_buf;
354         lname->ln_namelen = 0;
355         *f = mdd->mdd_root_fid;
356
357         while(1) {
358                 while (*path == '/')
359                         path++;
360                 if (*path == '\0')
361                         break;
362                 while (*path != '/' && *path != '\0') {
363                         *name = *path;
364                         path++;
365                         name++;
366                         lname->ln_namelen++;
367                 }
368
369                 *name = '\0';
370                 /* find obj corresponding to fid */
371                 obj = mdd_object_find(env, mdd, f);
372                 if (obj == NULL)
373                         GOTO(out, rc = -EREMOTE);
374                 if (IS_ERR(obj))
375                         GOTO(out, rc = -PTR_ERR(obj));
376                 /* get child fid from parent and name */
377                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
378                 mdd_object_put(env, obj);
379                 if (rc)
380                         break;
381
382                 name = buf->lb_buf;
383                 lname->ln_namelen = 0;
384         }
385
386         if (!rc)
387                 *fid = *f;
388 out:
389         RETURN(rc);
390 }
391
392 /** The maximum depth that fid2path() will search.
393  * This is limited only because we want to store the fids for
394  * historical path lookup purposes.
395  */
396 #define MAX_PATH_DEPTH 100
397
398 /** mdd_path() lookup structure. */
399 struct path_lookup_info {
400         __u64                pli_recno;        /**< history point */
401         __u64                pli_currec;       /**< current record */
402         struct lu_fid        pli_fid;
403         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
404         struct mdd_object   *pli_mdd_obj;
405         char                *pli_path;         /**< full path */
406         int                  pli_pathlen;
407         int                  pli_linkno;       /**< which hardlink to follow */
408         int                  pli_fidcount;     /**< number of \a pli_fids */
409 };
410
411 static int mdd_path_current(const struct lu_env *env,
412                             struct path_lookup_info *pli)
413 {
414         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
415         struct mdd_object *mdd_obj;
416         struct lu_buf     *buf = NULL;
417         struct link_ea_header *leh;
418         struct link_ea_entry  *lee;
419         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
420         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
421         char *ptr;
422         int reclen;
423         int rc;
424         ENTRY;
425
426         ptr = pli->pli_path + pli->pli_pathlen - 1;
427         *ptr = 0;
428         --ptr;
429         pli->pli_fidcount = 0;
430         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
431
432         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
433                 mdd_obj = mdd_object_find(env, mdd,
434                                           &pli->pli_fids[pli->pli_fidcount]);
435                 if (mdd_obj == NULL)
436                         GOTO(out, rc = -EREMOTE);
437                 if (IS_ERR(mdd_obj))
438                         GOTO(out, rc = -PTR_ERR(mdd_obj));
439                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
440                 if (rc <= 0) {
441                         mdd_object_put(env, mdd_obj);
442                         if (rc == -1)
443                                 rc = -EREMOTE;
444                         else if (rc == 0)
445                                 /* Do I need to error out here? */
446                                 rc = -ENOENT;
447                         GOTO(out, rc);
448                 }
449
450                 /* Get parent fid and object name */
451                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
452                 buf = mdd_links_get(env, mdd_obj);
453                 mdd_read_unlock(env, mdd_obj);
454                 mdd_object_put(env, mdd_obj);
455                 if (IS_ERR(buf))
456                         GOTO(out, rc = PTR_ERR(buf));
457
458                 leh = buf->lb_buf;
459                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
460                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
461
462                 /* If set, use link #linkno for path lookup, otherwise use
463                    link #0.  Only do this for the final path element. */
464                 if ((pli->pli_fidcount == 0) &&
465                     (pli->pli_linkno < leh->leh_reccount)) {
466                         int count;
467                         for (count = 0; count < pli->pli_linkno; count++) {
468                                 lee = (struct link_ea_entry *)
469                                      ((char *)lee + reclen);
470                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
471                         }
472                         if (pli->pli_linkno < leh->leh_reccount - 1)
473                                 /* indicate to user there are more links */
474                                 pli->pli_linkno++;
475                 }
476
477                 /* Pack the name in the end of the buffer */
478                 ptr -= tmpname->ln_namelen;
479                 if (ptr - 1 <= pli->pli_path)
480                         GOTO(out, rc = -EOVERFLOW);
481                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
482                 *(--ptr) = '/';
483
484                 /* Store the parent fid for historic lookup */
485                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
486                         GOTO(out, rc = -EOVERFLOW);
487                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
488         }
489
490         /* Verify that our path hasn't changed since we started the lookup.
491            Record the current index, and verify the path resolves to the
492            same fid. If it does, then the path is correct as of this index. */
493         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
494         pli->pli_currec = mdd->mdd_cl.mc_index;
495         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
496         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
497         if (rc) {
498                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
499                 GOTO (out, rc = -EAGAIN);
500         }
501         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
502                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
503                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
504                        PFID(&pli->pli_fid));
505                 GOTO(out, rc = -EAGAIN);
506         }
507         ptr++; /* skip leading / */
508         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
509
510         EXIT;
511 out:
512         if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
513                 /* if we vmalloced a large buffer drop it */
514                 mdd_buf_put(buf);
515
516         return rc;
517 }
518
519 static int mdd_path_historic(const struct lu_env *env,
520                              struct path_lookup_info *pli)
521 {
522         return 0;
523 }
524
525 /* Returns the full path to this fid, as of changelog record recno. */
526 static int mdd_path(const struct lu_env *env, struct md_object *obj,
527                     char *path, int pathlen, __u64 *recno, int *linkno)
528 {
529         struct path_lookup_info *pli;
530         int tries = 3;
531         int rc = -EAGAIN;
532         ENTRY;
533
534         if (pathlen < 3)
535                 RETURN(-EOVERFLOW);
536
537         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
538                 path[0] = '\0';
539                 RETURN(0);
540         }
541
542         OBD_ALLOC_PTR(pli);
543         if (pli == NULL)
544                 RETURN(-ENOMEM);
545
546         pli->pli_mdd_obj = md2mdd_obj(obj);
547         pli->pli_recno = *recno;
548         pli->pli_path = path;
549         pli->pli_pathlen = pathlen;
550         pli->pli_linkno = *linkno;
551
552         /* Retry multiple times in case file is being moved */
553         while (tries-- && rc == -EAGAIN)
554                 rc = mdd_path_current(env, pli);
555
556         /* For historical path lookup, the current links may not have existed
557          * at "recno" time.  We must switch over to earlier links/parents
558          * by using the changelog records.  If the earlier parent doesn't
559          * exist, we must search back through the changelog to reconstruct
560          * its parents, then check if it exists, etc.
561          * We may ignore this problem for the initial implementation and
562          * state that an "original" hardlink must still exist for us to find
563          * historic path name. */
564         if (pli->pli_recno != -1) {
565                 rc = mdd_path_historic(env, pli);
566         } else {
567                 *recno = pli->pli_currec;
568                 /* Return next link index to caller */
569                 *linkno = pli->pli_linkno;
570         }
571
572         OBD_FREE_PTR(pli);
573
574         RETURN (rc);
575 }
576
577 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
578 {
579         struct lu_attr *la = &mdd_env_info(env)->mti_la;
580         int rc;
581
582         ENTRY;
583         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
584         if (rc == 0) {
585                 mdd_flags_xlate(obj, la->la_flags);
586                 if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
587                         obj->mod_flags |= MNLINK_OBJ;
588         }
589         RETURN(rc);
590 }
591
592 /* get only inode attributes */
593 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
594                   struct md_attr *ma)
595 {
596         int rc = 0;
597         ENTRY;
598
599         if (ma->ma_valid & MA_INODE)
600                 RETURN(0);
601
602         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
603                           mdd_object_capa(env, mdd_obj));
604         if (rc == 0)
605                 ma->ma_valid |= MA_INODE;
606         RETURN(rc);
607 }
608
609 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
610                        int *size)
611 {
612         struct lov_desc *ldesc;
613         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
614         ENTRY;
615
616         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
617         LASSERT(ldesc != NULL);
618         LASSERT(size != NULL);
619
620         if (!lmm) {
621                 *size = 0;
622                 RETURN(0);
623         }
624
625         lmm->lmm_magic = LOV_MAGIC_V1;
626         lmm->lmm_object_seq = LOV_OBJECT_GROUP_DEFAULT;
627         lmm->lmm_pattern = ldesc->ld_pattern;
628         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
629         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
630         *size = sizeof(struct lov_mds_md);
631
632         RETURN(sizeof(struct lov_mds_md));
633 }
634
635 /* get lov EA only */
636 static int __mdd_lmm_get(const struct lu_env *env,
637                          struct mdd_object *mdd_obj, struct md_attr *ma)
638 {
639         int rc;
640         ENTRY;
641
642         if (ma->ma_valid & MA_LOV)
643                 RETURN(0);
644
645         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
646                         XATTR_NAME_LOV);
647         if (rc == 0 && ma->ma_need & MA_LOV_DEF)
648                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
649                                         &ma->ma_lmm_size);
650         if (rc > 0) {
651                 ma->ma_valid |= MA_LOV;
652                 rc = 0;
653         }
654         RETURN(rc);
655 }
656
657 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
658                        struct md_attr *ma)
659 {
660         int rc;
661         ENTRY;
662
663         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
664         rc = __mdd_lmm_get(env, mdd_obj, ma);
665         mdd_read_unlock(env, mdd_obj);
666         RETURN(rc);
667 }
668
669 /* get lmv EA only*/
670 static int __mdd_lmv_get(const struct lu_env *env,
671                          struct mdd_object *mdd_obj, struct md_attr *ma)
672 {
673         int rc;
674         ENTRY;
675
676         if (ma->ma_valid & MA_LMV)
677                 RETURN(0);
678
679         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
680                         XATTR_NAME_LMV);
681         if (rc > 0) {
682                 ma->ma_valid |= MA_LMV;
683                 rc = 0;
684         }
685         RETURN(rc);
686 }
687
688 static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
689                          struct md_attr *ma)
690 {
691         struct mdd_thread_info *info = mdd_env_info(env);
692         struct lustre_mdt_attrs *lma =
693                                  (struct lustre_mdt_attrs *)info->mti_xattr_buf;
694         int lma_size;
695         int rc;
696         ENTRY;
697
698         /* If all needed data are already valid, nothing to do */
699         if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
700             (ma->ma_need & (MA_HSM | MA_SOM)))
701                 RETURN(0);
702
703         /* Read LMA from disk EA */
704         lma_size = sizeof(info->mti_xattr_buf);
705         rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
706         if (rc <= 0)
707                 RETURN(rc);
708
709         /* Useless to check LMA incompatibility because this is already done in
710          * osd_ea_fid_get(), and this will fail long before this code is
711          * called.
712          * So, if we are here, LMA is compatible.
713          */
714
715         lustre_lma_swab(lma);
716
717         /* Swab and copy LMA */
718         if (ma->ma_need & MA_HSM) {
719                 if (lma->lma_compat & LMAC_HSM)
720                         ma->ma_hsm_flags = lma->lma_flags & HSM_FLAGS_MASK;
721                 else
722                         ma->ma_hsm_flags = 0;
723                 ma->ma_valid |= MA_HSM;
724         }
725
726         /* Copy SOM */
727         if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
728                 LASSERT(ma->ma_som != NULL);
729                 ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
730                 ma->ma_som->msd_size    = lma->lma_som_size;
731                 ma->ma_som->msd_blocks  = lma->lma_som_blocks;
732                 ma->ma_som->msd_mountid = lma->lma_som_mountid;
733                 ma->ma_valid |= MA_SOM;
734         }
735
736         RETURN(0);
737 }
738
739 static int mdd_attr_get_internal(const struct lu_env *env,
740                                  struct mdd_object *mdd_obj,
741                                  struct md_attr *ma)
742 {
743         int rc = 0;
744         ENTRY;
745
746         if (ma->ma_need & MA_INODE)
747                 rc = mdd_iattr_get(env, mdd_obj, ma);
748
749         if (rc == 0 && ma->ma_need & MA_LOV) {
750                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
751                     S_ISDIR(mdd_object_type(mdd_obj)))
752                         rc = __mdd_lmm_get(env, mdd_obj, ma);
753         }
754         if (rc == 0 && ma->ma_need & MA_LMV) {
755                 if (S_ISDIR(mdd_object_type(mdd_obj)))
756                         rc = __mdd_lmv_get(env, mdd_obj, ma);
757         }
758         if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
759                 if (S_ISREG(mdd_object_type(mdd_obj)))
760                         rc = __mdd_lma_get(env, mdd_obj, ma);
761         }
762 #ifdef CONFIG_FS_POSIX_ACL
763         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
764                 if (S_ISDIR(mdd_object_type(mdd_obj)))
765                         rc = mdd_def_acl_get(env, mdd_obj, ma);
766         }
767 #endif
768         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
769                rc, ma->ma_valid);
770         RETURN(rc);
771 }
772
773 int mdd_attr_get_internal_locked(const struct lu_env *env,
774                                  struct mdd_object *mdd_obj, struct md_attr *ma)
775 {
776         int rc;
777         int needlock = ma->ma_need &
778                        (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM);
779
780         if (needlock)
781                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
782         rc = mdd_attr_get_internal(env, mdd_obj, ma);
783         if (needlock)
784                 mdd_read_unlock(env, mdd_obj);
785         return rc;
786 }
787
788 /*
789  * No permission check is needed.
790  */
791 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
792                         struct md_attr *ma)
793 {
794         struct mdd_object *mdd_obj = md2mdd_obj(obj);
795         int                rc;
796
797         ENTRY;
798         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
799         RETURN(rc);
800 }
801
802 /*
803  * No permission check is needed.
804  */
805 static int mdd_xattr_get(const struct lu_env *env,
806                          struct md_object *obj, struct lu_buf *buf,
807                          const char *name)
808 {
809         struct mdd_object *mdd_obj = md2mdd_obj(obj);
810         int rc;
811
812         ENTRY;
813
814         LASSERT(mdd_object_exists(mdd_obj));
815
816         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
817         rc = mdo_xattr_get(env, mdd_obj, buf, name,
818                            mdd_object_capa(env, mdd_obj));
819         mdd_read_unlock(env, mdd_obj);
820
821         RETURN(rc);
822 }
823
824 /*
825  * Permission check is done when open,
826  * no need check again.
827  */
828 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
829                         struct lu_buf *buf)
830 {
831         struct mdd_object *mdd_obj = md2mdd_obj(obj);
832         struct dt_object  *next;
833         loff_t             pos = 0;
834         int                rc;
835         ENTRY;
836
837         LASSERT(mdd_object_exists(mdd_obj));
838
839         next = mdd_object_child(mdd_obj);
840         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
841         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
842                                          mdd_object_capa(env, mdd_obj));
843         mdd_read_unlock(env, mdd_obj);
844         RETURN(rc);
845 }
846
847 /*
848  * No permission check is needed.
849  */
850 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
851                           struct lu_buf *buf)
852 {
853         struct mdd_object *mdd_obj = md2mdd_obj(obj);
854         int rc;
855
856         ENTRY;
857
858         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
859         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
860         mdd_read_unlock(env, mdd_obj);
861
862         RETURN(rc);
863 }
864
865 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
866                                struct mdd_object *c, struct md_attr *ma,
867                                struct thandle *handle,
868                                const struct md_op_spec *spec)
869 {
870         struct lu_attr *attr = &ma->ma_attr;
871         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
872         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
873         const struct dt_index_features *feat = spec->sp_feat;
874         int rc;
875         ENTRY;
876
877         if (!mdd_object_exists(c)) {
878                 struct dt_object *next = mdd_object_child(c);
879                 LASSERT(next);
880
881                 if (feat != &dt_directory_features && feat != NULL)
882                         dof->dof_type = DFT_INDEX;
883                 else
884                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
885
886                 dof->u.dof_idx.di_feat = feat;
887
888                 /* @hint will be initialized by underlying device. */
889                 next->do_ops->do_ah_init(env, hint,
890                                          p ? mdd_object_child(p) : NULL,
891                                          attr->la_mode & S_IFMT);
892
893                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
894                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
895         } else
896                 rc = -EEXIST;
897
898         RETURN(rc);
899 }
900
901 /**
902  * Make sure the ctime is increased only.
903  */
904 static inline int mdd_attr_check(const struct lu_env *env,
905                                  struct mdd_object *obj,
906                                  struct lu_attr *attr)
907 {
908         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
909         int rc;
910         ENTRY;
911
912         if (attr->la_valid & LA_CTIME) {
913                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
914                 if (rc)
915                         RETURN(rc);
916
917                 if (attr->la_ctime < tmp_la->la_ctime)
918                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
919                 else if (attr->la_valid == LA_CTIME &&
920                          attr->la_ctime == tmp_la->la_ctime)
921                         attr->la_valid &= ~LA_CTIME;
922         }
923         RETURN(0);
924 }
925
926 int mdd_attr_set_internal(const struct lu_env *env,
927                           struct mdd_object *obj,
928                           struct lu_attr *attr,
929                           struct thandle *handle,
930                           int needacl)
931 {
932         int rc;
933         ENTRY;
934
935         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
936 #ifdef CONFIG_FS_POSIX_ACL
937         if (!rc && (attr->la_valid & LA_MODE) && needacl)
938                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
939 #endif
940         RETURN(rc);
941 }
942
943 int mdd_attr_check_set_internal(const struct lu_env *env,
944                                 struct mdd_object *obj,
945                                 struct lu_attr *attr,
946                                 struct thandle *handle,
947                                 int needacl)
948 {
949         int rc;
950         ENTRY;
951
952         rc = mdd_attr_check(env, obj, attr);
953         if (rc)
954                 RETURN(rc);
955
956         if (attr->la_valid)
957                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
958         RETURN(rc);
959 }
960
961 static int mdd_attr_set_internal_locked(const struct lu_env *env,
962                                         struct mdd_object *obj,
963                                         struct lu_attr *attr,
964                                         struct thandle *handle,
965                                         int needacl)
966 {
967         int rc;
968         ENTRY;
969
970         needacl = needacl && (attr->la_valid & LA_MODE);
971         if (needacl)
972                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
973         rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
974         if (needacl)
975                 mdd_write_unlock(env, obj);
976         RETURN(rc);
977 }
978
979 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
980                                        struct mdd_object *obj,
981                                        struct lu_attr *attr,
982                                        struct thandle *handle,
983                                        int needacl)
984 {
985         int rc;
986         ENTRY;
987
988         needacl = needacl && (attr->la_valid & LA_MODE);
989         if (needacl)
990                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
991         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
992         if (needacl)
993                 mdd_write_unlock(env, obj);
994         RETURN(rc);
995 }
996
997 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
998                     const struct lu_buf *buf, const char *name,
999                     int fl, struct thandle *handle)
1000 {
1001         struct lustre_capa *capa = mdd_object_capa(env, obj);
1002         int rc = -EINVAL;
1003         ENTRY;
1004
1005         if (buf->lb_buf && buf->lb_len > 0)
1006                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
1007         else if (buf->lb_buf == NULL && buf->lb_len == 0)
1008                 rc = mdo_xattr_del(env, obj, name, handle, capa);
1009
1010         RETURN(rc);
1011 }
1012
1013 /*
1014  * This gives the same functionality as the code between
1015  * sys_chmod and inode_setattr
1016  * chown_common and inode_setattr
1017  * utimes and inode_setattr
1018  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
1019  */
1020 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
1021                         struct lu_attr *la, const struct md_attr *ma)
1022 {
1023         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
1024         struct md_ucred  *uc;
1025         int               rc;
1026         ENTRY;
1027
1028         if (!la->la_valid)
1029                 RETURN(0);
1030
1031         /* Do not permit change file type */
1032         if (la->la_valid & LA_TYPE)
1033                 RETURN(-EPERM);
1034
1035         /* They should not be processed by setattr */
1036         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
1037                 RETURN(-EPERM);
1038
1039         /* export destroy does not have ->le_ses, but we may want
1040          * to drop LUSTRE_SOM_FL. */
1041         if (!env->le_ses)
1042                 RETURN(0);
1043
1044         uc = md_ucred(env);
1045
1046         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1047         if (rc)
1048                 RETURN(rc);
1049
1050         if (la->la_valid == LA_CTIME) {
1051                 if (!(ma->ma_attr_flags & MDS_PERM_BYPASS))
1052                         /* This is only for set ctime when rename's source is
1053                          * on remote MDS. */
1054                         rc = mdd_may_delete(env, NULL, obj,
1055                                             (struct md_attr *)ma, 1, 0);
1056                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
1057                         la->la_valid &= ~LA_CTIME;
1058                 RETURN(rc);
1059         }
1060
1061         if (la->la_valid == LA_ATIME) {
1062                 /* This is atime only set for read atime update on close. */
1063                 if (la->la_atime <= tmp_la->la_atime +
1064                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff)
1065                         la->la_valid &= ~LA_ATIME;
1066                 RETURN(0);
1067         }
1068
1069         /* Check if flags change. */
1070         if (la->la_valid & LA_FLAGS) {
1071                 unsigned int oldflags = 0;
1072                 unsigned int newflags = la->la_flags &
1073                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1074
1075                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1076                     !mdd_capable(uc, CFS_CAP_FOWNER))
1077                         RETURN(-EPERM);
1078
1079                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1080                  * only be changed by the relevant capability. */
1081                 if (mdd_is_immutable(obj))
1082                         oldflags |= LUSTRE_IMMUTABLE_FL;
1083                 if (mdd_is_append(obj))
1084                         oldflags |= LUSTRE_APPEND_FL;
1085                 if ((oldflags ^ newflags) &&
1086                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1087                         RETURN(-EPERM);
1088
1089                 if (!S_ISDIR(tmp_la->la_mode))
1090                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1091         }
1092
1093         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1094             (la->la_valid & ~LA_FLAGS) &&
1095             !(ma->ma_attr_flags & MDS_PERM_BYPASS))
1096                 RETURN(-EPERM);
1097
1098         /* Check for setting the obj time. */
1099         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1100             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1101                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1102                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1103                         rc = mdd_permission_internal_locked(env, obj, tmp_la,
1104                                                             MAY_WRITE,
1105                                                             MOR_TGT_CHILD);
1106                         if (rc)
1107                                 RETURN(rc);
1108                 }
1109         }
1110
1111         /* Make sure a caller can chmod. */
1112         if (la->la_valid & LA_MODE) {
1113                 /* Bypass la_vaild == LA_MODE,
1114                  * this is for changing file with SUID or SGID. */
1115                 if ((la->la_valid & ~LA_MODE) &&
1116                     !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
1117                     (uc->mu_fsuid != tmp_la->la_uid) &&
1118                     !mdd_capable(uc, CFS_CAP_FOWNER))
1119                         RETURN(-EPERM);
1120
1121                 if (la->la_mode == (cfs_umode_t) -1)
1122                         la->la_mode = tmp_la->la_mode;
1123                 else
1124                         la->la_mode = (la->la_mode & S_IALLUGO) |
1125                                       (tmp_la->la_mode & ~S_IALLUGO);
1126
1127                 /* Also check the setgid bit! */
1128                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1129                                        la->la_gid : tmp_la->la_gid) &&
1130                     !mdd_capable(uc, CFS_CAP_FSETID))
1131                         la->la_mode &= ~S_ISGID;
1132         } else {
1133                la->la_mode = tmp_la->la_mode;
1134         }
1135
1136         /* Make sure a caller can chown. */
1137         if (la->la_valid & LA_UID) {
1138                 if (la->la_uid == (uid_t) -1)
1139                         la->la_uid = tmp_la->la_uid;
1140                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1141                     (la->la_uid != tmp_la->la_uid)) &&
1142                     !mdd_capable(uc, CFS_CAP_CHOWN))
1143                         RETURN(-EPERM);
1144
1145                 /* If the user or group of a non-directory has been
1146                  * changed by a non-root user, remove the setuid bit.
1147                  * 19981026 David C Niemi <niemi@tux.org>
1148                  *
1149                  * Changed this to apply to all users, including root,
1150                  * to avoid some races. This is the behavior we had in
1151                  * 2.0. The check for non-root was definitely wrong
1152                  * for 2.2 anyway, as it should have been using
1153                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1154                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1155                     !S_ISDIR(tmp_la->la_mode)) {
1156                         la->la_mode &= ~S_ISUID;
1157                         la->la_valid |= LA_MODE;
1158                 }
1159         }
1160
1161         /* Make sure caller can chgrp. */
1162         if (la->la_valid & LA_GID) {
1163                 if (la->la_gid == (gid_t) -1)
1164                         la->la_gid = tmp_la->la_gid;
1165                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1166                     ((la->la_gid != tmp_la->la_gid) &&
1167                     !lustre_in_group_p(uc, la->la_gid))) &&
1168                     !mdd_capable(uc, CFS_CAP_CHOWN))
1169                         RETURN(-EPERM);
1170
1171                 /* Likewise, if the user or group of a non-directory
1172                  * has been changed by a non-root user, remove the
1173                  * setgid bit UNLESS there is no group execute bit
1174                  * (this would be a file marked for mandatory
1175                  * locking).  19981026 David C Niemi <niemi@tux.org>
1176                  *
1177                  * Removed the fsuid check (see the comment above) --
1178                  * 19990830 SD. */
1179                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1180                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1181                         la->la_mode &= ~S_ISGID;
1182                         la->la_valid |= LA_MODE;
1183                 }
1184         }
1185
1186         /* For both Size-on-MDS case and truncate case,
1187          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1188          * We distinguish them by "ma->ma_attr_flags & MDS_SOM".
1189          * For SOM case, it is true, the MAY_WRITE perm has been checked
1190          * when open, no need check again. For truncate case, it is false,
1191          * the MAY_WRITE perm should be checked here. */
1192         if (ma->ma_attr_flags & MDS_SOM) {
1193                 /* For the "Size-on-MDS" setattr update, merge coming
1194                  * attributes with the set in the inode. BUG 10641 */
1195                 if ((la->la_valid & LA_ATIME) &&
1196                     (la->la_atime <= tmp_la->la_atime))
1197                         la->la_valid &= ~LA_ATIME;
1198
1199                 /* OST attributes do not have a priority over MDS attributes,
1200                  * so drop times if ctime is equal. */
1201                 if ((la->la_valid & LA_CTIME) &&
1202                     (la->la_ctime <= tmp_la->la_ctime))
1203                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1204         } else {
1205                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1206                         if (!((ma->ma_attr_flags & MDS_OPEN_OWNEROVERRIDE) &&
1207                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1208                             !(ma->ma_attr_flags & MDS_PERM_BYPASS)) {
1209                                 rc = mdd_permission_internal_locked(env, obj,
1210                                                             tmp_la, MAY_WRITE,
1211                                                             MOR_TGT_CHILD);
1212                                 if (rc)
1213                                         RETURN(rc);
1214                         }
1215                 }
1216                 if (la->la_valid & LA_CTIME) {
1217                         /* The pure setattr, it has the priority over what is
1218                          * already set, do not drop it if ctime is equal. */
1219                         if (la->la_ctime < tmp_la->la_ctime)
1220                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1221                                                   LA_CTIME);
1222                 }
1223         }
1224
1225         RETURN(0);
1226 }
1227
1228 /** Store a data change changelog record
1229  * If this fails, we must fail the whole transaction; we don't
1230  * want the change to commit without the log entry.
1231  * \param mdd_obj - mdd_object of change
1232  * \param handle - transacion handle
1233  */
1234 static int mdd_changelog_data_store(const struct lu_env     *env,
1235                                     struct mdd_device       *mdd,
1236                                     enum changelog_rec_type type,
1237                                     struct mdd_object       *mdd_obj,
1238                                     struct thandle          *handle)
1239 {
1240         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1241         struct llog_changelog_rec *rec;
1242         struct lu_buf *buf;
1243         int reclen;
1244         int rc;
1245
1246         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1247                 RETURN(0);
1248
1249         LASSERT(handle != NULL);
1250         LASSERT(mdd_obj != NULL);
1251
1252         if ((type == CL_TIME) &&
1253             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1254                 /* Don't need multiple updates in this log */
1255                 /* Don't check under lock - no big deal if we get an extra
1256                    entry */
1257                 RETURN(0);
1258         }
1259
1260         reclen = llog_data_len(sizeof(*rec));
1261         buf = mdd_buf_alloc(env, reclen);
1262         if (buf->lb_buf == NULL)
1263                 RETURN(-ENOMEM);
1264         rec = (struct llog_changelog_rec *)buf->lb_buf;
1265
1266         rec->cr.cr_flags = CLF_VERSION;
1267         rec->cr.cr_type = (__u32)type;
1268         rec->cr.cr_tfid = *tfid;
1269         rec->cr.cr_namelen = 0;
1270         mdd_obj->mod_cltime = cfs_time_current_64();
1271
1272         rc = mdd_changelog_llog_write(mdd, rec, handle);
1273         if (rc < 0) {
1274                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1275                        rc, type, PFID(tfid));
1276                 return -EFAULT;
1277         }
1278
1279         return 0;
1280 }
1281
1282 /**
1283  * Should be called with write lock held.
1284  *
1285  * \see mdd_lma_set_locked().
1286  */
1287 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1288                        const struct md_attr *ma, struct thandle *handle)
1289 {
1290         struct mdd_thread_info *info = mdd_env_info(env);
1291         struct lu_buf *buf;
1292         struct lustre_mdt_attrs *lma =
1293                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1294         int lmasize = sizeof(struct lustre_mdt_attrs);
1295         int rc = 0;
1296
1297         ENTRY;
1298
1299         /* Either HSM or SOM part is not valid, we need to read it before */
1300         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1301                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1302                 if (rc <= 0)
1303                         RETURN(rc);
1304
1305                 lustre_lma_swab(lma);
1306         } else {
1307                 memset(lma, 0, lmasize);
1308         }
1309
1310         /* Copy HSM data */
1311         if (ma->ma_valid & MA_HSM) {
1312                 lma->lma_flags  |= ma->ma_hsm_flags & HSM_FLAGS_MASK;
1313                 lma->lma_compat |= LMAC_HSM;
1314         }
1315
1316         /* Copy SOM data */
1317         if (ma->ma_valid & MA_SOM) {
1318                 LASSERT(ma->ma_som != NULL);
1319                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1320                         lma->lma_compat     &= ~LMAC_SOM;
1321                 } else {
1322                         lma->lma_compat     |= LMAC_SOM;
1323                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1324                         lma->lma_som_size    = ma->ma_som->msd_size;
1325                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1326                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1327                 }
1328         }
1329
1330         /* Copy FID */
1331         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1332
1333         lustre_lma_swab(lma);
1334         buf = mdd_buf_get(env, lma, lmasize);
1335         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1336
1337         RETURN(rc);
1338 }
1339
1340 /**
1341  * Save LMA extended attributes with data from \a ma.
1342  *
1343  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1344  * not, LMA EA will be first read from disk, modified and write back.
1345  *
1346  */
1347 static int mdd_lma_set_locked(const struct lu_env *env,
1348                               struct mdd_object *mdd_obj,
1349                               const struct md_attr *ma, struct thandle *handle)
1350 {
1351         int rc;
1352
1353         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1354         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1355         mdd_write_unlock(env, mdd_obj);
1356         return rc;
1357 }
1358
1359 /* set attr and LOV EA at once, return updated attr */
1360 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1361                         const struct md_attr *ma)
1362 {
1363         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1364         struct mdd_device *mdd = mdo2mdd(obj);
1365         struct thandle *handle;
1366         struct lov_mds_md *lmm = NULL;
1367         struct llog_cookie *logcookies = NULL;
1368         int  rc, lmm_size = 0, cookie_size = 0;
1369         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1370 #ifdef HAVE_QUOTA_SUPPORT
1371         struct obd_device *obd = mdd->mdd_obd_dev;
1372         struct mds_obd *mds = &obd->u.mds;
1373         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
1374         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
1375         int quota_opc = 0, block_count = 0;
1376         int inode_pending[MAXQUOTAS] = { 0, 0 };
1377         int block_pending[MAXQUOTAS] = { 0, 0 };
1378 #endif
1379         ENTRY;
1380
1381         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
1382                                     MDD_TXN_ATTR_SET_OP);
1383         handle = mdd_trans_start(env, mdd);
1384         if (IS_ERR(handle))
1385                 RETURN(PTR_ERR(handle));
1386         /*TODO: add lock here*/
1387         /* start a log jounal handle if needed */
1388         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1389             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1390                 lmm_size = mdd_lov_mdsize(env, mdd);
1391                 lmm = mdd_max_lmm_get(env, mdd);
1392                 if (lmm == NULL)
1393                         GOTO(cleanup, rc = -ENOMEM);
1394
1395                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1396                                 XATTR_NAME_LOV);
1397
1398                 if (rc < 0)
1399                         GOTO(cleanup, rc);
1400         }
1401
1402         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
1403                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1404                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
1405
1406         *la_copy = ma->ma_attr;
1407         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
1408         if (rc)
1409                 GOTO(cleanup, rc);
1410
1411 #ifdef HAVE_QUOTA_SUPPORT
1412         if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
1413                 struct obd_export *exp = md_quota(env)->mq_exp;
1414                 struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
1415
1416                 rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
1417                 if (!rc) {
1418                         quota_opc = FSFILT_OP_SETATTR;
1419                         mdd_quota_wrapper(la_copy, qnids);
1420                         mdd_quota_wrapper(la_tmp, qoids);
1421                         /* get file quota for new owner */
1422                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1423                                         qnids, inode_pending, 1, NULL, 0,
1424                                         NULL, 0);
1425                         block_count = (la_tmp->la_blocks + 7) >> 3;
1426                         if (block_count) {
1427                                 void *data = NULL;
1428                                 mdd_data_get(env, mdd_obj, &data);
1429                                 /* get block quota for new owner */
1430                                 lquota_chkquota(mds_quota_interface_ref, obd,
1431                                                 exp, qnids, block_pending,
1432                                                 block_count, NULL,
1433                                                 LQUOTA_FLAGS_BLK, data, 1);
1434                         }
1435                 }
1436         }
1437 #endif
1438
1439         if (la_copy->la_valid & LA_FLAGS) {
1440                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1441                                                   handle, 1);
1442                 if (rc == 0)
1443                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1444         } else if (la_copy->la_valid) {            /* setattr */
1445                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
1446                                                   handle, 1);
1447                 /* journal chown/chgrp in llog, just like unlink */
1448                 if (rc == 0 && lmm_size){
1449                         cookie_size = mdd_lov_cookiesize(env, mdd);
1450                         logcookies = mdd_max_cookie_get(env, mdd);
1451                         if (logcookies == NULL)
1452                                 GOTO(cleanup, rc = -ENOMEM);
1453
1454                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1455                                             logcookies, cookie_size) <= 0)
1456                                 logcookies = NULL;
1457                 }
1458         }
1459
1460         if (rc == 0 && ma->ma_valid & MA_LOV) {
1461                 cfs_umode_t mode;
1462
1463                 mode = mdd_object_type(mdd_obj);
1464                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1465                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1466                         if (rc)
1467                                 GOTO(cleanup, rc);
1468
1469                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1470                                             ma->ma_lmm_size, handle, 1);
1471                 }
1472
1473         }
1474         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1475                 cfs_umode_t mode;
1476
1477                 mode = mdd_object_type(mdd_obj);
1478                 if (S_ISREG(mode))
1479                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1480
1481         }
1482 cleanup:
1483         if (rc == 0)
1484                 rc = mdd_changelog_data_store(env, mdd,
1485                                               (ma->ma_attr.la_valid &
1486                                                ~(LA_MTIME|LA_CTIME|LA_ATIME)) ?
1487                                               CL_SETATTR : CL_TIME,
1488                                               mdd_obj, handle);
1489         mdd_trans_stop(env, mdd, rc, handle);
1490         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1491                 /*set obd attr, if needed*/
1492                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1493                                            logcookies);
1494         }
1495 #ifdef HAVE_QUOTA_SUPPORT
1496         if (quota_opc) {
1497                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1498                                       inode_pending, 0);
1499                 lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
1500                                       block_pending, 1);
1501                 /* Trigger dqrel/dqacq for original owner and new owner.
1502                  * If failed, the next call for lquota_chkquota will
1503                  * process it. */
1504                 lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
1505                               quota_opc);
1506         }
1507 #endif
1508         RETURN(rc);
1509 }
1510
1511 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1512                       const struct lu_buf *buf, const char *name, int fl,
1513                       struct thandle *handle)
1514 {
1515         int  rc;
1516         ENTRY;
1517
1518         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1519         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1520         mdd_write_unlock(env, obj);
1521
1522         RETURN(rc);
1523 }
1524
1525 static int mdd_xattr_sanity_check(const struct lu_env *env,
1526                                   struct mdd_object *obj)
1527 {
1528         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1529         struct md_ucred *uc     = md_ucred(env);
1530         int rc;
1531         ENTRY;
1532
1533         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1534                 RETURN(-EPERM);
1535
1536         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1537         if (rc)
1538                 RETURN(rc);
1539
1540         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1541             !mdd_capable(uc, CFS_CAP_FOWNER))
1542                 RETURN(-EPERM);
1543
1544         RETURN(rc);
1545 }
1546
1547 /**
1548  * The caller should guarantee to update the object ctime
1549  * after xattr_set if needed.
1550  */
1551 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1552                          const struct lu_buf *buf, const char *name,
1553                          int fl)
1554 {
1555         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1556         struct mdd_device *mdd = mdo2mdd(obj);
1557         struct thandle *handle;
1558         int  rc;
1559         ENTRY;
1560
1561         rc = mdd_xattr_sanity_check(env, mdd_obj);
1562         if (rc)
1563                 RETURN(rc);
1564
1565         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1566         /* security-replated changes may require sync */
1567         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
1568             mdd->mdd_sync_permission == 1)
1569                 txn_param_sync(&mdd_env_info(env)->mti_param);
1570
1571         handle = mdd_trans_start(env, mdd);
1572         if (IS_ERR(handle))
1573                 RETURN(PTR_ERR(handle));
1574
1575         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
1576
1577         /* Only record user xattr changes */
1578         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1579             (strncmp("user.", name, 5) == 0))
1580                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1581                                               handle);
1582         mdd_trans_stop(env, mdd, rc, handle);
1583
1584         RETURN(rc);
1585 }
1586
1587 /**
1588  * The caller should guarantee to update the object ctime
1589  * after xattr_set if needed.
1590  */
1591 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1592                   const char *name)
1593 {
1594         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1595         struct mdd_device *mdd = mdo2mdd(obj);
1596         struct thandle *handle;
1597         int  rc;
1598         ENTRY;
1599
1600         rc = mdd_xattr_sanity_check(env, mdd_obj);
1601         if (rc)
1602                 RETURN(rc);
1603
1604         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1605         handle = mdd_trans_start(env, mdd);
1606         if (IS_ERR(handle))
1607                 RETURN(PTR_ERR(handle));
1608
1609         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1610         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1611                            mdd_object_capa(env, mdd_obj));
1612         mdd_write_unlock(env, mdd_obj);
1613
1614         /* Only record user xattr changes */
1615         if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
1616             (strncmp("user.", name, 5) != 0))
1617                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
1618                                               handle);
1619
1620         mdd_trans_stop(env, mdd, rc, handle);
1621
1622         RETURN(rc);
1623 }
1624
1625 /* partial unlink */
1626 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
1627                        struct md_attr *ma)
1628 {
1629         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1630         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1631         struct mdd_device *mdd = mdo2mdd(obj);
1632         struct thandle *handle;
1633 #ifdef HAVE_QUOTA_SUPPORT
1634         struct obd_device *obd = mdd->mdd_obd_dev;
1635         struct mds_obd *mds = &obd->u.mds;
1636         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1637         int quota_opc = 0;
1638 #endif
1639         int rc;
1640         ENTRY;
1641
1642         /*
1643          * Check -ENOENT early here because we need to get object type
1644          * to calculate credits before transaction start
1645          */
1646         if (!mdd_object_exists(mdd_obj))
1647                 RETURN(-ENOENT);
1648
1649         LASSERT(mdd_object_exists(mdd_obj) > 0);
1650
1651         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
1652         if (rc)
1653                 RETURN(rc);
1654
1655         handle = mdd_trans_start(env, mdd);
1656         if (IS_ERR(handle))
1657                 RETURN(-ENOMEM);
1658
1659         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1660
1661         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
1662         if (rc)
1663                 GOTO(cleanup, rc);
1664
1665         __mdd_ref_del(env, mdd_obj, handle, 0);
1666
1667         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
1668                 /* unlink dot */
1669                 __mdd_ref_del(env, mdd_obj, handle, 1);
1670         }
1671
1672         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1673         la_copy->la_ctime = ma->ma_attr.la_ctime;
1674
1675         la_copy->la_valid = LA_CTIME;
1676         rc = mdd_attr_check_set_internal(env, mdd_obj, la_copy, handle, 0);
1677         if (rc)
1678                 GOTO(cleanup, rc);
1679
1680         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
1681 #ifdef HAVE_QUOTA_SUPPORT
1682         if (mds->mds_quota && ma->ma_valid & MA_INODE &&
1683             ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
1684                 quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
1685                 mdd_quota_wrapper(&ma->ma_attr, qids);
1686         }
1687 #endif
1688
1689
1690         EXIT;
1691 cleanup:
1692         mdd_write_unlock(env, mdd_obj);
1693         mdd_trans_stop(env, mdd, rc, handle);
1694 #ifdef HAVE_QUOTA_SUPPORT
1695         if (quota_opc)
1696                 /* Trigger dqrel on the owner of child. If failed,
1697                  * the next call for lquota_chkquota will process it */
1698                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1699                               quota_opc);
1700 #endif
1701         return rc;
1702 }
1703
1704 /* partial operation */
1705 static int mdd_oc_sanity_check(const struct lu_env *env,
1706                                struct mdd_object *obj,
1707                                struct md_attr *ma)
1708 {
1709         int rc;
1710         ENTRY;
1711
1712         switch (ma->ma_attr.la_mode & S_IFMT) {
1713         case S_IFREG:
1714         case S_IFDIR:
1715         case S_IFLNK:
1716         case S_IFCHR:
1717         case S_IFBLK:
1718         case S_IFIFO:
1719         case S_IFSOCK:
1720                 rc = 0;
1721                 break;
1722         default:
1723                 rc = -EINVAL;
1724                 break;
1725         }
1726         RETURN(rc);
1727 }
1728
1729 static int mdd_object_create(const struct lu_env *env,
1730                              struct md_object *obj,
1731                              const struct md_op_spec *spec,
1732                              struct md_attr *ma)
1733 {
1734
1735         struct mdd_device *mdd = mdo2mdd(obj);
1736         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1737         const struct lu_fid *pfid = spec->u.sp_pfid;
1738         struct thandle *handle;
1739 #ifdef HAVE_QUOTA_SUPPORT
1740         struct obd_device *obd = mdd->mdd_obd_dev;
1741         struct obd_export *exp = md_quota(env)->mq_exp;
1742         struct mds_obd *mds = &obd->u.mds;
1743         unsigned int qids[MAXQUOTAS] = { 0, 0 };
1744         int quota_opc = 0, block_count = 0;
1745         int inode_pending[MAXQUOTAS] = { 0, 0 };
1746         int block_pending[MAXQUOTAS] = { 0, 0 };
1747 #endif
1748         int rc = 0;
1749         ENTRY;
1750
1751 #ifdef HAVE_QUOTA_SUPPORT
1752         if (mds->mds_quota) {
1753                 quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
1754                 mdd_quota_wrapper(&ma->ma_attr, qids);
1755                 /* get file quota for child */
1756                 lquota_chkquota(mds_quota_interface_ref, obd, exp,
1757                                 qids, inode_pending, 1, NULL, 0,
1758                                 NULL, 0);
1759                 switch (ma->ma_attr.la_mode & S_IFMT) {
1760                 case S_IFLNK:
1761                 case S_IFDIR:
1762                         block_count = 2;
1763                         break;
1764                 case S_IFREG:
1765                         block_count = 1;
1766                         break;
1767                 }
1768                 /* get block quota for child */
1769                 if (block_count)
1770                         lquota_chkquota(mds_quota_interface_ref, obd, exp,
1771                                         qids, block_pending, block_count,
1772                                         NULL, LQUOTA_FLAGS_BLK, NULL, 0);
1773         }
1774 #endif
1775
1776         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
1777         handle = mdd_trans_start(env, mdd);
1778         if (IS_ERR(handle))
1779                 GOTO(out_pending, rc = PTR_ERR(handle));
1780
1781         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1782         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
1783         if (rc)
1784                 GOTO(unlock, rc);
1785
1786         rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
1787         if (rc)
1788                 GOTO(unlock, rc);
1789
1790         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
1791                 /* If creating the slave object, set slave EA here. */
1792                 int lmv_size = spec->u.sp_ea.eadatalen;
1793                 struct lmv_stripe_md *lmv;
1794
1795                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1796                 LASSERT(lmv != NULL && lmv_size > 0);
1797
1798                 rc = __mdd_xattr_set(env, mdd_obj,
1799                                      mdd_buf_get_const(env, lmv, lmv_size),
1800                                      XATTR_NAME_LMV, 0, handle);
1801                 if (rc)
1802                         GOTO(unlock, rc);
1803
1804                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr,
1805                                            handle, 0);
1806         } else {
1807 #ifdef CONFIG_FS_POSIX_ACL
1808                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1809                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1810
1811                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1812                         buf->lb_len = spec->u.sp_ea.eadatalen;
1813                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1814                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1815                                                     &ma->ma_attr.la_mode,
1816                                                     handle);
1817                                 if (rc)
1818                                         GOTO(unlock, rc);
1819                                 else
1820                                         ma->ma_attr.la_valid |= LA_MODE;
1821                         }
1822
1823                         pfid = spec->u.sp_ea.fid;
1824                 }
1825 #endif
1826                 rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
1827                                            spec);
1828         }
1829         EXIT;
1830 unlock:
1831         if (rc == 0)
1832                 rc = mdd_attr_get_internal(env, mdd_obj, ma);
1833         mdd_write_unlock(env, mdd_obj);
1834
1835         mdd_trans_stop(env, mdd, rc, handle);
1836 out_pending:
1837 #ifdef HAVE_QUOTA_SUPPORT
1838         if (quota_opc) {
1839                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1840                                       inode_pending, 0);
1841                 lquota_pending_commit(mds_quota_interface_ref, obd, qids,
1842                                       block_pending, 1);
1843                 /* Trigger dqacq on the owner of child. If failed,
1844                  * the next call for lquota_chkquota will process it. */
1845                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
1846                               quota_opc);
1847         }
1848 #endif
1849         return rc;
1850 }
1851
1852 /* partial link */
1853 static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
1854                        const struct md_attr *ma)
1855 {
1856         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1857         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1858         struct mdd_device *mdd = mdo2mdd(obj);
1859         struct thandle *handle;
1860         int rc;
1861         ENTRY;
1862
1863         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1864         handle = mdd_trans_start(env, mdd);
1865         if (IS_ERR(handle))
1866                 RETURN(-ENOMEM);
1867
1868         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1869         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
1870         if (rc == 0)
1871                 __mdd_ref_add(env, mdd_obj, handle);
1872         mdd_write_unlock(env, mdd_obj);
1873         if (rc == 0) {
1874                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
1875                 la_copy->la_ctime = ma->ma_attr.la_ctime;
1876
1877                 la_copy->la_valid = LA_CTIME;
1878                 rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la_copy,
1879                                                         handle, 0);
1880         }
1881         mdd_trans_stop(env, mdd, 0, handle);
1882
1883         RETURN(rc);
1884 }
1885
1886 /*
1887  * do NOT or the MAY_*'s, you'll get the weakest
1888  */
1889 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1890 {
1891         int res = 0;
1892
1893         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1894          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1895          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1896          * owner can write to a file even if it is marked readonly to hide
1897          * its brokenness. (bug 5781) */
1898         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1899                 struct md_ucred *uc = md_ucred(env);
1900
1901                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1902                     (la->la_uid == uc->mu_fsuid))
1903                         return 0;
1904         }
1905
1906         if (flags & FMODE_READ)
1907                 res |= MAY_READ;
1908         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1909                 res |= MAY_WRITE;
1910         if (flags & MDS_FMODE_EXEC)
1911                 res |= MAY_EXEC;
1912         return res;
1913 }
1914
1915 static int mdd_open_sanity_check(const struct lu_env *env,
1916                                  struct mdd_object *obj, int flag)
1917 {
1918         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1919         int mode, rc;
1920         ENTRY;
1921
1922         /* EEXIST check */
1923         if (mdd_is_dead_obj(obj))
1924                 RETURN(-ENOENT);
1925
1926         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1927         if (rc)
1928                RETURN(rc);
1929
1930         if (S_ISLNK(tmp_la->la_mode))
1931                 RETURN(-ELOOP);
1932
1933         mode = accmode(env, tmp_la, flag);
1934
1935         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1936                 RETURN(-EISDIR);
1937
1938         if (!(flag & MDS_OPEN_CREATED)) {
1939                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1940                 if (rc)
1941                         RETURN(rc);
1942         }
1943
1944         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1945             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1946                 flag &= ~MDS_OPEN_TRUNC;
1947
1948         /* For writing append-only file must open it with append mode. */
1949         if (mdd_is_append(obj)) {
1950                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1951                         RETURN(-EPERM);
1952                 if (flag & MDS_OPEN_TRUNC)
1953                         RETURN(-EPERM);
1954         }
1955
1956 #if 0
1957         /*
1958          * Now, flag -- O_NOATIME does not be packed by client.
1959          */
1960         if (flag & O_NOATIME) {
1961                 struct md_ucred *uc = md_ucred(env);
1962
1963                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1964                     (uc->mu_valid == UCRED_NEW)) &&
1965                     (uc->mu_fsuid != tmp_la->la_uid) &&
1966                     !mdd_capable(uc, CFS_CAP_FOWNER))
1967                         RETURN(-EPERM);
1968         }
1969 #endif
1970
1971         RETURN(0);
1972 }
1973
1974 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1975                     int flags)
1976 {
1977         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1978         int rc = 0;
1979
1980         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1981
1982         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1983         if (rc == 0)
1984                 mdd_obj->mod_count++;
1985
1986         mdd_write_unlock(env, mdd_obj);
1987         return rc;
1988 }
1989
1990 /* return md_attr back,
1991  * if it is last unlink then return lov ea + llog cookie*/
1992 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1993                     struct md_attr *ma)
1994 {
1995         int rc = 0;
1996         ENTRY;
1997
1998         if (S_ISREG(mdd_object_type(obj))) {
1999                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
2000                  * Caller must be ready for that. */
2001
2002                 rc = __mdd_lmm_get(env, obj, ma);
2003                 if ((ma->ma_valid & MA_LOV))
2004                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
2005                                             obj, ma);
2006         }
2007         RETURN(rc);
2008 }
2009
2010 /*
2011  * No permission check is needed.
2012  */
2013 static int mdd_close(const struct lu_env *env, struct md_object *obj,
2014                      struct md_attr *ma)
2015 {
2016         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2017         struct mdd_device *mdd = mdo2mdd(obj);
2018         struct thandle    *handle;
2019         int rc;
2020         int reset = 1;
2021
2022 #ifdef HAVE_QUOTA_SUPPORT
2023         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
2024         struct mds_obd *mds = &obd->u.mds;
2025         unsigned int qids[MAXQUOTAS] = { 0, 0 };
2026         int quota_opc = 0;
2027 #endif
2028         ENTRY;
2029
2030         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
2031         if (rc)
2032                 RETURN(rc);
2033         handle = mdd_trans_start(env, mdo2mdd(obj));
2034         if (IS_ERR(handle))
2035                 RETURN(PTR_ERR(handle));
2036
2037         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
2038         /* release open count */
2039         mdd_obj->mod_count --;
2040
2041         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
2042                 /* remove link to object from orphan index */
2043                 rc = __mdd_orphan_del(env, mdd_obj, handle);
2044                 if (rc == 0) {
2045                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
2046                                "list, OSS objects to be destroyed.\n",
2047                                PFID(mdd_object_fid(mdd_obj)));
2048                 } else {
2049                         CERROR("Object "DFID" can not be deleted from orphan "
2050                                 "list, maybe cause OST objects can not be "
2051                                 "destroyed (err: %d).\n",
2052                                 PFID(mdd_object_fid(mdd_obj)), rc);
2053                         /* If object was not deleted from orphan list, do not
2054                          * destroy OSS objects, which will be done when next
2055                          * recovery. */
2056                         GOTO(out, rc);
2057                 }
2058         }
2059
2060         rc = mdd_iattr_get(env, mdd_obj, ma);
2061         /* Object maybe not in orphan list originally, it is rare case for
2062          * mdd_finish_unlink() failure. */
2063         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
2064 #ifdef HAVE_QUOTA_SUPPORT
2065                 if (mds->mds_quota) {
2066                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
2067                         mdd_quota_wrapper(&ma->ma_attr, qids);
2068                 }
2069 #endif
2070                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2071                 if (ma->ma_valid & MA_FLAGS &&
2072                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2073                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2074                 } else {
2075                         rc = mdd_object_kill(env, mdd_obj, ma);
2076                                 if (rc == 0)
2077                                         reset = 0;
2078                 }
2079
2080                 if (rc != 0)
2081                         CERROR("Error when prepare to delete Object "DFID" , "
2082                                "which will cause OST objects can not be "
2083                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2084         }
2085         EXIT;
2086
2087 out:
2088         if (reset)
2089                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2090
2091         mdd_write_unlock(env, mdd_obj);
2092         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
2093 #ifdef HAVE_QUOTA_SUPPORT
2094         if (quota_opc)
2095                 /* Trigger dqrel on the owner of child. If failed,
2096                  * the next call for lquota_chkquota will process it */
2097                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
2098                               quota_opc);
2099 #endif
2100         return rc;
2101 }
2102
2103 /*
2104  * Permission check is done when open,
2105  * no need check again.
2106  */
2107 static int mdd_readpage_sanity_check(const struct lu_env *env,
2108                                      struct mdd_object *obj)
2109 {
2110         struct dt_object *next = mdd_object_child(obj);
2111         int rc;
2112         ENTRY;
2113
2114         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2115                 rc = 0;
2116         else
2117                 rc = -ENOTDIR;
2118
2119         RETURN(rc);
2120 }
2121
2122 static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
2123                               int first, void *area, int nob,
2124                               const struct dt_it_ops *iops, struct dt_it *it,
2125                               __u64 *start, __u64 *end,
2126                               struct lu_dirent **last, __u32 attr)
2127 {
2128         int                     result;
2129         __u64                   hash = 0;
2130         struct lu_dirent       *ent;
2131
2132         if (first) {
2133                 memset(area, 0, sizeof (struct lu_dirpage));
2134                 area += sizeof (struct lu_dirpage);
2135                 nob  -= sizeof (struct lu_dirpage);
2136         }
2137
2138         ent  = area;
2139         do {
2140                 int    len;
2141                 int    recsize;
2142
2143                 len  = iops->key_size(env, it);
2144
2145                 /* IAM iterator can return record with zero len. */
2146                 if (len == 0)
2147                         goto next;
2148
2149                 hash = iops->store(env, it);
2150                 if (unlikely(first)) {
2151                         first = 0;
2152                         *start = hash;
2153                 }
2154
2155                 /* calculate max space required for lu_dirent */
2156                 recsize = lu_dirent_calc_size(len, attr);
2157
2158                 if (nob >= recsize) {
2159                         result = iops->rec(env, it, ent, attr);
2160                         if (result == -ESTALE)
2161                                 goto next;
2162                         if (result != 0)
2163                                 goto out;
2164
2165                         /* osd might not able to pack all attributes,
2166                          * so recheck rec length */
2167                         recsize = le16_to_cpu(ent->lde_reclen);
2168                 } else {
2169                         /*
2170                          * record doesn't fit into page, enlarge previous one.
2171                          */
2172                         if (*last) {
2173                                 (*last)->lde_reclen =
2174                                         cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
2175                                                         nob);
2176                                 result = 0;
2177                         } else
2178                                 result = -EINVAL;
2179
2180                         goto out;
2181                 }
2182                 *last = ent;
2183                 ent = (void *)ent + recsize;
2184                 nob -= recsize;
2185
2186 next:
2187                 result = iops->next(env, it);
2188                 if (result == -ESTALE)
2189                         goto next;
2190         } while (result == 0);
2191
2192 out:
2193         *end = hash;
2194         return result;
2195 }
2196
2197 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
2198                           const struct lu_rdpg *rdpg)
2199 {
2200         struct dt_it      *it;
2201         struct dt_object  *next = mdd_object_child(obj);
2202         const struct dt_it_ops  *iops;
2203         struct page       *pg;
2204         struct lu_dirent  *last = NULL;
2205         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
2206         int i;
2207         int rc;
2208         int nob;
2209         __u64 hash_start;
2210         __u64 hash_end = 0;
2211
2212         LASSERT(rdpg->rp_pages != NULL);
2213         LASSERT(next->do_index_ops != NULL);
2214
2215         if (rdpg->rp_count <= 0)
2216                 return -EFAULT;
2217
2218         /*
2219          * iterate through directory and fill pages from @rdpg
2220          */
2221         iops = &next->do_index_ops->dio_it;
2222         it = iops->init(env, next, mdd_object_capa(env, obj));
2223         if (IS_ERR(it))
2224                 return PTR_ERR(it);
2225
2226         rc = iops->load(env, it, rdpg->rp_hash);
2227
2228         if (rc == 0){
2229                 /*
2230                  * Iterator didn't find record with exactly the key requested.
2231                  *
2232                  * It is currently either
2233                  *
2234                  *     - positioned above record with key less than
2235                  *     requested---skip it.
2236                  *
2237                  *     - or not positioned at all (is in IAM_IT_SKEWED
2238                  *     state)---position it on the next item.
2239                  */
2240                 rc = iops->next(env, it);
2241         } else if (rc > 0)
2242                 rc = 0;
2243
2244         /*
2245          * At this point and across for-loop:
2246          *
2247          *  rc == 0 -> ok, proceed.
2248          *  rc >  0 -> end of directory.
2249          *  rc <  0 -> error.
2250          */
2251         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
2252              i++, nob -= CFS_PAGE_SIZE) {
2253                 LASSERT(i < rdpg->rp_npages);
2254                 pg = rdpg->rp_pages[i];
2255                 rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
2256                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
2257                                         it, &hash_start, &hash_end, &last,
2258                                         rdpg->rp_attrs);
2259                 if (rc != 0 || i == rdpg->rp_npages - 1) {
2260                         if (last)
2261                                 last->lde_reclen = 0;
2262                 }
2263                 cfs_kunmap(pg);
2264         }
2265         if (rc > 0) {
2266                 /*
2267                  * end of directory.
2268                  */
2269                 hash_end = DIR_END_OFF;
2270                 rc = 0;
2271         }
2272         if (rc == 0) {
2273                 struct lu_dirpage *dp;
2274
2275                 dp = cfs_kmap(rdpg->rp_pages[0]);
2276                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2277                 dp->ldp_hash_end   = cpu_to_le64(hash_end);
2278                 if (i == 0)
2279                         /*
2280                          * No pages were processed, mark this.
2281                          */
2282                         dp->ldp_flags |= LDF_EMPTY;
2283
2284                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2285                 cfs_kunmap(rdpg->rp_pages[0]);
2286         }
2287         iops->put(env, it);
2288         iops->fini(env, it);
2289
2290         return rc;
2291 }
2292
2293 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2294                  const struct lu_rdpg *rdpg)
2295 {
2296         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2297         int rc;
2298         ENTRY;
2299
2300         LASSERT(mdd_object_exists(mdd_obj));
2301
2302         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2303         rc = mdd_readpage_sanity_check(env, mdd_obj);
2304         if (rc)
2305                 GOTO(out_unlock, rc);
2306
2307         if (mdd_is_dead_obj(mdd_obj)) {
2308                 struct page *pg;
2309                 struct lu_dirpage *dp;
2310
2311                 /*
2312                  * According to POSIX, please do not return any entry to client:
2313                  * even dot and dotdot should not be returned.
2314                  */
2315                 CWARN("readdir from dead object: "DFID"\n",
2316                         PFID(mdd_object_fid(mdd_obj)));
2317
2318                 if (rdpg->rp_count <= 0)
2319                         GOTO(out_unlock, rc = -EFAULT);
2320                 LASSERT(rdpg->rp_pages != NULL);
2321
2322                 pg = rdpg->rp_pages[0];
2323                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2324                 memset(dp, 0 , sizeof(struct lu_dirpage));
2325                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2326                 dp->ldp_hash_end   = cpu_to_le64(DIR_END_OFF);
2327                 dp->ldp_flags |= LDF_EMPTY;
2328                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2329                 cfs_kunmap(pg);
2330                 GOTO(out_unlock, rc = 0);
2331         }
2332
2333         rc = __mdd_readpage(env, mdd_obj, rdpg);
2334
2335         EXIT;
2336 out_unlock:
2337         mdd_read_unlock(env, mdd_obj);
2338         return rc;
2339 }
2340
2341 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2342 {
2343         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2344         struct dt_object *next;
2345
2346         LASSERT(mdd_object_exists(mdd_obj));
2347         next = mdd_object_child(mdd_obj);
2348         return next->do_ops->do_object_sync(env, next);
2349 }
2350
2351 static dt_obj_version_t mdd_version_get(const struct lu_env *env,
2352                                         struct md_object *obj)
2353 {
2354         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2355
2356         LASSERT(mdd_object_exists(mdd_obj));
2357         return do_version_get(env, mdd_object_child(mdd_obj));
2358 }
2359
2360 static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
2361                             dt_obj_version_t version)
2362 {
2363         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2364
2365         LASSERT(mdd_object_exists(mdd_obj));
2366         return do_version_set(env, mdd_object_child(mdd_obj), version);
2367 }
2368
2369 const struct md_object_operations mdd_obj_ops = {
2370         .moo_permission    = mdd_permission,
2371         .moo_attr_get      = mdd_attr_get,
2372         .moo_attr_set      = mdd_attr_set,
2373         .moo_xattr_get     = mdd_xattr_get,
2374         .moo_xattr_set     = mdd_xattr_set,
2375         .moo_xattr_list    = mdd_xattr_list,
2376         .moo_xattr_del     = mdd_xattr_del,
2377         .moo_object_create = mdd_object_create,
2378         .moo_ref_add       = mdd_ref_add,
2379         .moo_ref_del       = mdd_ref_del,
2380         .moo_open          = mdd_open,
2381         .moo_close         = mdd_close,
2382         .moo_readpage      = mdd_readpage,
2383         .moo_readlink      = mdd_readlink,
2384         .moo_capa_get      = mdd_capa_get,
2385         .moo_object_sync   = mdd_object_sync,
2386         .moo_version_get   = mdd_version_get,
2387         .moo_version_set   = mdd_version_set,
2388         .moo_path          = mdd_path,
2389 };