Whamcloud - gitweb
- do not check dir for split if cleint already knows that it is split.
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40 /* fid_be_cpu(), fid_cpu_to_be(). */
41 #include <lustre_fid.h>
42
43 #include <linux/ldiskfs_fs.h>
44 #include <lustre_mds.h>
45 #include <lustre/lustre_idl.h>
46
47 #include "mdd_internal.h"
48
49 static struct lu_object_operations mdd_lu_obj_ops;
50
51 static inline void mdd_set_dead_obj(struct mdd_object *obj)
52 {
53         if (obj)
54                 obj->mod_flags |= DEAD_OBJ;
55 }
56
57 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
58                struct lu_attr *la, struct lustre_capa *capa)
59 {
60         struct dt_object *next = mdd_object_child(obj);
61         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
62         return next->do_ops->do_attr_get(env, next, la, capa);
63 }
64
65 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
66 {
67         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
68
69         if (flags & LUSTRE_APPEND_FL)
70                 obj->mod_flags |= APPEND_OBJ;
71
72         if (flags & LUSTRE_IMMUTABLE_FL)
73                 obj->mod_flags |= IMMUTE_OBJ;
74 }
75
76 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
77 {
78         struct lu_buf *buf;
79
80         buf = &mdd_env_info(env)->mti_buf;
81         buf->lb_buf = area;
82         buf->lb_len = len;
83         return buf;
84 }
85
86 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
87                                        const void *area, ssize_t len)
88 {
89         struct lu_buf *buf;
90
91         buf = &mdd_env_info(env)->mti_buf;
92         buf->lb_buf = (void *)area;
93         buf->lb_len = len;
94         return buf;
95 }
96
97 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
98 {
99         struct mdd_thread_info *info;
100
101         info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
102         LASSERT(info != NULL);
103         return info;
104 }
105
106 struct lu_object *mdd_object_alloc(const struct lu_env *env,
107                                    const struct lu_object_header *hdr,
108                                    struct lu_device *d)
109 {
110         struct mdd_object *mdd_obj;
111
112         OBD_ALLOC_PTR(mdd_obj);
113         if (mdd_obj != NULL) {
114                 struct lu_object *o;
115
116                 o = mdd2lu_obj(mdd_obj);
117                 lu_object_init(o, NULL, d);
118                 mdd_obj->mod_obj.mo_ops = &mdd_obj_ops;
119                 mdd_obj->mod_obj.mo_dir_ops = &mdd_dir_ops;
120                 mdd_obj->mod_count = 0;
121                 o->lo_ops = &mdd_lu_obj_ops;
122                 return o;
123         } else {
124                 return NULL;
125         }
126 }
127
128 static int mdd_object_init(const struct lu_env *env, struct lu_object *o)
129 {
130         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
131         struct lu_object  *below;
132         struct lu_device  *under;
133         ENTRY;
134
135         under = &d->mdd_child->dd_lu_dev;
136         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
137         mdd_pdlock_init(lu2mdd_obj(o));
138         if (below == NULL)
139                 RETURN(-ENOMEM);
140
141         lu_object_add(o, below);
142         RETURN(0);
143 }
144
145 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
146 {
147         if (lu_object_exists(o))
148                 return mdd_get_flags(env, lu2mdd_obj(o));
149         else
150                 return 0;
151 }
152
153 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
154 {
155         struct mdd_object *mdd = lu2mdd_obj(o);
156         
157         lu_object_fini(o);
158         OBD_FREE_PTR(mdd);
159 }
160
161 static int mdd_object_print(const struct lu_env *env, void *cookie,
162                             lu_printer_t p, const struct lu_object *o)
163 {
164         return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
165 }
166
167 /* orphan handling is here */
168 static void mdd_object_delete(const struct lu_env *env,
169                                struct lu_object *o)
170 {
171         struct mdd_object *mdd_obj = lu2mdd_obj(o);
172         struct thandle *handle = NULL;
173         ENTRY;
174
175         if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
176                 return;
177
178         if (mdd_obj->mod_flags & ORPHAN_OBJ) {
179                 mdd_txn_param_build(env, lu2mdd_dev(o->lo_dev),
180                                     MDD_TXN_INDEX_DELETE_OP);
181                 handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
182                 if (IS_ERR(handle))
183                         CERROR("Cannot get thandle\n");
184                 else {
185                         mdd_write_lock(env, mdd_obj);
186                         /* let's remove obj from the orphan list */
187                         __mdd_orphan_del(env, mdd_obj, handle);
188                         mdd_write_unlock(env, mdd_obj);
189                         mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
190                                        0, handle);
191                 }
192         }
193 }
194
195 static struct lu_object_operations mdd_lu_obj_ops = {
196         .loo_object_init    = mdd_object_init,
197         .loo_object_start   = mdd_object_start,
198         .loo_object_free    = mdd_object_free,
199         .loo_object_print   = mdd_object_print,
200         .loo_object_delete  = mdd_object_delete
201 };
202
203 struct mdd_object *mdd_object_find(const struct lu_env *env,
204                                    struct mdd_device *d,
205                                    const struct lu_fid *f)
206 {
207         struct lu_object *o, *lo;
208         struct mdd_object *m;
209         ENTRY;
210
211         o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f);
212         if (IS_ERR(o))
213                 m = (struct mdd_object *)o;
214         else {
215                 lo = lu_object_locate(o->lo_header, mdd2lu_dev(d)->ld_type);
216                 /* remote object can't be located and should be put then */
217                 if (lo == NULL)
218                         lu_object_put(env, o);
219                 m = lu2mdd_obj(lo);
220         }
221         RETURN(m);
222 }
223
224 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
225 {
226         struct lu_attr *la = &mdd_env_info(env)->mti_la;
227         int rc;
228
229         ENTRY;
230         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
231         if (rc == 0)
232                 mdd_flags_xlate(obj, la->la_flags);
233         RETURN(rc);
234 }
235
236 void mdd_ref_add_internal(const struct lu_env *env, struct mdd_object *obj,
237                           struct thandle *handle)
238 {
239         struct dt_object *next;
240
241         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
242         next = mdd_object_child(obj);
243         next->do_ops->do_ref_add(env, next, handle);
244 }
245
246 void mdd_ref_del_internal(const struct lu_env *env, struct mdd_object *obj,
247                           struct thandle *handle)
248 {
249         struct dt_object *next = mdd_object_child(obj);
250         ENTRY;
251
252         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
253
254         next->do_ops->do_ref_del(env, next, handle);
255         EXIT;
256 }
257
258 /* get only inode attributes */
259 int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
260                   struct md_attr *ma)
261 {
262         int rc = 0;
263         ENTRY;
264
265         if (ma->ma_valid & MA_INODE)
266                 RETURN(0);
267
268         rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
269                           mdd_object_capa(env, mdd_obj));
270         if (rc == 0)
271                 ma->ma_valid |= MA_INODE;
272         RETURN(rc);
273 }
274
275 /* get lov EA only */
276 static int __mdd_lmm_get(const struct lu_env *env,
277                          struct mdd_object *mdd_obj, struct md_attr *ma)
278 {
279         int rc, lmm_size;
280         ENTRY;
281
282         if (ma->ma_valid & MA_LOV)
283                 RETURN(0);
284
285         lmm_size = ma->ma_lmm_size;
286         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &lmm_size,
287                         MDS_LOV_MD_NAME);
288         if (rc > 0) {
289                 ma->ma_valid |= MA_LOV;
290                 ma->ma_lmm_size = lmm_size;
291                 rc = 0;
292         }
293         RETURN(rc);
294 }
295
296 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
297                        struct md_attr *ma)
298 {
299         int rc;
300         ENTRY;
301
302         mdd_read_lock(env, mdd_obj);
303         rc = __mdd_lmm_get(env, mdd_obj, ma);
304         mdd_read_unlock(env, mdd_obj);
305         RETURN(rc);
306 }
307
308 /* get lmv EA only*/
309 static int __mdd_lmv_get(const struct lu_env *env,
310                          struct mdd_object *mdd_obj, struct md_attr *ma)
311 {
312         int rc;
313
314         if (ma->ma_valid & MA_LMV)
315                 RETURN(0);
316
317         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
318                         MDS_LMV_MD_NAME);
319         if (rc > 0) {
320                 ma->ma_valid |= MA_LMV;
321                 rc = 0;
322         }
323         RETURN(rc);
324 }
325
326 static int mdd_attr_get_internal(const struct lu_env *env,
327                                  struct mdd_object *mdd_obj,
328                                  struct md_attr *ma)
329 {
330         int rc = 0;
331         ENTRY;
332
333         if (ma->ma_need & MA_INODE)
334                 rc = mdd_iattr_get(env, mdd_obj, ma);
335
336         if (rc == 0 && ma->ma_need & MA_LOV) {
337                 if (S_ISREG(mdd_object_type(mdd_obj)) ||
338                     S_ISDIR(mdd_object_type(mdd_obj)))
339                         rc = __mdd_lmm_get(env, mdd_obj, ma);
340         }
341         if (rc == 0 && ma->ma_need & MA_LMV) {
342                 if (S_ISDIR(mdd_object_type(mdd_obj)))
343                         rc = __mdd_lmv_get(env, mdd_obj, ma);
344         }
345 #ifdef CONFIG_FS_POSIX_ACL
346         if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
347                 if (S_ISDIR(mdd_object_type(mdd_obj)))
348                         rc = mdd_acl_def_get(env, mdd_obj, ma);
349         }
350 #endif
351         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
352                rc, ma->ma_valid);
353         RETURN(rc);
354 }
355
356 int mdd_attr_get_internal_locked(const struct lu_env *env,
357                                  struct mdd_object *mdd_obj, struct md_attr *ma)
358 {
359         int rc;
360         int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
361
362         if (needlock)
363                 mdd_read_lock(env, mdd_obj);
364         rc = mdd_attr_get_internal(env, mdd_obj, ma);
365         if (needlock)
366                 mdd_read_unlock(env, mdd_obj);
367         return rc;
368 }
369
370 /*
371  * No permission check is needed.
372  */
373 static int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
374                         struct md_attr *ma)
375 {
376         struct mdd_object *mdd_obj = md2mdd_obj(obj);
377         int                rc;
378
379         ENTRY;
380         rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
381         RETURN(rc);
382 }
383
384 /*
385  * No permission check is needed.
386  */
387 static int mdd_xattr_get(const struct lu_env *env,
388                          struct md_object *obj, struct lu_buf *buf,
389                          const char *name)
390 {
391         struct mdd_object *mdd_obj = md2mdd_obj(obj);
392         struct dt_object  *next;
393         int rc;
394
395         ENTRY;
396
397         LASSERT(lu_object_exists(&obj->mo_lu));
398
399         next = mdd_object_child(mdd_obj);
400         mdd_read_lock(env, mdd_obj);
401         rc = next->do_ops->do_xattr_get(env, next, buf, name,
402                                         mdd_object_capa(env, mdd_obj));
403         mdd_read_unlock(env, mdd_obj);
404
405         RETURN(rc);
406 }
407
408 /*
409  * Permission check is done when open,
410  * no need check again.
411  */
412 static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
413                         struct lu_buf *buf)
414 {
415         struct mdd_object *mdd_obj = md2mdd_obj(obj);
416         struct dt_object  *next;
417         loff_t             pos = 0;
418         int                rc;
419         ENTRY;
420
421         LASSERT(lu_object_exists(&obj->mo_lu));
422
423         next = mdd_object_child(mdd_obj);
424         mdd_read_lock(env, mdd_obj);
425         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
426                                          mdd_object_capa(env, mdd_obj));
427         mdd_read_unlock(env, mdd_obj);
428         RETURN(rc);
429 }
430
431 static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
432                           struct lu_buf *buf)
433 {
434         struct mdd_object *mdd_obj = md2mdd_obj(obj);
435         struct dt_object  *next;
436         int rc;
437
438         ENTRY;
439
440         LASSERT(lu_object_exists(&obj->mo_lu));
441
442         next = mdd_object_child(mdd_obj);
443         mdd_read_lock(env, mdd_obj);
444         rc = next->do_ops->do_xattr_list(env, next, buf,
445                                          mdd_object_capa(env, mdd_obj));
446         mdd_read_unlock(env, mdd_obj);
447
448         RETURN(rc);
449 }
450
451 int mdd_object_create_internal(const struct lu_env *env,
452                                struct mdd_object *obj, struct md_attr *ma,
453                                struct thandle *handle)
454 {
455         struct dt_object *next;
456         struct lu_attr *attr = &ma->ma_attr;
457         int rc;
458         ENTRY;
459
460         if (!lu_object_exists(mdd2lu_obj(obj))) {
461                 next = mdd_object_child(obj);
462                 rc = next->do_ops->do_create(env, next, attr, handle);
463         } else
464                 rc = -EEXIST;
465
466         LASSERT(ergo(rc == 0, lu_object_exists(mdd2lu_obj(obj))));
467
468         RETURN(rc);
469 }
470
471
472 int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o,
473                           const struct lu_attr *attr, struct thandle *handle,
474                           const int needacl)
475 {
476         struct dt_object *next;
477         int rc;
478
479         LASSERT(lu_object_exists(mdd2lu_obj(o)));
480         next = mdd_object_child(o);
481         rc = next->do_ops->do_attr_set(env, next, attr, handle,
482                                        mdd_object_capa(env, o));
483 #ifdef CONFIG_FS_POSIX_ACL
484         if (!rc && (attr->la_valid & LA_MODE) && needacl)
485                 rc = mdd_acl_chmod(env, o, attr->la_mode, handle);
486 #endif
487         return rc;
488 }
489
490 int mdd_attr_set_internal_locked(const struct lu_env *env,
491                                  struct mdd_object *o,
492                                  const struct lu_attr *attr,
493                                  struct thandle *handle, int needacl)
494 {
495         int rc;
496
497         needacl = needacl && (attr->la_valid & LA_MODE);
498
499         if (needacl)
500                 mdd_write_lock(env, o);
501
502         rc = mdd_attr_set_internal(env, o, attr, handle, needacl);
503
504         if (needacl)
505                 mdd_write_unlock(env, o);
506         return rc;
507 }
508
509 static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *o,
510                            const struct lu_buf *buf, const char *name,
511                            int fl, struct thandle *handle)
512 {
513         struct dt_object *next;
514         struct lustre_capa *capa = mdd_object_capa(env, o);
515         int rc = 0;
516         ENTRY;
517
518         LASSERT(lu_object_exists(mdd2lu_obj(o)));
519         next = mdd_object_child(o);
520         if (buf->lb_buf && buf->lb_len > 0) {
521                 rc = next->do_ops->do_xattr_set(env, next, buf, name, 0, handle,
522                                                 capa);
523         } else if (buf->lb_buf == NULL && buf->lb_len == 0) {
524                 rc = next->do_ops->do_xattr_del(env, next, name, handle, capa);
525         }
526         RETURN(rc);
527 }
528
529 /*
530  * This gives the same functionality as the code between
531  * sys_chmod and inode_setattr
532  * chown_common and inode_setattr
533  * utimes and inode_setattr
534  * This API is ported from mds_fix_attr but remove some unnecesssary stuff.
535  */
536 static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
537                         struct lu_attr *la)
538 {
539         struct lu_attr   *tmp_la     = &mdd_env_info(env)->mti_la;
540         struct md_ucred  *uc         = md_ucred(env);
541         time_t            now        = CURRENT_SECONDS;
542         int               rc;
543         ENTRY;
544
545         if (!la->la_valid)
546                 RETURN(0);
547
548         /* Do not permit change file type */
549         if (la->la_valid & LA_TYPE)
550                 RETURN(-EPERM);
551
552         /* They should not be processed by setattr */
553         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
554                 RETURN(-EPERM);
555
556         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
557         if (rc)
558                 RETURN(rc);
559
560         if (mdd_is_immutable(obj) || mdd_is_append(obj)) {
561
562                 /*
563                  * If only change flags of the object, we should
564                  * let it pass, but also need capability check
565                  * here if (!capable(CAP_LINUX_IMMUTABLE)),
566                  * fix it, when implement capable in mds
567                  */
568                 if (la->la_valid & ~LA_FLAGS)
569                         RETURN(-EPERM);
570
571                 if (!mdd_capable(uc, CAP_LINUX_IMMUTABLE))
572                         RETURN(-EPERM);
573
574                 if ((uc->mu_fsuid != tmp_la->la_uid) &&
575                     !mdd_capable(uc, CAP_FOWNER))
576                         RETURN(-EPERM);
577
578                 /*
579                  * According to Ext3 implementation on this,
580                  * the ctime will be changed, but not clear why?
581                  */
582                 la->la_ctime = now;
583                 la->la_valid |= LA_CTIME;
584                 RETURN(0);
585         }
586
587         /* Check for setting the obj time. */
588         if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
589             !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
590                 rc = mdd_permission_internal_locked(env, obj, tmp_la, MAY_WRITE);
591                 if (rc)
592                         RETURN(rc);
593         }
594
595         /* Make sure a caller can chmod. */
596         if (la->la_valid & LA_MODE) {
597                 /*
598                  * Bypass la_vaild == LA_MODE,
599                  * this is for changing file with SUID or SGID.
600                  */
601                 if ((la->la_valid & ~LA_MODE) &&
602                     (uc->mu_fsuid != tmp_la->la_uid) &&
603                     !mdd_capable(uc, CAP_FOWNER))
604                         RETURN(-EPERM);
605
606                 if (la->la_mode == (umode_t) -1)
607                         la->la_mode = tmp_la->la_mode;
608                 else
609                         la->la_mode = (la->la_mode & S_IALLUGO) |
610                                       (tmp_la->la_mode & ~S_IALLUGO);
611
612                 /* Also check the setgid bit! */
613                 if (!mdd_in_group_p(uc, (la->la_valid & LA_GID) ? la->la_gid :
614                                 tmp_la->la_gid) && !mdd_capable(uc, CAP_FSETID))
615                         la->la_mode &= ~S_ISGID;
616         } else {
617                la->la_mode = tmp_la->la_mode;
618         }
619
620         /* Make sure a caller can chown. */
621         if (la->la_valid & LA_UID) {
622                 if (la->la_uid == (uid_t) -1)
623                         la->la_uid = tmp_la->la_uid;
624                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
625                     (la->la_uid != tmp_la->la_uid)) &&
626                     !mdd_capable(uc, CAP_CHOWN))
627                         RETURN(-EPERM);
628
629                 /*
630                  * If the user or group of a non-directory has been
631                  * changed by a non-root user, remove the setuid bit.
632                  * 19981026 David C Niemi <niemi@tux.org>
633                  *
634                  * Changed this to apply to all users, including root,
635                  * to avoid some races. This is the behavior we had in
636                  * 2.0. The check for non-root was definitely wrong
637                  * for 2.2 anyway, as it should have been using
638                  * CAP_FSETID rather than fsuid -- 19990830 SD.
639                  */
640                 if (((tmp_la->la_mode & S_ISUID) == S_ISUID) &&
641                     !S_ISDIR(tmp_la->la_mode)) {
642                         la->la_mode &= ~S_ISUID;
643                         la->la_valid |= LA_MODE;
644                 }
645         }
646
647         /* Make sure caller can chgrp. */
648         if (la->la_valid & LA_GID) {
649                 if (la->la_gid == (gid_t) -1)
650                         la->la_gid = tmp_la->la_gid;
651                 if (((uc->mu_fsuid != tmp_la->la_uid) ||
652                     ((la->la_gid != tmp_la->la_gid) &&
653                     !mdd_in_group_p(uc, la->la_gid))) &&
654                     !mdd_capable(uc, CAP_CHOWN))
655                         RETURN(-EPERM);
656
657                 /*
658                  * Likewise, if the user or group of a non-directory
659                  * has been changed by a non-root user, remove the
660                  * setgid bit UNLESS there is no group execute bit
661                  * (this would be a file marked for mandatory
662                  * locking).  19981026 David C Niemi <niemi@tux.org>
663                  *
664                  * Removed the fsuid check (see the comment above) --
665                  * 19990830 SD.
666                  */
667                 if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
668                      (S_ISGID | S_IXGRP)) && !S_ISDIR(tmp_la->la_mode)) {
669                         la->la_mode &= ~S_ISGID;
670                         la->la_valid |= LA_MODE;
671                 }
672         }
673
674         /* For tuncate (or setsize), we should have MAY_WRITE perm */
675         if (la->la_valid & (LA_SIZE | LA_BLOCKS)) {
676                 rc = mdd_permission_internal_locked(env, obj, tmp_la, MAY_WRITE);
677                 if (rc)
678                         RETURN(rc);
679
680                 /*
681                  * For the "Size-on-MDS" setattr update, merge coming
682                  * attributes with the set in the inode. BUG 10641
683                  */
684                 if ((la->la_valid & LA_ATIME) &&
685                     (la->la_atime < tmp_la->la_atime))
686                         la->la_valid &= ~LA_ATIME;
687
688                 if ((la->la_valid & LA_CTIME) &&
689                     (la->la_ctime < tmp_la->la_ctime))
690                         la->la_valid &= ~(LA_MTIME | LA_CTIME);
691
692                 if (!(la->la_valid & LA_MTIME) && (now > tmp_la->la_mtime)) {
693                         la->la_mtime = now;
694                         la->la_valid |= LA_MTIME;
695                 }
696         }
697
698         /* For last, ctime must be fixed */
699         if (!(la->la_valid & LA_CTIME) && (now > tmp_la->la_ctime)) {
700                 la->la_ctime = now;
701                 la->la_valid |= LA_CTIME;
702         }
703
704         RETURN(0);
705 }
706
707 /* set attr and LOV EA at once, return updated attr */
708 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
709                         const struct md_attr *ma)
710 {
711         struct mdd_object *mdd_obj = md2mdd_obj(obj);
712         struct mdd_device *mdd = mdo2mdd(obj);
713         struct thandle *handle;
714         struct lov_mds_md *lmm = NULL;
715         int  rc, lmm_size = 0, max_size = 0;
716         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
717         ENTRY;
718
719         mdd_txn_param_build(env, mdd, MDD_TXN_ATTR_SET_OP);
720         handle = mdd_trans_start(env, mdd);
721         if (IS_ERR(handle))
722                 RETURN(PTR_ERR(handle));
723         /*TODO: add lock here*/
724         /* start a log jounal handle if needed */
725         if (S_ISREG(mdd_object_type(mdd_obj)) &&
726             ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
727                 max_size = mdd_lov_mdsize(env, mdd);
728                 OBD_ALLOC(lmm, max_size);
729                 lmm_size = max_size;
730                 if (lmm == NULL)
731                         GOTO(cleanup, rc = -ENOMEM);
732
733                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
734                                 MDS_LOV_MD_NAME);
735
736                 if (rc < 0)
737                         GOTO(cleanup, rc);
738         }
739
740         if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
741                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
742                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
743
744         *la_copy = ma->ma_attr;
745         rc = mdd_fix_attr(env, mdd_obj, la_copy);
746         if (rc)
747                 GOTO(cleanup, rc);
748
749         if (la_copy->la_valid & LA_FLAGS) {
750                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
751                                                   handle, 1);
752                 if (rc == 0)
753                         mdd_flags_xlate(mdd_obj, la_copy->la_flags);
754         } else if (la_copy->la_valid) {            /* setattr */
755                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
756                                                   handle, 1);
757                 /* journal chown/chgrp in llog, just like unlink */
758                 if (rc == 0 && lmm_size){
759                         /*TODO set_attr llog */
760                 }
761         }
762
763         if (rc == 0 && ma->ma_valid & MA_LOV) {
764                 umode_t mode;
765
766                 mode = mdd_object_type(mdd_obj);
767                 if (S_ISREG(mode) || S_ISDIR(mode)) {
768                         /*TODO check permission*/
769                         rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
770                                             ma->ma_lmm_size, handle, 1);
771                 }
772
773         }
774 cleanup:
775         mdd_trans_stop(env, mdd, rc, handle);
776         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
777                 /*set obd attr, if needed*/
778                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size);
779         }
780         if (lmm != NULL) {
781                 OBD_FREE(lmm, max_size);
782         }
783
784         RETURN(rc);
785 }
786
787 int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
788                       const struct lu_buf *buf, const char *name, int fl,
789                       struct thandle *handle)
790 {
791         int  rc;
792         ENTRY;
793
794         mdd_write_lock(env, obj);
795         rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
796         mdd_write_unlock(env, obj);
797
798         RETURN(rc);
799 }
800
801 static int mdd_xattr_sanity_check(const struct lu_env *env,
802                                   struct mdd_object *obj)
803 {
804         struct lu_attr  *tmp_la = &mdd_env_info(env)->mti_la;
805         struct md_ucred *uc     = md_ucred(env);
806         int rc;
807         ENTRY;
808
809         if (mdd_is_immutable(obj) || mdd_is_append(obj))
810                 RETURN(-EPERM);
811
812         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
813         if (rc)
814                 RETURN(rc);
815
816         if ((uc->mu_fsuid != tmp_la->la_uid) && !mdd_capable(uc, CAP_FOWNER))
817                 RETURN(-EPERM);
818
819         RETURN(rc);
820 }
821
822 static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
823                          const struct lu_buf *buf, const char *name, int fl)
824 {
825         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
826         struct mdd_object *mdd_obj = md2mdd_obj(obj);
827         struct mdd_device *mdd = mdo2mdd(obj);
828         struct thandle *handle;
829         int  rc;
830         ENTRY;
831
832         rc = mdd_xattr_sanity_check(env, mdd_obj);
833         if (rc)
834                 RETURN(rc);
835
836         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
837         handle = mdd_trans_start(env, mdd);
838         if (IS_ERR(handle))
839                 RETURN(PTR_ERR(handle));
840
841         rc = mdd_xattr_set_txn(env, md2mdd_obj(obj), buf, name,
842                                fl, handle);
843         if (rc == 0) {
844                 la_copy->la_ctime = CURRENT_SECONDS;
845                 la_copy->la_valid = LA_CTIME;
846                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy, handle, 0);
847         }
848         mdd_trans_stop(env, mdd, rc, handle);
849
850         RETURN(rc);
851 }
852
853 static int __mdd_xattr_del(const struct lu_env *env,struct mdd_device *mdd,
854                            struct mdd_object *obj,
855                            const char *name, struct thandle *handle)
856 {
857         struct dt_object *next;
858
859         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
860         next = mdd_object_child(obj);
861         return next->do_ops->do_xattr_del(env, next, name, handle,
862                                           mdd_object_capa(env, obj));
863 }
864
865 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
866                   const char *name)
867 {
868         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
869         struct mdd_object *mdd_obj = md2mdd_obj(obj);
870         struct mdd_device *mdd = mdo2mdd(obj);
871         struct thandle *handle;
872         int  rc;
873         ENTRY;
874
875         rc = mdd_xattr_sanity_check(env, mdd_obj);
876         if (rc)
877                 RETURN(rc);
878
879         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
880         handle = mdd_trans_start(env, mdd);
881         if (IS_ERR(handle))
882                 RETURN(PTR_ERR(handle));
883
884         mdd_write_lock(env, mdd_obj);
885         rc = __mdd_xattr_del(env, mdd, md2mdd_obj(obj), name, handle);
886         mdd_write_unlock(env, mdd_obj);
887         if (rc == 0) {
888                 la_copy->la_ctime = CURRENT_SECONDS;
889                 la_copy->la_valid = LA_CTIME;
890                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 0);
891         }
892
893         mdd_trans_stop(env, mdd, rc, handle);
894
895         RETURN(rc);
896 }
897
898 /* partial unlink */
899 static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
900                        struct md_attr *ma)
901 {
902         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
903         struct mdd_object *mdd_obj = md2mdd_obj(obj);
904         struct mdd_device *mdd = mdo2mdd(obj);
905         struct thandle *handle;
906         int rc;
907         ENTRY;
908
909         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
910         if (rc)
911                 RETURN(rc);
912
913         handle = mdd_trans_start(env, mdd);
914         if (IS_ERR(handle))
915                 RETURN(-ENOMEM);
916
917         mdd_write_lock(env, mdd_obj);
918
919         rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
920         if (rc)
921                 GOTO(cleanup, rc);
922
923         mdd_ref_del_internal(env, mdd_obj, handle);
924
925         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
926                 /* unlink dot */
927                 mdd_ref_del_internal(env, mdd_obj, handle);
928         }
929
930         la_copy->la_ctime = CURRENT_SECONDS;
931         la_copy->la_valid = LA_CTIME;
932         rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 0);
933         if (rc)
934                 GOTO(cleanup, rc);
935
936         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
937
938         EXIT;
939 cleanup:
940         mdd_write_unlock(env, mdd_obj);
941         mdd_trans_stop(env, mdd, rc, handle);
942         return rc;
943 }
944
945 /* partial operation */
946 static int mdd_oc_sanity_check(const struct lu_env *env,
947                                struct mdd_object *obj,
948                                struct md_attr *ma)
949 {
950         int rc;
951         ENTRY;
952
953         switch (ma->ma_attr.la_mode & S_IFMT) {
954         case S_IFREG:
955         case S_IFDIR:
956         case S_IFLNK:
957         case S_IFCHR:
958         case S_IFBLK:
959         case S_IFIFO:
960         case S_IFSOCK:
961                 rc = 0;
962                 break;
963         default:
964                 rc = -EINVAL;
965                 break;
966         }
967         RETURN(rc);
968 }
969
970 static int mdd_object_create(const struct lu_env *env,
971                              struct md_object *obj,
972                              const struct md_op_spec *spec,
973                              struct md_attr *ma)
974 {
975
976         struct mdd_device *mdd = mdo2mdd(obj);
977         struct mdd_object *mdd_obj = md2mdd_obj(obj);
978         const struct lu_fid *pfid = spec->u.sp_pfid;
979         struct thandle *handle;
980         int rc;
981         ENTRY;
982
983         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
984         handle = mdd_trans_start(env, mdd);
985         if (IS_ERR(handle))
986                 RETURN(PTR_ERR(handle));
987
988         mdd_write_lock(env, mdd_obj);
989         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
990         if (rc)
991                 GOTO(unlock, rc);
992
993         rc = mdd_object_create_internal(env, mdd_obj, ma, handle);
994         if (rc)
995                 GOTO(unlock, rc);
996
997         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
998                 /* If creating the slave object, set slave EA here. */
999                 int lmv_size = spec->u.sp_ea.eadatalen;
1000                 struct lmv_stripe_md *lmv;
1001
1002                 lmv = (struct lmv_stripe_md *)spec->u.sp_ea.eadata;
1003                 LASSERT(lmv != NULL && lmv_size > 0);
1004
1005                 rc = __mdd_xattr_set(env, mdd_obj,
1006                                      mdd_buf_get_const(env, lmv, lmv_size),
1007                                      MDS_LMV_MD_NAME, 0, handle);
1008                 if (rc)
1009                         GOTO(unlock, rc);
1010                 pfid = spec->u.sp_ea.fid;
1011
1012                 CDEBUG(D_INFO, "Set slave ea "DFID", eadatalen %d, rc %d\n",
1013                        PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
1014
1015                 rc = mdd_attr_set_internal(env, mdd_obj, &ma->ma_attr, handle, 0);
1016         } else {
1017 #ifdef CONFIG_FS_POSIX_ACL
1018                 if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
1019                         struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
1020
1021                         buf->lb_buf = (void *)spec->u.sp_ea.eadata;
1022                         buf->lb_len = spec->u.sp_ea.eadatalen;
1023                         if ((buf->lb_len > 0) && (buf->lb_buf != NULL)) {
1024                                 rc = __mdd_acl_init(env, mdd_obj, buf,
1025                                                     &ma->ma_attr.la_mode,
1026                                                     handle);
1027                                 if (rc)
1028                                         GOTO(unlock, rc);
1029                                 else
1030                                         ma->ma_attr.la_valid |= LA_MODE;
1031                         }
1032                 }
1033 #endif
1034                 rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
1035         }
1036         EXIT;
1037 unlock:
1038         mdd_write_unlock(env, mdd_obj);
1039         if (rc == 0)
1040                 rc = mdd_attr_get_internal_locked(env, mdd_obj, ma);
1041
1042         mdd_trans_stop(env, mdd, rc, handle);
1043         return rc;
1044 }
1045
1046 /* partial link */
1047 static int mdd_ref_add(const struct lu_env *env,
1048                        struct md_object *obj)
1049 {
1050         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
1051         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1052         struct mdd_device *mdd = mdo2mdd(obj);
1053         struct thandle *handle;
1054         int rc;
1055         ENTRY;
1056
1057         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
1058         handle = mdd_trans_start(env, mdd);
1059         if (IS_ERR(handle))
1060                 RETURN(-ENOMEM);
1061
1062         mdd_write_lock(env, mdd_obj);
1063         rc = mdd_link_sanity_check(env, NULL, mdd_obj);
1064         if (rc == 0)
1065                 mdd_ref_add_internal(env, mdd_obj, handle);
1066         mdd_write_unlock(env, mdd_obj);
1067         if (rc == 0) {
1068                 la_copy->la_ctime = CURRENT_SECONDS;
1069                 la_copy->la_valid = LA_CTIME;
1070                 rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 0);
1071         }
1072         mdd_trans_stop(env, mdd, 0, handle);
1073
1074         RETURN(rc);
1075 }
1076
1077 /* 
1078  * do NOT or the MAY_*'s, you'll get the weakest
1079  * XXX: Can NOT understand.
1080  */
1081 static int accmode(struct mdd_object *mdd_obj, int flags)
1082 {
1083         int res = 0;
1084
1085 #if 0
1086         /* Sadly, NFSD reopens a file repeatedly during operation, so the
1087          * "acc_mode = 0" allowance for newly-created files isn't honoured.
1088          * NFSD uses the MDS_OPEN_OWNEROVERRIDE flag to say that a file
1089          * owner can write to a file even if it is marked readonly to hide
1090          * its brokenness. (bug 5781) */
1091         if (flags & MDS_OPEN_OWNEROVERRIDE && inode->i_uid == current->fsuid)
1092                 return 0;
1093 #endif
1094         if (flags & FMODE_READ)
1095                 res |= MAY_READ;
1096         if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
1097                 res |= MAY_WRITE;
1098         if (flags & MDS_FMODE_EXEC)
1099                 res |= MAY_EXEC;
1100         return res;
1101 }
1102
1103 static int mdd_open_sanity_check(const struct lu_env *env,
1104                                  struct mdd_object *obj, int flag)
1105 {
1106         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
1107         int mode = accmode(obj, flag);
1108         int rc;
1109         ENTRY;
1110
1111         /* EEXIST check */
1112         if (mdd_is_dead_obj(obj))
1113                 RETURN(-ENOENT);
1114
1115         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
1116         if (rc)
1117                RETURN(rc);
1118
1119         if (S_ISLNK(tmp_la->la_mode))
1120                 RETURN(-ELOOP);
1121
1122         if (S_ISDIR(tmp_la->la_mode) && (mode & MAY_WRITE))
1123                 RETURN(-EISDIR);
1124
1125         if (!(flag & MDS_OPEN_CREATED)) {
1126                 rc = mdd_permission_internal(env, obj, tmp_la, mode);
1127                 if (rc)
1128                         RETURN(rc);
1129         }
1130
1131         if (S_ISFIFO(tmp_la->la_mode) || S_ISSOCK(tmp_la->la_mode) ||
1132             S_ISBLK(tmp_la->la_mode) || S_ISCHR(tmp_la->la_mode))
1133                 flag &= ~MDS_OPEN_TRUNC;
1134
1135         /* For writing append-only file must open it with append mode. */
1136         if (mdd_is_append(obj)) {
1137                 if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
1138                         RETURN(-EPERM);
1139                 if (flag & MDS_OPEN_TRUNC)
1140                         RETURN(-EPERM);
1141         }
1142
1143 #if 0
1144         /*
1145          * Now, flag -- O_NOATIME does not be packed by client.
1146          */
1147         if (flag & O_NOATIME) {
1148                 struct md_ucred *uc = md_ucred(env);
1149
1150                 if (uc->mu_fsuid != tmp_la->la_uid &&
1151                     !mdd_capable(uc, CAP_FOWNER))
1152                         RETURN(-EPERM);
1153         }
1154 #endif
1155
1156         RETURN(0);
1157 }
1158
1159 static int mdd_open(const struct lu_env *env, struct md_object *obj,
1160                     int flags)
1161 {
1162         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1163         int rc = 0;
1164
1165         mdd_write_lock(env, mdd_obj);
1166
1167         rc = mdd_open_sanity_check(env, mdd_obj, flags);
1168         if (rc == 0)
1169                 mdd_obj->mod_count++;
1170
1171         mdd_write_unlock(env, mdd_obj);
1172         return rc;
1173 }
1174
1175 /* return md_attr back,
1176  * if it is last unlink then return lov ea + llog cookie*/
1177 int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
1178                     struct md_attr *ma)
1179 {
1180         int rc = 0;
1181         ENTRY;
1182
1183         mdd_set_dead_obj(obj);
1184         if (S_ISREG(mdd_object_type(obj))) {
1185                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
1186                  * Caller must be ready for that. */
1187                 rc = __mdd_lmm_get(env, obj, ma);
1188                 if ((ma->ma_valid & MA_LOV))
1189                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
1190                                             obj, ma);
1191         }
1192         RETURN(rc);
1193 }
1194
1195 /*
1196  * No permission check is needed.
1197  */
1198 static int mdd_close(const struct lu_env *env, struct md_object *obj,
1199                      struct md_attr *ma)
1200 {
1201         int rc;
1202         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1203         ENTRY;
1204
1205         mdd_write_lock(env, mdd_obj);
1206         /* release open count */
1207         mdd_obj->mod_count --;
1208
1209         rc = mdd_iattr_get(env, mdd_obj, ma);
1210         if (rc == 0 && mdd_obj->mod_count == 0) {
1211                 if (ma->ma_attr.la_nlink == 0)
1212                         rc = mdd_object_kill(env, mdd_obj, ma);
1213         }
1214         mdd_write_unlock(env, mdd_obj);
1215         RETURN(rc);
1216 }
1217
1218 /*
1219  * Permission check is done when open,
1220  * no need check again.
1221  */
1222 static int mdd_readpage_sanity_check(const struct lu_env *env,
1223                                      struct mdd_object *obj)
1224 {
1225         struct dt_object *next = mdd_object_child(obj);
1226         int rc;
1227         ENTRY;
1228
1229         if (S_ISDIR(mdd_object_type(obj)) && dt_try_as_dir(env, next))
1230                 rc = 0;
1231         else
1232                 rc = -ENOTDIR;
1233
1234         RETURN(rc);
1235 }
1236
1237 int bug11150 = 0;
1238 EXPORT_SYMBOL(bug11150);
1239
1240 static int mdd_dir_page_build(const struct lu_env *env, int first,
1241                               void *area, int nob, struct dt_it_ops *iops,
1242                               struct dt_it *it, __u32 *start, __u32 *end,
1243                               struct lu_dirent **last)
1244 {
1245         struct lu_fid          *fid2  = &mdd_env_info(env)->mti_fid2;
1246         struct mdd_thread_info *info = mdd_env_info(env);
1247         struct lu_fid          *fid  = &info->mti_fid;
1248         int                     result;
1249         struct lu_dirent       *ent;
1250
1251         if (first) {
1252                 memset(area, 0, sizeof (struct lu_dirpage));
1253                 area += sizeof (struct lu_dirpage);
1254                 nob  -= sizeof (struct lu_dirpage);
1255         }
1256
1257         LASSERT(nob > sizeof *ent);
1258
1259         ent  = area;
1260         result = 0;
1261         do {
1262                 char  *name;
1263                 int    len;
1264                 int    recsize;
1265                 __u32  hash;
1266
1267                 name = (char *)iops->key(env, it);
1268                 len  = iops->key_size(env, it);
1269
1270                 fid  = (struct lu_fid *)iops->rec(env, it);
1271                 fid_be_to_cpu(fid2, fid);
1272
1273                 recsize = (sizeof(*ent) + len + 3) & ~3;
1274                 hash = iops->store(env, it);
1275                 *end = hash;
1276
1277                 CDEBUG(D_INFO, "%p %p %d "DFID": %#8.8x (%d) \"%*.*s\"\n",
1278                        name, ent, nob, PFID(fid2), hash, len, len, len, name);
1279
1280                 if (bug11150)
1281                         CDEBUG(D_INFO,
1282                                "%p %p %d "DFID": %#8.8x (%d) \"%*.*s\"\n",
1283                                name, ent, nob, PFID(fid2), hash,
1284                                len, len, len, name);
1285
1286                 if (nob >= recsize) {
1287                         fid_be_to_cpu(&ent->lde_fid, fid);
1288                         fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid);
1289                         ent->lde_hash = hash;
1290                         ent->lde_namelen = cpu_to_le16(len);
1291                         ent->lde_reclen  = cpu_to_le16(recsize);
1292                         memcpy(ent->lde_name, name, len);
1293                         if (first && ent == area)
1294                                 *start = hash;
1295                         *last = ent;
1296                         ent = (void *)ent + recsize;
1297                         nob -= recsize;
1298                         result = iops->next(env, it);
1299                 } else {
1300                         /*
1301                          * record doesn't fit into page, enlarge previous one.
1302                          */
1303                         LASSERT(*last != NULL);
1304                         (*last)->lde_reclen =
1305                                 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
1306                                             nob);
1307                         break;
1308                 }
1309         } while (result == 0);
1310
1311         return result;
1312 }
1313
1314 static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
1315                           const struct lu_rdpg *rdpg)
1316 {
1317         struct dt_it      *it;
1318         struct dt_object  *next = mdd_object_child(obj);
1319         struct dt_it_ops  *iops;
1320         struct page       *pg;
1321         struct lu_dirent  *last;
1322         int i;
1323         int rc;
1324         int nob;
1325         __u32 hash_start;
1326         __u32 hash_end;
1327
1328         LASSERT(rdpg->rp_pages != NULL);
1329         LASSERT(next->do_index_ops != NULL);
1330
1331         if (rdpg->rp_count <= 0)
1332                 return -EFAULT;
1333
1334         /*
1335          * iterate through directory and fill pages from @rdpg
1336          */
1337         iops = &next->do_index_ops->dio_it;
1338         it = iops->init(env, next, 0, mdd_object_capa(env, obj));
1339         if (it == NULL)
1340                 return -ENOMEM;
1341
1342         rc = iops->load(env, it, rdpg->rp_hash);
1343
1344         if (rc == 0)
1345                 /*
1346                  * Iterator didn't find record with exactly the key requested.
1347                  *
1348                  * It is currently either
1349                  *
1350                  *     - positioned above record with key less than
1351                  *     requested---skip it.
1352                  *
1353                  *     - or not positioned at all (is in IAM_IT_SKEWED
1354                  *     state)---position it on the next item.
1355                  */
1356                 rc = iops->next(env, it);
1357         else if (rc > 0)
1358                 rc = 0;
1359
1360         /*
1361          * At this point and across for-loop:
1362          *
1363          *  rc == 0 -> ok, proceed.
1364          *  rc >  0 -> end of directory.
1365          *  rc <  0 -> error.
1366          */
1367         for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
1368              i++, nob -= CFS_PAGE_SIZE) {
1369                 LASSERT(i < rdpg->rp_npages);
1370                 pg = rdpg->rp_pages[i];
1371                 rc = mdd_dir_page_build(env, !i, kmap(pg),
1372                                         min_t(int, nob, CFS_PAGE_SIZE), iops,
1373                                         it, &hash_start, &hash_end, &last);
1374                 if (rc != 0 || i == rdpg->rp_npages - 1)
1375                         last->lde_reclen = 0;
1376                 kunmap(pg);
1377         }
1378         if (rc > 0) {
1379                 /*
1380                  * end of directory.
1381                  */
1382                 hash_end = ~0ul;
1383                 rc = 0;
1384         }
1385         if (rc == 0) {
1386                 struct lu_dirpage *dp;
1387
1388                 dp = kmap(rdpg->rp_pages[0]);
1389                 dp->ldp_hash_start = rdpg->rp_hash;
1390                 dp->ldp_hash_end   = hash_end;
1391                 if (i == 0)
1392                         /*
1393                          * No pages were processed, mark this.
1394                          */
1395                         dp->ldp_flags |= LDF_EMPTY;
1396                 dp->ldp_flags = cpu_to_le16(dp->ldp_flags);
1397                 kunmap(rdpg->rp_pages[0]);
1398         }
1399         iops->put(env, it);
1400         iops->fini(env, it);
1401
1402         return rc;
1403 }
1404
1405 static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
1406                         const struct lu_rdpg *rdpg)
1407 {
1408         struct dt_object *next;
1409         struct mdd_object *mdd_obj = md2mdd_obj(obj);
1410         int rc;
1411         ENTRY;
1412
1413         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
1414         next = mdd_object_child(mdd_obj);
1415
1416         mdd_read_lock(env, mdd_obj);
1417         rc = mdd_readpage_sanity_check(env, mdd_obj);
1418         if (rc)
1419                 GOTO(out_unlock, rc);
1420
1421         rc = __mdd_readpage(env, mdd_obj, rdpg);
1422
1423         EXIT;
1424 out_unlock:
1425         mdd_read_unlock(env, mdd_obj);
1426         return rc;
1427 }
1428
1429 struct md_object_operations mdd_obj_ops = {
1430         .moo_permission    = mdd_permission,
1431         .moo_attr_get      = mdd_attr_get,
1432         .moo_attr_set      = mdd_attr_set,
1433         .moo_xattr_get     = mdd_xattr_get,
1434         .moo_xattr_set     = mdd_xattr_set,
1435         .moo_xattr_list    = mdd_xattr_list,
1436         .moo_xattr_del     = mdd_xattr_del,
1437         .moo_object_create = mdd_object_create,
1438         .moo_ref_add       = mdd_ref_add,
1439         .moo_ref_del       = mdd_ref_del,
1440         .moo_open          = mdd_open,
1441         .moo_close         = mdd_close,
1442         .moo_readpage      = mdd_readpage,
1443         .moo_readlink      = mdd_readlink,
1444         .moo_capa_get      = mdd_capa_get
1445 };