Whamcloud - gitweb
LU-1842 quota: remove quota code
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_object.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <lprocfs_status.h>
50 /* fid_be_cpu(), fid_cpu_to_be(). */
51 #include <lustre_fid.h>
52 #include <obd_lov.h>
53
54 #include <lustre_param.h>
55 #include <lustre_mds.h>
56 #include <lustre/lustre_idl.h>
57
58 #include "mdd_internal.h"
59
60 static const struct lu_object_operations mdd_lu_obj_ops;
61 extern cfs_mem_cache_t *mdd_object_kmem;
62
63 static int mdd_xattr_get(const struct lu_env *env,
64                          struct md_object *obj, struct lu_buf *buf,
65                          const char *name);
66
67 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
68                  void **data)
69 {
70         if (mdd_object_exists(obj) == 0) {
71                 CERROR("%s: object "DFID" not found: rc = -2\n",
72                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
73                 return -ENOENT;
74         }
75         mdo_data_get(env, obj, data);
76         return 0;
77 }
78
79 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
80                struct lu_attr *la, struct lustre_capa *capa)
81 {
82         if (mdd_object_exists(obj) == 0) {
83                 CERROR("%s: object "DFID" not found: rc = -2\n",
84                        mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)));
85                 return -ENOENT;
86         }
87         return mdo_attr_get(env, obj, la, capa);
88 }
89
90 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
91 {
92         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
93
94         if (flags & LUSTRE_APPEND_FL)
95                 obj->mod_flags |= APPEND_OBJ;
96
97         if (flags & LUSTRE_IMMUTABLE_FL)
98                 obj->mod_flags |= IMMUTE_OBJ;
99 }
100
101 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
102 {
103         struct mdd_thread_info *info;
104
105         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
106         LASSERT(info != NULL);
107         return info;
108 }
109
110 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
111 {
112         struct lu_buf *buf;
113
114         buf = &mdd_env_info(env)->mti_buf;
115         buf->lb_buf = area;
116         buf->lb_len = len;
117         return buf;
118 }
119
120 void mdd_buf_put(struct lu_buf *buf)
121 {
122         if (buf == NULL || buf->lb_buf == NULL)
123                 return;
124         OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
125         buf->lb_buf = NULL;
126         buf->lb_len = 0;
127 }
128
129 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
130                                        const void *area, ssize_t len)
131 {
132         struct lu_buf *buf;
133
134         buf = &mdd_env_info(env)->mti_buf;
135         buf->lb_buf = (void *)area;
136         buf->lb_len = len;
137         return buf;
138 }
139
140 struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
141 {
142         struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
143
144         if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
145                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
146                 buf->lb_buf = NULL;
147         }
148         if (buf->lb_buf == NULL) {
149                 buf->lb_len = len;
150                 OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
151                 if (buf->lb_buf == NULL)
152                         buf->lb_len = 0;
153         }
154         return buf;
155 }
156
157 /** Increase the size of the \a mti_big_buf.
158  * preserves old data in buffer
159  * old buffer remains unchanged on error
160  * \retval 0 or -ENOMEM
161  */
162 int mdd_buf_grow(const struct lu_env *env, ssize_t len)
163 {
164         struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
165         struct lu_buf buf;
166
167         LASSERT(len >= oldbuf->lb_len);
168         OBD_ALLOC_LARGE(buf.lb_buf, len);
169
170         if (buf.lb_buf == NULL)
171                 return -ENOMEM;
172
173         buf.lb_len = len;
174         memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
175
176         OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
177
178         memcpy(oldbuf, &buf, sizeof(buf));
179
180         return 0;
181 }
182
183 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
184                                        struct mdd_device *mdd)
185 {
186         struct mdd_thread_info *mti = mdd_env_info(env);
187         int                     max_cookie_size;
188
189         max_cookie_size = mdd_lov_cookiesize(env, mdd);
190         if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
191                 if (mti->mti_max_cookie)
192                         OBD_FREE_LARGE(mti->mti_max_cookie,
193                                        mti->mti_max_cookie_size);
194                 mti->mti_max_cookie = NULL;
195                 mti->mti_max_cookie_size = 0;
196         }
197         if (unlikely(mti->mti_max_cookie == NULL)) {
198                 OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
199                 if (likely(mti->mti_max_cookie != NULL))
200                         mti->mti_max_cookie_size = max_cookie_size;
201         }
202         if (likely(mti->mti_max_cookie != NULL))
203                 memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
204         return mti->mti_max_cookie;
205 }
206
207 struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
208 {
209         struct mdd_thread_info *mti = mdd_env_info(env);
210
211         if (unlikely(mti->mti_max_lmm_size < size)) {
212                 int rsize = size_roundup_power2(size);
213
214                 if (mti->mti_max_lmm_size > 0) {
215                         LASSERT(mti->mti_max_lmm);
216                         OBD_FREE_LARGE(mti->mti_max_lmm,
217                                        mti->mti_max_lmm_size);
218                         mti->mti_max_lmm = NULL;
219                         mti->mti_max_lmm_size = 0;
220                 }
221
222                 OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
223                 if (likely(mti->mti_max_lmm != NULL))
224                         mti->mti_max_lmm_size = rsize;
225         }
226         return mti->mti_max_lmm;
227 }
228
229 struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
230                                    struct mdd_device *mdd)
231 {
232         int max_lmm_size;
233
234         max_lmm_size = mdd_lov_mdsize(env, mdd);
235         return mdd_max_lmm_buffer(env, max_lmm_size);
236 }
237
238 struct lu_object *mdd_object_alloc(const struct lu_env *env,
239                                    const struct lu_object_header *hdr,
240                                    struct lu_device *d)
241 {
242         struct mdd_object *mdd_obj;
243
244         OBD_SLAB_ALLOC_PTR_GFP(mdd_obj, mdd_object_kmem, CFS_ALLOC_IO);
245         if (mdd_obj != NULL) {
246                 struct lu_object *o;
247
248                 o = mdd2lu_obj(mdd_obj);
249                 lu_object_init(o, NULL, d);
250                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
251                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
252                 mdd_obj->mod_count = 0;
253                 o->lo_ops = &mdd_lu_obj_ops;
254                 return o;
255         } else {
256                 return NULL;
257         }
258 }
259
260 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
261                            const struct lu_object_conf *unused)
262 {
263         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
264         struct mdd_object *mdd_obj = lu2mdd_obj(o);
265         struct lu_object  *below;
266         struct lu_device  *under;
267         ENTRY;
268
269         mdd_obj->mod_cltime = 0;
270         under = &d->mdd_child->dd_lu_dev;
271         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
272         mdd_pdlock_init(mdd_obj);
273         if (below == NULL)
274                 RETURN(-ENOMEM);
275
276         lu_object_add(o, below);
277
278         RETURN(0);
279 }
280
281 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
282 {
283         if (lu_object_exists(o))
284                 return mdd_get_flags(env, lu2mdd_obj(o));
285         else
286                 return 0;
287 }
288
289 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
290 {
291         struct mdd_object *mdd = lu2mdd_obj(o);
292
293         lu_object_fini(o);
294         OBD_SLAB_FREE_PTR(mdd, mdd_object_kmem);
295 }
296
297 static int mdd_object_print(const struct lu_env *env, void *cookie,
298                             lu_printer_t p, const struct lu_object *o)
299 {
300         struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
301         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
302                     "valid=%x, cltime="LPU64", flags=%lx)",
303                     mdd, mdd->mod_count, mdd->mod_valid,
304                     mdd->mod_cltime, mdd->mod_flags);
305 }
306
307 static const struct lu_object_operations mdd_lu_obj_ops = {
308         .loo_object_init    = mdd_object_init,
309         .loo_object_start   = mdd_object_start,
310         .loo_object_free    = mdd_object_free,
311         .loo_object_print   = mdd_object_print,
312 };
313
314 struct mdd_object *mdd_object_find(const struct lu_env *env,
315                                    struct mdd_device *d,
316                                    const struct lu_fid *f)
317 {
318         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
319 }
320
321 static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
322                         const char *path, struct lu_fid *fid)
323 {
324         struct lu_buf *buf;
325         struct lu_fid *f = &mdd_env_info(env)->mti_fid;
326         struct mdd_object *obj;
327         struct lu_name *lname = &mdd_env_info(env)->mti_name;
328         char *name;
329         int rc = 0;
330         ENTRY;
331
332         /* temp buffer for path element */
333         buf = mdd_buf_alloc(env, PATH_MAX);
334         if (buf->lb_buf == NULL)
335                 RETURN(-ENOMEM);
336
337         lname->ln_name = name = buf->lb_buf;
338         lname->ln_namelen = 0;
339         *f = mdd->mdd_root_fid;
340
341         while(1) {
342                 while (*path == '/')
343                         path++;
344                 if (*path == '\0')
345                         break;
346                 while (*path != '/' && *path != '\0') {
347                         *name = *path;
348                         path++;
349                         name++;
350                         lname->ln_namelen++;
351                 }
352
353                 *name = '\0';
354                 /* find obj corresponding to fid */
355                 obj = mdd_object_find(env, mdd, f);
356                 if (obj == NULL)
357                         GOTO(out, rc = -EREMOTE);
358                 if (IS_ERR(obj))
359                         GOTO(out, rc = PTR_ERR(obj));
360                 /* get child fid from parent and name */
361                 rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
362                 mdd_object_put(env, obj);
363                 if (rc)
364                         break;
365
366                 name = buf->lb_buf;
367                 lname->ln_namelen = 0;
368         }
369
370         if (!rc)
371                 *fid = *f;
372 out:
373         RETURN(rc);
374 }
375
376 /** The maximum depth that fid2path() will search.
377  * This is limited only because we want to store the fids for
378  * historical path lookup purposes.
379  */
380 #define MAX_PATH_DEPTH 100
381
382 /** mdd_path() lookup structure. */
383 struct path_lookup_info {
384         __u64                pli_recno;        /**< history point */
385         __u64                pli_currec;       /**< current record */
386         struct lu_fid        pli_fid;
387         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
388         struct mdd_object   *pli_mdd_obj;
389         char                *pli_path;         /**< full path */
390         int                  pli_pathlen;
391         int                  pli_linkno;       /**< which hardlink to follow */
392         int                  pli_fidcount;     /**< number of \a pli_fids */
393 };
394
395 static int mdd_path_current(const struct lu_env *env,
396                             struct path_lookup_info *pli)
397 {
398         struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
399         struct mdd_object *mdd_obj;
400         struct lu_buf     *buf = NULL;
401         struct link_ea_header *leh;
402         struct link_ea_entry  *lee;
403         struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
404         struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
405         char *ptr;
406         int reclen;
407         int rc;
408         ENTRY;
409
410         ptr = pli->pli_path + pli->pli_pathlen - 1;
411         *ptr = 0;
412         --ptr;
413         pli->pli_fidcount = 0;
414         pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
415
416         while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
417                 mdd_obj = mdd_object_find(env, mdd,
418                                           &pli->pli_fids[pli->pli_fidcount]);
419                 if (mdd_obj == NULL)
420                         GOTO(out, rc = -EREMOTE);
421                 if (IS_ERR(mdd_obj))
422                         GOTO(out, rc = PTR_ERR(mdd_obj));
423                 rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
424                 if (rc <= 0) {
425                         mdd_object_put(env, mdd_obj);
426                         if (rc == -1)
427                                 rc = -EREMOTE;
428                         else if (rc == 0)
429                                 /* Do I need to error out here? */
430                                 rc = -ENOENT;
431                         GOTO(out, rc);
432                 }
433
434                 /* Get parent fid and object name */
435                 mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
436                 buf = mdd_links_get(env, mdd_obj);
437                 mdd_read_unlock(env, mdd_obj);
438                 mdd_object_put(env, mdd_obj);
439                 if (IS_ERR(buf))
440                         GOTO(out, rc = PTR_ERR(buf));
441
442                 leh = buf->lb_buf;
443                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
444                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
445
446                 /* If set, use link #linkno for path lookup, otherwise use
447                    link #0.  Only do this for the final path element. */
448                 if ((pli->pli_fidcount == 0) &&
449                     (pli->pli_linkno < leh->leh_reccount)) {
450                         int count;
451                         for (count = 0; count < pli->pli_linkno; count++) {
452                                 lee = (struct link_ea_entry *)
453                                      ((char *)lee + reclen);
454                                 mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
455                         }
456                         if (pli->pli_linkno < leh->leh_reccount - 1)
457                                 /* indicate to user there are more links */
458                                 pli->pli_linkno++;
459                 }
460
461                 /* Pack the name in the end of the buffer */
462                 ptr -= tmpname->ln_namelen;
463                 if (ptr - 1 <= pli->pli_path)
464                         GOTO(out, rc = -EOVERFLOW);
465                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
466                 *(--ptr) = '/';
467
468                 /* Store the parent fid for historic lookup */
469                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
470                         GOTO(out, rc = -EOVERFLOW);
471                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
472         }
473
474         /* Verify that our path hasn't changed since we started the lookup.
475            Record the current index, and verify the path resolves to the
476            same fid. If it does, then the path is correct as of this index. */
477         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
478         pli->pli_currec = mdd->mdd_cl.mc_index;
479         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
480         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
481         if (rc) {
482                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
483                 GOTO (out, rc = -EAGAIN);
484         }
485         if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
486                 CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
487                        " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
488                        PFID(&pli->pli_fid));
489                 GOTO(out, rc = -EAGAIN);
490         }
491         ptr++; /* skip leading / */
492         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
493
494         EXIT;
495 out:
496         if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
497                 /* if we vmalloced a large buffer drop it */
498                 mdd_buf_put(buf);
499
500         return rc;
501 }
502
503 static int mdd_path_historic(const struct lu_env *env,
504                              struct path_lookup_info *pli)
505 {
506         return 0;
507 }
508
509 /* Returns the full path to this fid, as of changelog record recno. */
510 static int mdd_path(const struct lu_env *env, struct md_object *obj,
511                     char *path, int pathlen, __u64 *recno, int *linkno)
512 {
513         struct path_lookup_info *pli;
514         int tries = 3;
515         int rc = -EAGAIN;
516         ENTRY;
517
518         if (pathlen < 3)
519                 RETURN(-EOVERFLOW);
520
521         if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
522                 path[0] = '\0';
523                 RETURN(0);
524         }
525
526         OBD_ALLOC_PTR(pli);
527         if (pli == NULL)
528                 RETURN(-ENOMEM);
529
530         pli->pli_mdd_obj = md2mdd_obj(obj);
531         pli->pli_recno = *recno;
532         pli->pli_path = path;
533         pli->pli_pathlen = pathlen;
534         pli->pli_linkno = *linkno;
535
536         /* Retry multiple times in case file is being moved */
537         while (tries-- && rc == -EAGAIN)
538                 rc = mdd_path_current(env, pli);
539
540         /* For historical path lookup, the current links may not have existed
541          * at "recno" time.  We must switch over to earlier links/parents
542          * by using the changelog records.  If the earlier parent doesn't
543          * exist, we must search back through the changelog to reconstruct
544          * its parents, then check if it exists, etc.
545          * We may ignore this problem for the initial implementation and
546          * state that an "original" hardlink must still exist for us to find
547          * historic path name. */
548         if (pli->pli_recno != -1) {
549                 rc = mdd_path_historic(env, pli);
550         } else {
551                 *recno = pli->pli_currec;
552                 /* Return next link index to caller */
553                 *linkno = pli->pli_linkno;
554         }
555
556         OBD_FREE_PTR(pli);
557
558         RETURN (rc);
559 }
560
561 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
562 {
563         struct lu_attr *la = &mdd_env_info(env)->mti_la;
564         int rc;
565
566         ENTRY;
567         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
568         if (rc == 0) {
569                 mdd_flags_xlate(obj, la->la_flags);
570         }
571         RETURN(rc);
572 }
573
574 /* get only inode attributes */
575 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
576                   struct md_attr *ma)
577 {
578         int rc = 0;
579         ENTRY;
580
581         if (ma->ma_valid & MA_INODE)
582                 RETURN(0);
583
584         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
585                           mdd_object_capa(env, mdd_obj));
586         if (rc == 0)
587                 ma->ma_valid |= MA_INODE;
588         RETURN(rc);
589 }
590
591 int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
592 {
593         struct lov_desc *ldesc;
594         struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
595         struct lov_user_md *lum = (struct lov_user_md*)lmm;
596         ENTRY;
597
598         if (!lum)
599                 RETURN(0);
600
601         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
602         LASSERT(ldesc != NULL);
603
604         lum->lmm_magic = LOV_MAGIC_V1;
605         lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
606         lum->lmm_pattern = ldesc->ld_pattern;
607         lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
608         lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
609         lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
610
611         RETURN(sizeof(*lum));
612 }
613
614 static int is_rootdir(struct mdd_object *mdd_obj)
615 {
616         const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
617         const struct lu_fid *fid = mdo2fid(mdd_obj);
618
619         return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
620 }
621
622 int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
623                     struct md_attr *ma)
624 {
625         struct mdd_thread_info *info = mdd_env_info(env);
626         int                     size;
627         int                     rc   = -EINVAL;
628         ENTRY;
629
630         LASSERT(info != NULL);
631         LASSERT(ma->ma_big_lmm_used == 0);
632
633         if (ma->ma_lmm_size == 0) {
634                 CERROR("No buffer to hold %s xattr of object "DFID"\n",
635                        XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
636                 RETURN(rc);
637         }
638
639         rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
640                            mdd_object_capa(env, obj));
641         if (rc < 0)
642                 RETURN(rc);
643
644         /* big_lmm may need to grow */
645         size = rc;
646         mdd_max_lmm_buffer(env, size);
647         if (info->mti_max_lmm == NULL)
648                 RETURN(-ENOMEM);
649
650         LASSERT(info->mti_max_lmm_size >= size);
651         rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
652                         XATTR_NAME_LOV);
653         if (rc < 0)
654                 RETURN(rc);
655
656         ma->ma_big_lmm_used = 1;
657         ma->ma_valid |= MA_LOV;
658         ma->ma_lmm = info->mti_max_lmm;
659         ma->ma_lmm_size = size;
660         LASSERT(size == rc);
661         RETURN(rc);
662 }
663
664 /* get lov EA only */
665 static int __mdd_lmm_get(const struct lu_env *env,
666                          struct mdd_object *mdd_obj, struct md_attr *ma)
667 {
668         int rc;
669         ENTRY;
670
671         if (ma->ma_valid & MA_LOV)
672                 RETURN(0);
673
674         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
675                         XATTR_NAME_LOV);
676         if (rc == -ERANGE)
677                 rc = mdd_big_lmm_get(env, mdd_obj, ma);
678         else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
679                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
680
681         if (rc > 0) {
682                 ma->ma_lmm_size = rc;
683                 ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
684                 ma->ma_valid |= MA_LOV | MA_LAY_GEN;
685                 rc = 0;
686         }
687         RETURN(rc);
688 }
689
690 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
691                        struct md_attr *ma)
692 {
693         int rc;
694         ENTRY;
695
696         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
697         rc = __mdd_lmm_get(env, mdd_obj, ma);
698         mdd_read_unlock(env, mdd_obj);
699         RETURN(rc);
700 }
701
702 /*
703  * No permission check is needed.
704  */
705 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
706                  struct md_attr *ma)
707 {
708         struct mdd_object *mdd_obj = md2mdd_obj(obj);
709         int                rc;
710
711         ENTRY;
712         rc = mdd_iattr_get(env, mdd_obj, ma);
713         RETURN(rc);
714 }
715
716 /*
717  * No permission check is needed.
718  */
719 static int mdd_xattr_get(const struct lu_env *env,
720                          struct md_object *obj, struct lu_buf *buf,
721                          const char *name)
722 {
723         struct mdd_object *mdd_obj = md2mdd_obj(obj);
724         struct mdd_device *mdd = mdo2mdd(obj);
725         struct lu_fid      rootfid;
726         int is_root;
727         int rc;
728
729         ENTRY;
730
731         if (mdd_object_exists(mdd_obj) == 0) {
732                 CERROR("%s: object "DFID" not found: rc = -2\n",
733                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
734                 return -ENOENT;
735         }
736
737         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
738         rc = mdo_xattr_get(env, mdd_obj, buf, name,
739                            mdd_object_capa(env, mdd_obj));
740         mdd_read_unlock(env, mdd_obj);
741
742         dt_root_get(env, mdd->mdd_child, &rootfid);
743         is_root = lu_fid_eq(mdd_object_fid(mdd_obj), &rootfid);
744
745         /* XXX: a temp. solution till LOD/OSP is landed */
746         if (rc == -ENODATA && strcmp(name, XATTR_NAME_LOV) == 0 && is_root) {
747                 if (buf->lb_buf == NULL) {
748                         rc = sizeof(struct lov_user_md);
749                 } else if (buf->lb_len >= sizeof(struct lov_user_md)) {
750                         rc = mdd_get_default_md(mdd_obj, buf->lb_buf);
751                 } else {
752                         rc = -ERANGE;
753                 }
754         }
755
756         RETURN(rc);
757 }
758
759 /*
760  * Permission check is done when open,
761  * no need check again.
762  */
763 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
764                         struct lu_buf *buf)
765 {
766         struct mdd_object *mdd_obj = md2mdd_obj(obj);
767         struct dt_object  *next;
768         loff_t             pos = 0;
769         int                rc;
770         ENTRY;
771
772         if (mdd_object_exists(mdd_obj) == 0) {
773                 CERROR("%s: object "DFID" not found: rc = -2\n",
774                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
775                 return -ENOENT;
776         }
777
778         next = mdd_object_child(mdd_obj);
779         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
780         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
781                                          mdd_object_capa(env, mdd_obj));
782         mdd_read_unlock(env, mdd_obj);
783         RETURN(rc);
784 }
785
786 /*
787  * No permission check is needed.
788  */
789 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
790                           struct lu_buf *buf)
791 {
792         struct mdd_object *mdd_obj = md2mdd_obj(obj);
793         int rc;
794
795         ENTRY;
796
797         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
798         rc = mdo_xattr_list(env, mdd_obj, buf, mdd_object_capa(env, mdd_obj));
799         mdd_read_unlock(env, mdd_obj);
800
801         RETURN(rc);
802 }
803
804 int mdd_declare_object_create_internal(const struct lu_env *env,
805                                        struct mdd_object *p,
806                                        struct mdd_object *c,
807                                        struct lu_attr *attr,
808                                        struct thandle *handle,
809                                        const struct md_op_spec *spec)
810 {
811         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
812         const struct dt_index_features *feat = spec->sp_feat;
813         int rc;
814         ENTRY;
815
816         if (feat != &dt_directory_features && feat != NULL)
817                 dof->dof_type = DFT_INDEX;
818         else
819                 dof->dof_type = dt_mode_to_dft(attr->la_mode);
820
821         dof->u.dof_idx.di_feat = feat;
822
823         rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
824
825         RETURN(rc);
826 }
827
828 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
829                                struct mdd_object *c, struct lu_attr *attr,
830                                struct thandle *handle,
831                                const struct md_op_spec *spec)
832 {
833         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
834         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
835         const struct dt_index_features *feat = spec->sp_feat;
836         int rc;
837         ENTRY;
838
839         if (!mdd_object_exists(c)) {
840                 struct dt_object *next = mdd_object_child(c);
841                 LASSERT(next);
842
843                 if (feat != &dt_directory_features && feat != NULL)
844                         dof->dof_type = DFT_INDEX;
845                 else
846                         dof->dof_type = dt_mode_to_dft(attr->la_mode);
847
848                 dof->u.dof_idx.di_feat = feat;
849
850                 rc = mdo_create_obj(env, c, attr, hint, dof, handle);
851                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
852         } else
853                 rc = -EEXIST;
854
855         RETURN(rc);
856 }
857
858 /**
859  * Make sure the ctime is increased only.
860  */
861 static inline int mdd_attr_check(const struct lu_env *env,
862                                  struct mdd_object *obj,
863                                  struct lu_attr *attr)
864 {
865         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
866         int rc;
867         ENTRY;
868
869         if (attr->la_valid & LA_CTIME) {
870                 rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
871                 if (rc)
872                         RETURN(rc);
873
874                 if (attr->la_ctime < tmp_la->la_ctime)
875                         attr->la_valid &= ~(LA_MTIME | LA_CTIME);
876                 else if (attr->la_valid == LA_CTIME &&
877                          attr->la_ctime == tmp_la->la_ctime)
878                         attr->la_valid &= ~LA_CTIME;
879         }
880         RETURN(0);
881 }
882
883 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *obj,
884                           struct lu_attr *attr, struct thandle *handle,
885                           int needacl)
886 {
887         int rc;
888         ENTRY;
889
890         rc = mdo_attr_set(env, obj, attr, handle, mdd_object_capa(env, obj));
891 #ifdef CONFIG_FS_POSIX_ACL
892         if (!rc && (attr->la_valid & LA_MODE) && needacl)
893                 rc = mdd_acl_chmod(env, obj, attr->la_mode, handle);
894 #endif
895         RETURN(rc);
896 }
897
898 int mdd_attr_check_set_internal(const struct lu_env *env,
899                                 struct mdd_object *obj, struct lu_attr *attr,
900                                 struct thandle *handle, int needacl)
901 {
902         int rc;
903         ENTRY;
904
905         rc = mdd_attr_check(env, obj, attr);
906         if (rc)
907                 RETURN(rc);
908
909         if (attr->la_valid)
910                 rc = mdd_attr_set_internal(env, obj, attr, handle, needacl);
911         RETURN(rc);
912 }
913
914 int mdd_attr_check_set_internal_locked(const struct lu_env *env,
915                                        struct mdd_object *obj,
916                                        struct lu_attr *attr,
917                                        struct thandle *handle,
918                                        int needacl)
919 {
920         int rc;
921         ENTRY;
922
923         needacl = needacl && (attr->la_valid & LA_MODE);
924         if (needacl)
925                 mdd_write_lock(env, obj, MOR_TGT_CHILD);
926         rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
927         if (needacl)
928                 mdd_write_unlock(env, obj);
929         RETURN(rc);
930 }
931
932 int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
933                     const struct lu_buf *buf, const char *name,
934                     int fl, struct thandle *handle)
935 {
936         struct lustre_capa *capa = mdd_object_capa(env, obj);
937         int rc = -EINVAL;
938         ENTRY;
939
940         if (buf->lb_buf && buf->lb_len > 0)
941                 rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
942         else if (buf->lb_buf == NULL && buf->lb_len == 0)
943                 rc = mdo_xattr_del(env, obj, name, handle, capa);
944
945         RETURN(rc);
946 }
947
948 /*
949  * This gives the same functionality as the code between
950  * sys_chmod and inode_setattr
951  * chown_common and inode_setattr
952  * utimes and inode_setattr
953  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
954  */
955 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
956                         struct lu_attr *la, const unsigned long flags)
957 {
958         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
959         struct md_ucred  *uc;
960         int               rc;
961         ENTRY;
962
963         if (!la->la_valid)
964                 RETURN(0);
965
966         /* Do not permit change file type */
967         if (la->la_valid & LA_TYPE)
968                 RETURN(-EPERM);
969
970         /* They should not be processed by setattr */
971         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
972                 RETURN(-EPERM);
973
974         /* export destroy does not have ->le_ses, but we may want
975          * to drop LUSTRE_SOM_FL. */
976         if (!env->le_ses)
977                 RETURN(0);
978
979         uc = md_ucred(env);
980
981         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
982         if (rc)
983                 RETURN(rc);
984
985         if (la->la_valid == LA_CTIME) {
986                 if (!(flags & MDS_PERM_BYPASS))
987                         /* This is only for set ctime when rename's source is
988                          * on remote MDS. */
989                         rc = mdd_may_delete(env, NULL, obj, tmp_la, NULL, 1, 0);
990                 if (rc == 0 && la->la_ctime <= tmp_la->la_ctime)
991                         la->la_valid &= ~LA_CTIME;
992                 RETURN(rc);
993         }
994
995         if (la->la_valid == LA_ATIME) {
996                 /* This is atime only set for read atime update on close. */
997                 if (la->la_atime >= tmp_la->la_atime &&
998                     la->la_atime < (tmp_la->la_atime +
999                                     mdd_obj2mdd_dev(obj)->mdd_atime_diff))
1000                         la->la_valid &= ~LA_ATIME;
1001                 RETURN(0);
1002         }
1003
1004         /* Check if flags change. */
1005         if (la->la_valid & LA_FLAGS) {
1006                 unsigned int oldflags = 0;
1007                 unsigned int newflags = la->la_flags &
1008                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
1009
1010                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1011                     !mdd_capable(uc, CFS_CAP_FOWNER))
1012                         RETURN(-EPERM);
1013
1014                 /* XXX: the IMMUTABLE and APPEND_ONLY flags can
1015                  * only be changed by the relevant capability. */
1016                 if (mdd_is_immutable(obj))
1017                         oldflags |= LUSTRE_IMMUTABLE_FL;
1018                 if (mdd_is_append(obj))
1019                         oldflags |= LUSTRE_APPEND_FL;
1020                 if ((oldflags ^ newflags) &&
1021                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
1022                         RETURN(-EPERM);
1023
1024                 if (!S_ISDIR(tmp_la->la_mode))
1025                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
1026         }
1027
1028         if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
1029             (la->la_valid & ~LA_FLAGS) &&
1030             !(flags & MDS_PERM_BYPASS))
1031                 RETURN(-EPERM);
1032
1033         /* Check for setting the obj time. */
1034         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
1035             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
1036                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
1037                     !mdd_capable(uc, CFS_CAP_FOWNER)) {
1038                         rc = mdd_permission_internal(env, obj, tmp_la,
1039                                                      MAY_WRITE);
1040                         if (rc)
1041                                 RETURN(rc);
1042                 }
1043         }
1044
1045         if (la->la_valid & LA_KILL_SUID) {
1046                 la->la_valid &= ~LA_KILL_SUID;
1047                 if ((tmp_la->la_mode & S_ISUID) &&
1048                     !(la->la_valid & LA_MODE)) {
1049                         la->la_mode = tmp_la->la_mode;
1050                         la->la_valid |= LA_MODE;
1051                 }
1052                 la->la_mode &= ~S_ISUID;
1053         }
1054
1055         if (la->la_valid & LA_KILL_SGID) {
1056                 la->la_valid &= ~LA_KILL_SGID;
1057                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1058                                         (S_ISGID | S_IXGRP)) &&
1059                     !(la->la_valid & LA_MODE)) {
1060                         la->la_mode = tmp_la->la_mode;
1061                         la->la_valid |= LA_MODE;
1062                 }
1063                 la->la_mode &= ~S_ISGID;
1064         }
1065
1066         /* Make sure a caller can chmod. */
1067         if (la->la_valid & LA_MODE) {
1068                 if (!(flags & MDS_PERM_BYPASS) &&
1069                     (uc->mu_fsuid != tmp_la->la_uid) &&
1070                     !mdd_capable(uc, CFS_CAP_FOWNER))
1071                         RETURN(-EPERM);
1072
1073                 if (la->la_mode == (cfs_umode_t) -1)
1074                         la->la_mode = tmp_la->la_mode;
1075                 else
1076                         la->la_mode = (la->la_mode & S_IALLUGO) |
1077                                       (tmp_la->la_mode & ~S_IALLUGO);
1078
1079                 /* Also check the setgid bit! */
1080                 if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
1081                                        la->la_gid : tmp_la->la_gid) &&
1082                     !mdd_capable(uc, CFS_CAP_FSETID))
1083                         la->la_mode &= ~S_ISGID;
1084         } else {
1085                la->la_mode = tmp_la->la_mode;
1086         }
1087
1088         /* Make sure a caller can chown. */
1089         if (la->la_valid & LA_UID) {
1090                 if (la->la_uid == (uid_t) -1)
1091                         la->la_uid = tmp_la->la_uid;
1092                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1093                     (la->la_uid != tmp_la->la_uid)) &&
1094                     !mdd_capable(uc, CFS_CAP_CHOWN))
1095                         RETURN(-EPERM);
1096
1097                 /* If the user or group of a non-directory has been
1098                  * changed by a non-root user, remove the setuid bit.
1099                  * 19981026 David C Niemi <niemi@tux.org>
1100                  *
1101                  * Changed this to apply to all users, including root,
1102                  * to avoid some races. This is the behavior we had in
1103                  * 2.0. The check for non-root was definitely wrong
1104                  * for 2.2 anyway, as it should have been using
1105                  * CAP_FSETID rather than fsuid -- 19990830 SD. */
1106                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
1107                     !S_ISDIR(tmp_la->la_mode)) {
1108                         la->la_mode &= ~S_ISUID;
1109                         la->la_valid |= LA_MODE;
1110                 }
1111         }
1112
1113         /* Make sure caller can chgrp. */
1114         if (la->la_valid & LA_GID) {
1115                 if (la->la_gid == (gid_t) -1)
1116                         la->la_gid = tmp_la->la_gid;
1117                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
1118                     ((la->la_gid != tmp_la->la_gid) &&
1119                     !lustre_in_group_p(uc, la->la_gid))) &&
1120                     !mdd_capable(uc, CFS_CAP_CHOWN))
1121                         RETURN(-EPERM);
1122
1123                 /* Likewise, if the user or group of a non-directory
1124                  * has been changed by a non-root user, remove the
1125                  * setgid bit UNLESS there is no group execute bit
1126                  * (this would be a file marked for mandatory
1127                  * locking).  19981026 David C Niemi <niemi@tux.org>
1128                  *
1129                  * Removed the fsuid check (see the comment above) --
1130                  * 19990830 SD. */
1131                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
1132                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
1133                         la->la_mode &= ~S_ISGID;
1134                         la->la_valid |= LA_MODE;
1135                 }
1136         }
1137
1138         /* For both Size-on-MDS case and truncate case,
1139          * "la->la_valid & (LA_SIZE | LA_BLOCKS)" are ture.
1140          * We distinguish them by "flags & MDS_SOM".
1141          * For SOM case, it is true, the MAY_WRITE perm has been checked
1142          * when open, no need check again. For truncate case, it is false,
1143          * the MAY_WRITE perm should be checked here. */
1144         if (flags & MDS_SOM) {
1145                 /* For the "Size-on-MDS" setattr update, merge coming
1146                  * attributes with the set in the inode. BUG 10641 */
1147                 if ((la->la_valid & LA_ATIME) &&
1148                     (la->la_atime <= tmp_la->la_atime))
1149                         la->la_valid &= ~LA_ATIME;
1150
1151                 /* OST attributes do not have a priority over MDS attributes,
1152                  * so drop times if ctime is equal. */
1153                 if ((la->la_valid & LA_CTIME) &&
1154                     (la->la_ctime <= tmp_la->la_ctime))
1155                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
1156         } else {
1157                 if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
1158                         if (!((flags & MDS_OPEN_OWNEROVERRIDE) &&
1159                               (uc->mu_fsuid == tmp_la->la_uid)) &&
1160                             !(flags & MDS_PERM_BYPASS)) {
1161                                 rc = mdd_permission_internal(env, obj,
1162                                                              tmp_la, MAY_WRITE);
1163                                 if (rc)
1164                                         RETURN(rc);
1165                         }
1166                 }
1167                 if (la->la_valid & LA_CTIME) {
1168                         /* The pure setattr, it has the priority over what is
1169                          * already set, do not drop it if ctime is equal. */
1170                         if (la->la_ctime < tmp_la->la_ctime)
1171                                 la->la_valid &= ~(LA_ATIME | LA_MTIME |
1172                                                   LA_CTIME);
1173                 }
1174         }
1175
1176         RETURN(0);
1177 }
1178
1179 /** Store a data change changelog record
1180  * If this fails, we must fail the whole transaction; we don't
1181  * want the change to commit without the log entry.
1182  * \param mdd_obj - mdd_object of change
1183  * \param handle - transacion handle
1184  */
1185 static int mdd_changelog_data_store(const struct lu_env     *env,
1186                                     struct mdd_device       *mdd,
1187                                     enum changelog_rec_type type,
1188                                     int                     flags,
1189                                     struct mdd_object       *mdd_obj,
1190                                     struct thandle          *handle)
1191 {
1192         const struct lu_fid *tfid = mdo2fid(mdd_obj);
1193         struct llog_changelog_rec *rec;
1194         struct thandle *th = NULL;
1195         struct lu_buf *buf;
1196         int reclen;
1197         int rc;
1198
1199         /* Not recording */
1200         if (!(mdd->mdd_cl.mc_flags & CLM_ON))
1201                 RETURN(0);
1202         if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
1203                 RETURN(0);
1204
1205         LASSERT(mdd_obj != NULL);
1206         LASSERT(handle != NULL);
1207
1208         if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
1209             cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
1210                 /* Don't need multiple updates in this log */
1211                 /* Don't check under lock - no big deal if we get an extra
1212                    entry */
1213                 RETURN(0);
1214         }
1215
1216         reclen = llog_data_len(sizeof(*rec));
1217         buf = mdd_buf_alloc(env, reclen);
1218         if (buf->lb_buf == NULL)
1219                 RETURN(-ENOMEM);
1220         rec = (struct llog_changelog_rec *)buf->lb_buf;
1221
1222         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
1223         rec->cr.cr_type = (__u32)type;
1224         rec->cr.cr_tfid = *tfid;
1225         rec->cr.cr_namelen = 0;
1226         mdd_obj->mod_cltime = cfs_time_current_64();
1227
1228         rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
1229
1230         if (th)
1231                 mdd_trans_stop(env, mdd, rc, th);
1232
1233         if (rc < 0) {
1234                 CERROR("changelog failed: rc=%d op%d t"DFID"\n",
1235                        rc, type, PFID(tfid));
1236                 return -EFAULT;
1237         }
1238
1239         return 0;
1240 }
1241
1242 int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
1243                   int flags, struct md_object *obj)
1244 {
1245         struct thandle *handle;
1246         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1247         struct mdd_device *mdd = mdo2mdd(obj);
1248         int rc;
1249         ENTRY;
1250
1251         handle = mdd_trans_create(env, mdd);
1252         if (IS_ERR(handle))
1253                 return(PTR_ERR(handle));
1254
1255         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1256         if (rc)
1257                 GOTO(stop, rc);
1258
1259         rc = mdd_trans_start(env, mdd, handle);
1260         if (rc)
1261                 GOTO(stop, rc);
1262
1263         rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
1264                                       handle);
1265
1266 stop:
1267         mdd_trans_stop(env, mdd, rc, handle);
1268
1269         RETURN(rc);
1270 }
1271
1272 /**
1273  * Should be called with write lock held.
1274  *
1275  * \see mdd_lma_set_locked().
1276  */
1277 static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
1278                        const struct md_attr *ma, struct thandle *handle)
1279 {
1280         struct mdd_thread_info *info = mdd_env_info(env);
1281         struct lu_buf *buf;
1282         struct lustre_mdt_attrs *lma =
1283                                 (struct lustre_mdt_attrs *) info->mti_xattr_buf;
1284         int lmasize = sizeof(struct lustre_mdt_attrs);
1285         int rc = 0;
1286
1287         ENTRY;
1288
1289         /* Either HSM or SOM part is not valid, we need to read it before */
1290         if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
1291                 rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
1292                 if (rc <= 0)
1293                         RETURN(rc);
1294
1295                 lustre_lma_swab(lma);
1296         } else {
1297                 memset(lma, 0, lmasize);
1298         }
1299
1300         /* Copy HSM data */
1301         if (ma->ma_valid & MA_HSM) {
1302                 lma->lma_flags  |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
1303                 lma->lma_compat |= LMAC_HSM;
1304         }
1305
1306         /* Copy SOM data */
1307         if (ma->ma_valid & MA_SOM) {
1308                 LASSERT(ma->ma_som != NULL);
1309                 if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
1310                         lma->lma_compat     &= ~LMAC_SOM;
1311                 } else {
1312                         lma->lma_compat     |= LMAC_SOM;
1313                         lma->lma_ioepoch     = ma->ma_som->msd_ioepoch;
1314                         lma->lma_som_size    = ma->ma_som->msd_size;
1315                         lma->lma_som_blocks  = ma->ma_som->msd_blocks;
1316                         lma->lma_som_mountid = ma->ma_som->msd_mountid;
1317                 }
1318         }
1319
1320         /* Copy FID */
1321         memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
1322
1323         lustre_lma_swab(lma);
1324         buf = mdd_buf_get(env, lma, lmasize);
1325         rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
1326
1327         RETURN(rc);
1328 }
1329
1330 /**
1331  * Save LMA extended attributes with data from \a ma.
1332  *
1333  * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
1334  * not, LMA EA will be first read from disk, modified and write back.
1335  *
1336  */
1337 static int mdd_lma_set_locked(const struct lu_env *env,
1338                               struct mdd_object *mdd_obj,
1339                               const struct md_attr *ma, struct thandle *handle)
1340 {
1341         int rc;
1342
1343         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1344         rc = __mdd_lma_set(env, mdd_obj, ma, handle);
1345         mdd_write_unlock(env, mdd_obj);
1346         return rc;
1347 }
1348
1349 /* Precedence for choosing record type when multiple
1350  * attributes change: setattr > mtime > ctime > atime
1351  * (ctime changes when mtime does, plus chmod/chown.
1352  * atime and ctime are independent.) */
1353 static int mdd_attr_set_changelog(const struct lu_env *env,
1354                                   struct md_object *obj, struct thandle *handle,
1355                                   __u64 valid)
1356 {
1357         struct mdd_device *mdd = mdo2mdd(obj);
1358         int bits, type = 0;
1359
1360         bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
1361         bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
1362         bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
1363         bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
1364         bits = bits & mdd->mdd_cl.mc_mask;
1365         if (bits == 0)
1366                 return 0;
1367
1368         /* The record type is the lowest non-masked set bit */
1369         while (bits && ((bits & 1) == 0)) {
1370                 bits = bits >> 1;
1371                 type++;
1372         }
1373
1374         /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
1375         return mdd_changelog_data_store(env, mdd, type, (int)valid,
1376                                         md2mdd_obj(obj), handle);
1377 }
1378
1379 static int mdd_declare_attr_set(const struct lu_env *env,
1380                                 struct mdd_device *mdd,
1381                                 struct mdd_object *obj,
1382                                 const struct md_attr *ma,
1383                                 struct lov_mds_md *lmm,
1384                                 struct thandle *handle)
1385 {
1386         struct lu_buf  *buf = &mdd_env_info(env)->mti_buf;
1387         struct lu_attr *attr = (struct lu_attr *) &ma->ma_attr;
1388         int             rc, i;
1389
1390         rc = mdo_declare_attr_set(env, obj, attr, handle);
1391         if (rc)
1392                 return rc;
1393
1394         rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1395         if (rc)
1396                 return rc;
1397
1398         if (ma->ma_valid & MA_LOV) {
1399                 buf->lb_buf = NULL;
1400                 buf->lb_len = ma->ma_lmm_size;
1401                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
1402                                            0, handle);
1403                 if (rc)
1404                         return rc;
1405         }
1406
1407         if (ma->ma_valid & (MA_HSM | MA_SOM)) {
1408                 buf->lb_buf = NULL;
1409                 buf->lb_len = sizeof(struct lustre_mdt_attrs);
1410                 rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
1411                                            0, handle);
1412                 if (rc)
1413                         return rc;
1414         }
1415
1416 #ifdef CONFIG_FS_POSIX_ACL
1417         if (attr->la_valid & LA_MODE) {
1418                 mdd_read_lock(env, obj, MOR_TGT_CHILD);
1419                 rc = mdo_xattr_get(env, obj, &LU_BUF_NULL,
1420                                    XATTR_NAME_ACL_ACCESS, BYPASS_CAPA);
1421                 mdd_read_unlock(env, obj);
1422                 if (rc == -EOPNOTSUPP || rc == -ENODATA)
1423                         rc = 0;
1424                 else if (rc < 0)
1425                         return rc;
1426
1427                 if (rc != 0) {
1428                         struct lu_buf *buf = mdd_buf_get(env, NULL, rc);
1429                         rc = mdo_declare_xattr_set(env, obj, buf,
1430                                                    XATTR_NAME_ACL_ACCESS, 0,
1431                                                    handle);
1432                         if (rc)
1433                                 return rc;
1434                 }
1435         }
1436 #endif
1437
1438         /* basically the log is the same as in unlink case */
1439         if (lmm) {
1440                 __u16 stripe;
1441
1442                 if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
1443                                 le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
1444                         CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
1445                                mdd->mdd_obd_dev->obd_name,
1446                                le32_to_cpu(lmm->lmm_magic),
1447                                PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
1448                         return -EINVAL;
1449                 }
1450
1451                 stripe = le16_to_cpu(lmm->lmm_stripe_count);
1452                 if (stripe == LOV_ALL_STRIPES) {
1453                         struct lov_desc *ldesc;
1454
1455                         ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
1456                         LASSERT(ldesc != NULL);
1457                         stripe = ldesc->ld_tgt_count;
1458                 }
1459
1460                 for (i = 0; i < stripe; i++) {
1461                         rc = mdd_declare_llog_record(env, mdd,
1462                                         sizeof(struct llog_unlink_rec),
1463                                         handle);
1464                         if (rc)
1465                                 return rc;
1466                 }
1467         }
1468
1469         return rc;
1470 }
1471
1472 /* set attr and LOV EA at once, return updated attr */
1473 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
1474                  const struct md_attr *ma)
1475 {
1476         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1477         struct mdd_device *mdd = mdo2mdd(obj);
1478         struct thandle *handle;
1479         struct lov_mds_md *lmm = NULL;
1480         struct llog_cookie *logcookies = NULL;
1481         int  rc, lmm_size = 0, cookie_size = 0;
1482         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1483         const struct lu_attr *la = &ma->ma_attr;
1484         ENTRY;
1485
1486         *la_copy = ma->ma_attr;
1487         rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
1488         if (rc)
1489                 RETURN(rc);
1490
1491         /* setattr on "close" only change atime, or do nothing */
1492         if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
1493                 RETURN(0);
1494
1495         if (S_ISREG(mdd_object_type(mdd_obj)) &&
1496             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
1497                 lmm_size = mdd_lov_mdsize(env, mdd);
1498                 lmm = mdd_max_lmm_get(env, mdd);
1499                 if (lmm == NULL)
1500                         RETURN(-ENOMEM);
1501
1502                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
1503                                 XATTR_NAME_LOV);
1504
1505                 if (rc < 0)
1506                         RETURN(rc);
1507         }
1508
1509         handle = mdd_trans_create(env, mdd);
1510         if (IS_ERR(handle))
1511                 RETURN(PTR_ERR(handle));
1512
1513         rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
1514                                   lmm_size > 0 ? lmm : NULL, handle);
1515         if (rc)
1516                 GOTO(stop, rc);
1517
1518         rc = mdd_trans_start(env, mdd, handle);
1519         if (rc)
1520                 GOTO(stop, rc);
1521
1522         /* permission changes may require sync operation */
1523         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
1524                 handle->th_sync |= !!mdd->mdd_sync_permission;
1525
1526         if (la->la_valid & (LA_MTIME | LA_CTIME))
1527                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
1528                        la->la_mtime, la->la_ctime);
1529
1530         if (la_copy->la_valid & LA_FLAGS) {
1531                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1532                 if (rc == 0)
1533                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
1534         } else if (la_copy->la_valid) {            /* setattr */
1535                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
1536                 /* journal chown/chgrp in llog, just like unlink */
1537                 if (rc == 0 && lmm_size){
1538                         cookie_size = mdd_lov_cookiesize(env, mdd);
1539                         logcookies = mdd_max_cookie_get(env, mdd);
1540                         if (logcookies == NULL)
1541                                 GOTO(cleanup, rc = -ENOMEM);
1542
1543                         if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
1544                                             logcookies, cookie_size) <= 0)
1545                                 logcookies = NULL;
1546                 }
1547         }
1548
1549         if (rc == 0 && ma->ma_valid & MA_LOV) {
1550                 cfs_umode_t mode;
1551
1552                 mode = mdd_object_type(mdd_obj);
1553                 if (S_ISREG(mode) || S_ISDIR(mode)) {
1554                         rc = mdd_lsm_sanity_check(env, mdd_obj);
1555                         if (rc)
1556                                 GOTO(cleanup, rc);
1557
1558                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
1559                                             ma->ma_lmm_size, handle, 1);
1560                 }
1561
1562         }
1563         if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
1564                 cfs_umode_t mode;
1565
1566                 mode = mdd_object_type(mdd_obj);
1567                 if (S_ISREG(mode))
1568                         rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
1569
1570         }
1571 cleanup:
1572         if (rc == 0)
1573                 rc = mdd_attr_set_changelog(env, obj, handle,
1574                                             ma->ma_attr.la_valid);
1575 stop:
1576         mdd_trans_stop(env, mdd, rc, handle);
1577         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
1578                 /*set obd attr, if needed*/
1579                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
1580                                            logcookies);
1581         }
1582         RETURN(rc);
1583 }
1584
1585 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
1586                       const struct lu_buf *buf, const char *name, int fl,
1587                       struct thandle *handle)
1588 {
1589         int  rc;
1590         ENTRY;
1591
1592         mdd_write_lock(env, obj, MOR_TGT_CHILD);
1593         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
1594         mdd_write_unlock(env, obj);
1595
1596         RETURN(rc);
1597 }
1598
1599 static int mdd_xattr_sanity_check(const struct lu_env *env,
1600                                   struct mdd_object *obj)
1601 {
1602         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
1603         struct md_ucred *uc     = md_ucred(env);
1604         int rc;
1605         ENTRY;
1606
1607         if (mdd_is_immutable(obj) || mdd_is_append(obj))
1608                 RETURN(-EPERM);
1609
1610         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1611         if (rc)
1612                 RETURN(rc);
1613
1614         if ((uc->mu_fsuid != tmp_la->la_uid) &&
1615             !mdd_capable(uc, CFS_CAP_FOWNER))
1616                 RETURN(-EPERM);
1617
1618         RETURN(rc);
1619 }
1620
1621 static int mdd_declare_xattr_set(const struct lu_env *env,
1622                                  struct mdd_device *mdd,
1623                                  struct mdd_object *obj,
1624                                  const struct lu_buf *buf,
1625                                  const char *name,
1626                                  struct thandle *handle)
1627 {
1628         int rc;
1629
1630         rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
1631         if (rc)
1632                 return rc;
1633
1634         /* Only record user xattr changes */
1635         if ((strncmp("user.", name, 5) == 0))
1636                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1637
1638         return rc;
1639 }
1640
1641 /**
1642  * The caller should guarantee to update the object ctime
1643  * after xattr_set if needed.
1644  */
1645 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
1646                          const struct lu_buf *buf, const char *name,
1647                          int fl)
1648 {
1649         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1650         struct mdd_device *mdd = mdo2mdd(obj);
1651         struct thandle *handle;
1652         int  rc;
1653         ENTRY;
1654
1655         if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) {
1656                 rc = mdd_acl_set(env, mdd_obj, buf, fl);
1657                 RETURN(rc);
1658         }
1659
1660         rc = mdd_xattr_sanity_check(env, mdd_obj);
1661         if (rc)
1662                 RETURN(rc);
1663
1664         handle = mdd_trans_create(env, mdd);
1665         if (IS_ERR(handle))
1666                 RETURN(PTR_ERR(handle));
1667
1668         rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
1669         if (rc)
1670                 GOTO(stop, rc);
1671
1672         rc = mdd_trans_start(env, mdd, handle);
1673         if (rc)
1674                 GOTO(stop, rc);
1675
1676         /* security-replated changes may require sync */
1677         if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
1678                 handle->th_sync |= !!mdd->mdd_sync_permission;
1679
1680         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1681         rc = mdo_xattr_set(env, mdd_obj, buf, name, fl, handle,
1682                            mdd_object_capa(env, mdd_obj));
1683         mdd_write_unlock(env, mdd_obj);
1684         if (rc)
1685                 GOTO(stop, rc);
1686
1687         /* Only record system & user xattr changes */
1688         if (strncmp(XATTR_USER_PREFIX, name,
1689                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1690                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1691                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1692                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1693                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1694                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1695                                               handle);
1696
1697 stop:
1698         mdd_trans_stop(env, mdd, rc, handle);
1699
1700         RETURN(rc);
1701 }
1702
1703 static int mdd_declare_xattr_del(const struct lu_env *env,
1704                                  struct mdd_device *mdd,
1705                                  struct mdd_object *obj,
1706                                  const char *name,
1707                                  struct thandle *handle)
1708 {
1709         int rc;
1710
1711         rc = mdo_declare_xattr_del(env, obj, name, handle);
1712         if (rc)
1713                 return rc;
1714
1715         /* Only record user xattr changes */
1716         if ((strncmp("user.", name, 5) == 0))
1717                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1718
1719         return rc;
1720 }
1721
1722 /**
1723  * The caller should guarantee to update the object ctime
1724  * after xattr_set if needed.
1725  */
1726 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
1727                   const char *name)
1728 {
1729         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1730         struct mdd_device *mdd = mdo2mdd(obj);
1731         struct thandle *handle;
1732         int  rc;
1733         ENTRY;
1734
1735         rc = mdd_xattr_sanity_check(env, mdd_obj);
1736         if (rc)
1737                 RETURN(rc);
1738
1739         handle = mdd_trans_create(env, mdd);
1740         if (IS_ERR(handle))
1741                 RETURN(PTR_ERR(handle));
1742
1743         rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
1744         if (rc)
1745                 GOTO(stop, rc);
1746
1747         rc = mdd_trans_start(env, mdd, handle);
1748         if (rc)
1749                 GOTO(stop, rc);
1750
1751         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1752         rc = mdo_xattr_del(env, mdd_obj, name, handle,
1753                            mdd_object_capa(env, mdd_obj));
1754         mdd_write_unlock(env, mdd_obj);
1755         if (rc)
1756                 GOTO(stop, rc);
1757
1758         /* Only record system & user xattr changes */
1759         if (strncmp(XATTR_USER_PREFIX, name,
1760                                   sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
1761                           strncmp(POSIX_ACL_XATTR_ACCESS, name,
1762                                   sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
1763                           strncmp(POSIX_ACL_XATTR_DEFAULT, name,
1764                                   sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
1765                 rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
1766                                               handle);
1767
1768 stop:
1769         mdd_trans_stop(env, mdd, rc, handle);
1770
1771         RETURN(rc);
1772 }
1773
1774 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
1775                 struct mdd_object *child, struct lu_attr *attr)
1776 {
1777         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
1778         struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
1779         struct dt_object *nc = mdd_object_child(child);
1780
1781         /* @hint will be initialized by underlying device. */
1782         nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
1783 }
1784
1785 /*
1786  * do NOT or the MAY_*'s, you'll get the weakest
1787  */
1788 int accmode(const struct lu_env *env, struct lu_attr *la, int flags)
1789 {
1790         int res = 0;
1791
1792         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1793          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1794          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1795          * owner can write to a file even if it is marked readonly to hide
1796          * its brokenness. (bug 5781) */
1797         if (flags & MDS_OPEN_OWNEROVERRIDE) {
1798                 struct md_ucred *uc = md_ucred(env);
1799
1800                 if ((uc == NULL) || (uc->mu_valid == UCRED_INIT) ||
1801                     (la->la_uid == uc->mu_fsuid))
1802                         return 0;
1803         }
1804
1805         if (flags & FMODE_READ)
1806                 res |= MAY_READ;
1807         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1808                 res |= MAY_WRITE;
1809         if (flags & MDS_FMODE_EXEC)
1810                 res = MAY_EXEC;
1811         return res;
1812 }
1813
1814 static int mdd_open_sanity_check(const struct lu_env *env,
1815                                  struct mdd_object *obj, int flag)
1816 {
1817         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1818         int mode, rc;
1819         ENTRY;
1820
1821         /* EEXIST check */
1822         if (mdd_is_dead_obj(obj))
1823                 RETURN(-ENOENT);
1824
1825         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1826         if (rc)
1827                RETURN(rc);
1828
1829         if (S_ISLNK(tmp_la->la_mode))
1830                 RETURN(-ELOOP);
1831
1832         mode = accmode(env, tmp_la, flag);
1833
1834         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1835                 RETURN(-EISDIR);
1836
1837         if (!(flag & MDS_OPEN_CREATED)) {
1838                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1839                 if (rc)
1840                         RETURN(rc);
1841         }
1842
1843         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1844             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1845                 flag &= ~MDS_OPEN_TRUNC;
1846
1847         /* For writing append-only file must open it with append mode. */
1848         if (mdd_is_append(obj)) {
1849                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1850                         RETURN(-EPERM);
1851                 if (flag & MDS_OPEN_TRUNC)
1852                         RETURN(-EPERM);
1853         }
1854
1855 #if 0
1856         /*
1857          * Now, flag -- O_NOATIME does not be packed by client.
1858          */
1859         if (flag & O_NOATIME) {
1860                 struct md_ucred *uc = md_ucred(env);
1861
1862                 if (uc && ((uc->mu_valid == UCRED_OLD) ||
1863                     (uc->mu_valid == UCRED_NEW)) &&
1864                     (uc->mu_fsuid != tmp_la->la_uid) &&
1865                     !mdd_capable(uc, CFS_CAP_FOWNER))
1866                         RETURN(-EPERM);
1867         }
1868 #endif
1869
1870         RETURN(0);
1871 }
1872
1873 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1874                     int flags)
1875 {
1876         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1877         int rc = 0;
1878
1879         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1880
1881         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1882         if (rc == 0)
1883                 mdd_obj->mod_count++;
1884
1885         mdd_write_unlock(env, mdd_obj);
1886         return rc;
1887 }
1888
1889 int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
1890                             struct md_attr *ma, struct thandle *handle)
1891 {
1892         int rc;
1893
1894         rc = mdd_declare_unlink_log(env, obj, ma, handle);
1895         if (rc)
1896                 return rc;
1897
1898         return mdo_declare_destroy(env, obj, handle);
1899 }
1900
1901 /* return md_attr back,
1902  * if it is last unlink then return lov ea + llog cookie*/
1903 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1904                     struct md_attr *ma, struct thandle *handle)
1905 {
1906         int rc = 0;
1907         ENTRY;
1908
1909         if (S_ISREG(mdd_object_type(obj))) {
1910                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1911                  * Caller must be ready for that. */
1912                 rc = __mdd_lmm_get(env, obj, ma);
1913                 if ((ma->ma_valid & MA_LOV))
1914                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1915                                             obj, ma);
1916         }
1917
1918         if (rc == 0)
1919                 rc = mdo_destroy(env, obj, handle);
1920
1921         RETURN(rc);
1922 }
1923
1924 static int mdd_declare_close(const struct lu_env *env,
1925                              struct mdd_object *obj,
1926                              struct md_attr *ma,
1927                              struct thandle *handle)
1928 {
1929         int rc;
1930
1931         rc = orph_declare_index_delete(env, obj, handle);
1932         if (rc)
1933                 return rc;
1934
1935         return mdd_declare_object_kill(env, obj, ma, handle);
1936 }
1937
1938 /*
1939  * No permission check is needed.
1940  */
1941 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1942                      struct md_attr *ma, int mode)
1943 {
1944         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1945         struct mdd_device *mdd = mdo2mdd(obj);
1946         struct thandle    *handle = NULL;
1947         int rc;
1948         int is_orphan = 0, reset = 1;
1949         ENTRY;
1950
1951         if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
1952                 mdd_obj->mod_count--;
1953
1954                 if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
1955                         CDEBUG(D_HA, "Object "DFID" is retained in orphan "
1956                                "list\n", PFID(mdd_object_fid(mdd_obj)));
1957                 RETURN(0);
1958         }
1959
1960         /* check without any lock */
1961         if (mdd_obj->mod_count == 1 &&
1962             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
1963  again:
1964                 handle = mdd_trans_create(env, mdo2mdd(obj));
1965                 if (IS_ERR(handle))
1966                         RETURN(PTR_ERR(handle));
1967
1968                 rc = mdd_declare_close(env, mdd_obj, ma, handle);
1969                 if (rc)
1970                         GOTO(stop, rc);
1971
1972                 rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
1973                 if (rc)
1974                         GOTO(stop, rc);
1975
1976                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
1977                 if (rc)
1978                         GOTO(stop, rc);
1979         }
1980
1981         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
1982         if (handle == NULL && mdd_obj->mod_count == 1 &&
1983             (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
1984                 mdd_write_unlock(env, mdd_obj);
1985                 goto again;
1986         }
1987
1988         /* release open count */
1989         mdd_obj->mod_count --;
1990
1991         if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
1992                 /* remove link to object from orphan index */
1993                 LASSERT(handle != NULL);
1994                 rc = __mdd_orphan_del(env, mdd_obj, handle);
1995                 if (rc == 0) {
1996                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
1997                                "list, OSS objects to be destroyed.\n",
1998                                PFID(mdd_object_fid(mdd_obj)));
1999                         is_orphan = 1;
2000                 } else {
2001                         CERROR("Object "DFID" can not be deleted from orphan "
2002                                 "list, maybe cause OST objects can not be "
2003                                 "destroyed (err: %d).\n",
2004                                 PFID(mdd_object_fid(mdd_obj)), rc);
2005                         /* If object was not deleted from orphan list, do not
2006                          * destroy OSS objects, which will be done when next
2007                          * recovery. */
2008                         GOTO(out, rc);
2009                 }
2010         }
2011
2012         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
2013                         mdd_object_capa(env, mdd_obj));
2014         /* Object maybe not in orphan list originally, it is rare case for
2015          * mdd_finish_unlink() failure. */
2016         if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
2017                 /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
2018                 if (ma->ma_valid & MA_FLAGS &&
2019                     ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
2020                         rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
2021                 } else {
2022                         if (handle == NULL) {
2023                                 handle = mdd_trans_create(env, mdo2mdd(obj));
2024                                 if (IS_ERR(handle))
2025                                         GOTO(out, rc = PTR_ERR(handle));
2026
2027                                 rc = mdd_declare_object_kill(env, mdd_obj, ma,
2028                                                              handle);
2029                                 if (rc)
2030                                         GOTO(out, rc);
2031
2032                                 rc = mdd_declare_changelog_store(env, mdd,
2033                                                                  NULL, handle);
2034                                 if (rc)
2035                                         GOTO(stop, rc);
2036
2037                                 rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2038                                 if (rc)
2039                                         GOTO(out, rc);
2040                         }
2041
2042                         rc = mdd_object_kill(env, mdd_obj, ma, handle);
2043                         if (rc == 0)
2044                                 reset = 0;
2045                 }
2046
2047                 if (rc != 0)
2048                         CERROR("Error when prepare to delete Object "DFID" , "
2049                                "which will cause OST objects can not be "
2050                                "destroyed.\n",  PFID(mdd_object_fid(mdd_obj)));
2051         }
2052         EXIT;
2053
2054 out:
2055         if (reset)
2056                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
2057
2058         mdd_write_unlock(env, mdd_obj);
2059
2060         if (rc == 0 &&
2061             (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
2062             !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
2063                 if (handle == NULL) {
2064                         handle = mdd_trans_create(env, mdo2mdd(obj));
2065                         if (IS_ERR(handle))
2066                                 GOTO(stop, rc = IS_ERR(handle));
2067
2068                         rc = mdd_declare_changelog_store(env, mdd, NULL,
2069                                                          handle);
2070                         if (rc)
2071                                 GOTO(stop, rc);
2072
2073                         rc = mdd_trans_start(env, mdo2mdd(obj), handle);
2074                         if (rc)
2075                                 GOTO(stop, rc);
2076                 }
2077
2078                 mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
2079                                          mdd_obj, handle);
2080         }
2081
2082 stop:
2083         if (handle != NULL)
2084                 mdd_trans_stop(env, mdd, rc, handle);
2085         return rc;
2086 }
2087
2088 /*
2089  * Permission check is done when open,
2090  * no need check again.
2091  */
2092 static int mdd_readpage_sanity_check(const struct lu_env *env,
2093                                      struct mdd_object *obj)
2094 {
2095         struct dt_object *next = mdd_object_child(obj);
2096         int rc;
2097         ENTRY;
2098
2099         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
2100                 rc = 0;
2101         else
2102                 rc = -ENOTDIR;
2103
2104         RETURN(rc);
2105 }
2106
2107 static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
2108                               int nob, const struct dt_it_ops *iops,
2109                               struct dt_it *it, __u32 attr, void *arg)
2110 {
2111         struct lu_dirpage       *dp = &lp->lp_dir;
2112         void                    *area = dp;
2113         int                      result;
2114         __u64                    hash = 0;
2115         struct lu_dirent        *ent;
2116         struct lu_dirent        *last = NULL;
2117         int                      first = 1;
2118
2119         memset(area, 0, sizeof (*dp));
2120         area += sizeof (*dp);
2121         nob  -= sizeof (*dp);
2122
2123         ent  = area;
2124         do {
2125                 int    len;
2126                 int    recsize;
2127
2128                 len = iops->key_size(env, it);
2129
2130                 /* IAM iterator can return record with zero len. */
2131                 if (len == 0)
2132                         goto next;
2133
2134                 hash = iops->store(env, it);
2135                 if (unlikely(first)) {
2136                         first = 0;
2137                         dp->ldp_hash_start = cpu_to_le64(hash);
2138                 }
2139
2140                 /* calculate max space required for lu_dirent */
2141                 recsize = lu_dirent_calc_size(len, attr);
2142
2143                 if (nob >= recsize) {
2144                         result = iops->rec(env, it, (struct dt_rec *)ent, attr);
2145                         if (result == -ESTALE)
2146                                 goto next;
2147                         if (result != 0)
2148                                 goto out;
2149
2150                         /* osd might not able to pack all attributes,
2151                          * so recheck rec length */
2152                         recsize = le16_to_cpu(ent->lde_reclen);
2153                 } else {
2154                         result = (last != NULL) ? 0 :-EINVAL;
2155                         goto out;
2156                 }
2157                 last = ent;
2158                 ent = (void *)ent + recsize;
2159                 nob -= recsize;
2160
2161 next:
2162                 result = iops->next(env, it);
2163                 if (result == -ESTALE)
2164                         goto next;
2165         } while (result == 0);
2166
2167 out:
2168         dp->ldp_hash_end = cpu_to_le64(hash);
2169         if (last != NULL) {
2170                 if (last->lde_hash == dp->ldp_hash_end)
2171                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
2172                 last->lde_reclen = 0; /* end mark */
2173         }
2174         if (result > 0)
2175                 /* end of directory */
2176                 dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
2177         if (result < 0)
2178                 CWARN("build page failed: %d!\n", result);
2179         return result;
2180 }
2181
2182 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
2183                  const struct lu_rdpg *rdpg)
2184 {
2185         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2186         int rc;
2187         ENTRY;
2188
2189         if (mdd_object_exists(mdd_obj) == 0) {
2190                 CERROR("%s: object "DFID" not found: rc = -2\n",
2191                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2192                 return -ENOENT;
2193         }
2194
2195         mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
2196         rc = mdd_readpage_sanity_check(env, mdd_obj);
2197         if (rc)
2198                 GOTO(out_unlock, rc);
2199
2200         if (mdd_is_dead_obj(mdd_obj)) {
2201                 struct page *pg;
2202                 struct lu_dirpage *dp;
2203
2204                 /*
2205                  * According to POSIX, please do not return any entry to client:
2206                  * even dot and dotdot should not be returned.
2207                  */
2208                 CDEBUG(D_INODE, "readdir from dead object: "DFID"\n",
2209                        PFID(mdd_object_fid(mdd_obj)));
2210
2211                 if (rdpg->rp_count <= 0)
2212                         GOTO(out_unlock, rc = -EFAULT);
2213                 LASSERT(rdpg->rp_pages != NULL);
2214
2215                 pg = rdpg->rp_pages[0];
2216                 dp = (struct lu_dirpage*)cfs_kmap(pg);
2217                 memset(dp, 0 , sizeof(struct lu_dirpage));
2218                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2219                 dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
2220                 dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2221                 cfs_kunmap(pg);
2222                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
2223         }
2224
2225         rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
2226                            mdd_dir_page_build, NULL);
2227         if (rc >= 0) {
2228                 struct lu_dirpage       *dp;
2229
2230                 dp = cfs_kmap(rdpg->rp_pages[0]);
2231                 dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
2232                 if (rc == 0) {
2233                         /*
2234                          * No pages were processed, mark this for first page
2235                          * and send back.
2236                          */
2237                         dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
2238                         rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
2239                 }
2240                 cfs_kunmap(rdpg->rp_pages[0]);
2241         }
2242
2243         GOTO(out_unlock, rc);
2244 out_unlock:
2245         mdd_read_unlock(env, mdd_obj);
2246         return rc;
2247 }
2248
2249 static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
2250 {
2251         struct mdd_object *mdd_obj = md2mdd_obj(obj);
2252
2253         if (mdd_object_exists(mdd_obj) == 0) {
2254                 CERROR("%s: object "DFID" not found: rc = -2\n",
2255                        mdd_obj_dev_name(mdd_obj),PFID(mdd_object_fid(mdd_obj)));
2256                 return -ENOENT;
2257         }
2258         return dt_object_sync(env, mdd_object_child(mdd_obj));
2259 }
2260
2261 const struct md_object_operations mdd_obj_ops = {
2262         .moo_permission    = mdd_permission,
2263         .moo_attr_get      = mdd_attr_get,
2264         .moo_attr_set      = mdd_attr_set,
2265         .moo_xattr_get     = mdd_xattr_get,
2266         .moo_xattr_set     = mdd_xattr_set,
2267         .moo_xattr_list    = mdd_xattr_list,
2268         .moo_xattr_del     = mdd_xattr_del,
2269         .moo_open          = mdd_open,
2270         .moo_close         = mdd_close,
2271         .moo_readpage      = mdd_readpage,
2272         .moo_readlink      = mdd_readlink,
2273         .moo_changelog     = mdd_changelog,
2274         .moo_capa_get      = mdd_capa_get,
2275         .moo_object_sync   = mdd_object_sync,
2276         .moo_path          = mdd_path,
2277 };