Whamcloud - gitweb
b=17670
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
58 /*
 * XXX temporary stuff: direct access to ldiskfs/jbd. The interface between
 * osd and the file system is not yet specified.
61  */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
85 /* llo_* api support */
86 #include <md_object.h>
87
88 static const char dot[] = ".";
89 static const char dotdot[] = "..";
90 static const char remote_obj_dir[] = "REM_OBJ_DIR";
91
/*
 * Index state attached to an osd_object through osd_object::oo_dir.
 */
struct osd_directory {
        /* container that osd_index_fini() finalizes for the owning object */
        struct iam_container od_container;
        struct iam_descr     od_descr;
};
96
/*
 * OSD representation of an object: a dt_object plus the backing inode and
 * the locks guarding it.
 */
struct osd_object {
        /** embedded generic dt object; its do_lu member is the lu_object */
        struct dt_object       oo_dt;
        /**
         * Inode for file system object represented by this osd_object. This
         * inode is pinned for the whole duration of lu_object life.
         *
         * Not modified concurrently (either setup early during object
         * creation, or assigned by osd_object_create() under write lock).
         */
        struct inode          *oo_inode;
        /**
         * to protect index ops.
         */
        struct rw_semaphore    oo_ext_idx_sem;
        /** initialized in osd_object_alloc(); users are outside this chunk */
        struct rw_semaphore    oo_sem;
        /** index state, or NULL; released by osd_index_fini() */
        struct osd_directory  *oo_dir;
        /** protects inode attributes. */
        spinlock_t             oo_guard;
        /**
         * Following two members are used to indicate the presence of dot and
         * dotdot in the given directory. This is required for interop mode
         * (b11826).
         */
        int oo_compat_dot_created;
        int oo_compat_dotdot_created;

        /** environment compared against the caller's in osd_write_locked() */
        const struct lu_env   *oo_owner;
#ifdef CONFIG_LOCKDEP
        struct lockdep_map     oo_dep_map;
#endif
};
128
129 static int   osd_root_get      (const struct lu_env *env,
130                                 struct dt_device *dev, struct lu_fid *f);
131
132 static int   lu_device_is_osd  (const struct lu_device *d);
133 static void  osd_mod_exit      (void) __exit;
134 static int   osd_mod_init      (void) __init;
135 static int   osd_type_init     (struct lu_device_type *t);
136 static void  osd_type_fini     (struct lu_device_type *t);
137 static int   osd_object_init   (const struct lu_env *env,
138                                 struct lu_object *l,
139                                 const struct lu_object_conf *unused);
140 static void  osd_object_release(const struct lu_env *env,
141                                 struct lu_object *l);
142 static int   osd_object_print  (const struct lu_env *env, void *cookie,
143                                 lu_printer_t p, const struct lu_object *o);
144 static struct lu_device *osd_device_free   (const struct lu_env *env,
145                                 struct lu_device *m);
146 static void *osd_key_init      (const struct lu_context *ctx,
147                                 struct lu_context_key *key);
148 static void  osd_key_fini      (const struct lu_context *ctx,
149                                 struct lu_context_key *key, void *data);
150 static void  osd_key_exit      (const struct lu_context *ctx,
151                                 struct lu_context_key *key, void *data);
152 static int   osd_has_index     (const struct osd_object *obj);
153 static void  osd_object_init0  (struct osd_object *obj);
154 static int   osd_device_init   (const struct lu_env *env,
155                                 struct lu_device *d, const char *,
156                                 struct lu_device *);
157 static int   osd_fid_lookup    (const struct lu_env *env,
158                                 struct osd_object *obj,
159                                 const struct lu_fid *fid);
160 static void  osd_inode_getattr (const struct lu_env *env,
161                                 struct inode *inode, struct lu_attr *attr);
162 static int   osd_inode_setattr (const struct lu_env *env,
163                                 struct inode *inode, const struct lu_attr *attr);
164 static int   osd_param_is_sane (const struct osd_device *dev,
165                                 const struct txn_param *param);
166 static int   osd_index_iam_lookup(const struct lu_env *env,
167                                   struct dt_object *dt,
168                                   struct dt_rec *rec, const struct dt_key *key,
169                                   struct lustre_capa *capa);
170 static int   osd_index_ea_lookup(const struct lu_env *env,
171                                  struct dt_object *dt,
172                                  struct dt_rec *rec, const struct dt_key *key,
173                                  struct lustre_capa *capa);
174 static int   osd_index_iam_insert(const struct lu_env *env,
175                                   struct dt_object *dt,
176                                   const struct dt_rec *rec,
177                                   const struct dt_key *key,
178                                   struct thandle *handle,
179                                   struct lustre_capa *capa,
180                                   int ingore_quota);
181 static int   osd_index_ea_insert (const struct lu_env *env,
182                                   struct dt_object *dt,
183                                   const struct dt_rec *rec,
184                                   const struct dt_key *key,
185                                   struct thandle *handle,
186                                   struct lustre_capa *capa,
187                                   int ingore_quota);
188 static int   osd_index_iam_delete(const struct lu_env *env,
189                                   struct dt_object *dt, const struct dt_key *key,
190                                   struct thandle *handle,
191                                   struct lustre_capa *capa);
192 static int   osd_index_ea_delete (const struct lu_env *env,
193                                   struct dt_object *dt, const struct dt_key *key,
194                                   struct thandle *handle,
195                                   struct lustre_capa *capa);
196
197 static int   osd_iam_index_probe   (const struct lu_env *env,
198                                     struct osd_object *o,
199                                     const struct dt_index_features *feat);
200 static int   osd_index_try     (const struct lu_env *env,
201                                 struct dt_object *dt,
202                                 const struct dt_index_features *feat);
203 static void  osd_index_fini    (struct osd_object *o);
204
205 static void  osd_it_iam_fini       (const struct lu_env *env, struct dt_it *di);
206 static int   osd_it_iam_get        (const struct lu_env *env,
207                                     struct dt_it *di, const struct dt_key *key);
208 static void  osd_it_iam_put        (const struct lu_env *env, struct dt_it *di);
209 static int   osd_it_iam_next       (const struct lu_env *env, struct dt_it *di);
210 static int   osd_it_iam_key_size   (const struct lu_env *env,
211                                     const struct dt_it *di);
212 static void  osd_it_ea_fini    (const struct lu_env *env, struct dt_it *di);
213 static int   osd_it_ea_get     (const struct lu_env *env,
214                                 struct dt_it *di, const struct dt_key *key);
215 static void  osd_it_ea_put     (const struct lu_env *env, struct dt_it *di);
216 static int   osd_it_ea_next    (const struct lu_env *env, struct dt_it *di);
217 static int   osd_it_ea_key_size(const struct lu_env *env,
218                                 const struct dt_it *di);
219
220 static void  osd_conf_get      (const struct lu_env *env,
221                                 const struct dt_device *dev,
222                                 struct dt_device_param *param);
223 static void  osd_trans_stop    (const struct lu_env *env,
224                                 struct thandle *th);
225 static int   osd_object_is_root(const struct osd_object *obj);
226
227 static struct osd_object  *osd_obj          (const struct lu_object *o);
228 static struct osd_device  *osd_dev          (const struct lu_device *d);
229 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
230 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
231 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
232 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
233 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
234                                              struct lu_device *d);
235 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
236                                              struct lu_device_type *t,
237                                              struct lustre_cfg *cfg);
238 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
239                                              const struct lu_object_header *hdr,
240                                              struct lu_device *d);
241 static struct inode       *osd_iget         (struct osd_thread_info *info,
242                                              struct osd_device *dev,
243                                              const struct osd_inode_id *id);
244 static struct super_block *osd_sb           (const struct osd_device *dev);
245 static struct dt_it       *osd_it_iam_init  (const struct lu_env *env,
246                                              struct dt_object *dt,
247                                              struct lustre_capa *capa);
248 static struct dt_key      *osd_it_iam_key   (const struct lu_env *env,
249                                              const struct dt_it *di);
250 static int                 osd_it_iam_rec   (const struct lu_env *env,
251                                              const struct dt_it *di,
252                                              struct lu_dirent *lde,
253                                              __u32 attr);
254 static struct dt_it       *osd_it_ea_init   (const struct lu_env *env,
255                                              struct dt_object *dt,
256                                              struct lustre_capa *capa);
257 static struct dt_key      *osd_it_ea_key    (const struct lu_env *env,
258                                              const struct dt_it *di);
259 static inline int          osd_it_ea_rec    (const struct lu_env *env,
260                                              const struct dt_it *di,
261                                              struct lu_dirent *lde,
262                                              __u32 attr);
263
264 static struct timespec    *osd_inode_time   (const struct lu_env *env,
265                                              struct inode *inode,
266                                              __u64 seconds);
267 static struct thandle     *osd_trans_start  (const struct lu_env *env,
268                                              struct dt_device *d,
269                                              struct txn_param *p);
270 static journal_t          *osd_journal      (const struct osd_device *dev);
271
272 static int __osd_ea_add_rec(struct osd_thread_info *info,
273                             struct osd_object *pobj,
274                             struct osd_object *cobj,
275                             const char *name,
276                             struct thandle *th);
277
278 static const struct lu_device_type_operations osd_device_type_ops;
279 static       struct lu_device_type            osd_device_type;
280 static const struct lu_object_operations      osd_lu_obj_ops;
281 static       struct obd_ops                   osd_obd_device_ops;
282 static const struct lu_device_operations      osd_lu_ops;
283 static       struct lu_context_key            osd_key;
284 static const struct dt_object_operations      osd_obj_ops;
285 static const struct dt_object_operations      osd_obj_ea_ops;
286 static const struct dt_body_operations        osd_body_ops;
287 static const struct dt_index_operations       osd_index_iam_ops;
288 static const struct dt_index_operations       osd_index_ea_ops;
289
/*
 * OSD transaction handle: the generic thandle plus the journal handle and
 * commit callback that back it.
 */
struct osd_thandle {
        /* generic handle returned to callers; container_of0() maps it back */
        struct thandle          ot_super;
        /* journal handle; reset to NULL by osd_trans_stop() */
        handle_t               *ot_handle;
        /* callback record registered with journal_callback_set() */
        struct journal_callback ot_jcb;
        /* Link to the device, for debugging. */
        struct lu_ref_link     *ot_dev_link;

};
298
299 #ifdef HAVE_QUOTA_SUPPORT
300 static inline void
301 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
302 {
303         struct md_ucred    *uc = md_ucred(env);
304
305         LASSERT(uc != NULL);
306
307         save->oc_uid = current->fsuid;
308         save->oc_gid = current->fsgid;
309         save->oc_cap = current->cap_effective;
310         current->fsuid         = uc->mu_fsuid;
311         current->fsgid         = uc->mu_fsgid;
312         current->cap_effective = uc->mu_cap;
313 }
314
315 static inline void
316 osd_pop_ctxt(struct osd_ctxt *save)
317 {
318         current->fsuid         = save->oc_uid;
319         current->fsgid         = save->oc_gid;
320         current->cap_effective = save->oc_cap;
321 }
322 #endif
323
324 /*
325  * Invariants, assertions.
326  */
327
328 /*
329  * XXX: do not enable this, until invariant checking code is made thread safe
330  * in the face of pdirops locking.
331  */
332 #define OSD_INVARIANT_CHECKS (0)
333
334 #if OSD_INVARIANT_CHECKS
335 static int osd_invariant(const struct osd_object *obj)
336 {
337         return
338                 obj != NULL &&
339                 ergo(obj->oo_inode != NULL,
340                      obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
341                      atomic_read(&obj->oo_inode->i_count) > 0) &&
342                 ergo(obj->oo_dir != NULL &&
343                      obj->oo_dir->od_conationer.ic_object != NULL,
344                      obj->oo_dir->od_conationer.ic_object == obj->oo_inode);
345 }
346 #else
347 #define osd_invariant(obj) (1)
348 #endif
349
350 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
351 {
352         return lu_context_key_get(&env->le_ctx, &osd_key);
353 }
354
355 /*
356  * Concurrency: doesn't matter
357  */
358 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
359 {
360         return osd_oti_get(env)->oti_r_locks > 0;
361 }
362
363 /*
364  * Concurrency: doesn't matter
365  */
366 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
367 {
368         struct osd_thread_info *oti = osd_oti_get(env);
369         return oti->oti_w_locks > 0 && o->oo_owner == env;
370 }
371
372 /*
373  * Concurrency: doesn't access mutable data
374  */
375 static int osd_root_get(const struct lu_env *env,
376                         struct dt_device *dev, struct lu_fid *f)
377 {
378         struct inode *inode;
379
380         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
381         lu_igif_build(f, inode->i_ino, inode->i_generation);
382         return 0;
383 }
384
385 /*
386  * OSD object methods.
387  */
388
389 /*
390  * Concurrency: no concurrent access is possible that early in object
391  * life-cycle.
392  */
393 static struct lu_object *osd_object_alloc(const struct lu_env *env,
394                                           const struct lu_object_header *hdr,
395                                           struct lu_device *d)
396 {
397         struct osd_object *mo;
398
399         OBD_ALLOC_PTR(mo);
400         if (mo != NULL) {
401                 struct lu_object *l;
402
403                 l = &mo->oo_dt.do_lu;
404                 dt_object_init(&mo->oo_dt, NULL, d);
405                 if (osd_dev(d)->od_iop_mode)
406                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
407                 else
408                         mo->oo_dt.do_ops = &osd_obj_ops;
409
410                 l->lo_ops = &osd_lu_obj_ops;
411                 init_rwsem(&mo->oo_sem);
412                 init_rwsem(&mo->oo_ext_idx_sem);
413                 spin_lock_init(&mo->oo_guard);
414                 return l;
415         } else
416                 return NULL;
417 }
418
419 /*
420  * Concurrency: shouldn't matter.
421  */
422 static void osd_object_init0(struct osd_object *obj)
423 {
424         LASSERT(obj->oo_inode != NULL);
425         obj->oo_dt.do_body_ops = &osd_body_ops;
426         obj->oo_dt.do_lu.lo_header->loh_attr |=
427                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
428 }
429
430 /*
431  * Concurrency: no concurrent access is possible that early in object
432  * life-cycle.
433  */
434 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
435                            const struct lu_object_conf *unused)
436 {
437         struct osd_object *obj = osd_obj(l);
438         int result;
439
440         LINVRNT(osd_invariant(obj));
441
442         result = osd_fid_lookup(env, obj, lu_object_fid(l));
443         if (result == 0) {
444                 if (obj->oo_inode != NULL)
445                         osd_object_init0(obj);
446         }
447         LINVRNT(osd_invariant(obj));
448         return result;
449 }
450
451 /*
452  * Concurrency: no concurrent access is possible that late in object
453  * life-cycle.
454  */
455 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
456 {
457         struct osd_object *obj = osd_obj(l);
458
459         LINVRNT(osd_invariant(obj));
460
461         dt_object_fini(&obj->oo_dt);
462         OBD_FREE_PTR(obj);
463 }
464
465 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
466                                              const struct iam_container *bag)
467 {
468         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
469                                            osd_oti_get(env)->oti_it_ipd);
470 }
471
472 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
473                                               const struct iam_container *bag)
474 {
475         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
476                                            osd_oti_get(env)->oti_idx_ipd);
477 }
478
479 static void osd_ipd_put(const struct lu_env *env,
480                         const struct iam_container *bag,
481                         struct iam_path_descr *ipd)
482 {
483         bag->ic_descr->id_ops->id_ipd_free(ipd);
484 }
485
486 /*
487  * Concurrency: no concurrent access is possible that late in object
488  * life-cycle.
489  */
490 static void osd_index_fini(struct osd_object *o)
491 {
492         struct iam_container *bag;
493
494         if (o->oo_dir != NULL) {
495                 bag = &o->oo_dir->od_container;
496                 if (o->oo_inode != NULL) {
497                         if (bag->ic_object == o->oo_inode)
498                                 iam_container_fini(bag);
499                 }
500                 OBD_FREE_PTR(o->oo_dir);
501                 o->oo_dir = NULL;
502         }
503 }
504
505 /*
506  * Concurrency: no concurrent access is possible that late in object
507  * life-cycle (for all existing callers, that is. New callers have to provide
508  * their own locking.)
509  */
510 static int osd_inode_unlinked(const struct inode *inode)
511 {
512         return inode->i_nlink == 0;
513 }
514
/*
 * Transaction credits; osd_inode_remove() passes the sum of the two to
 * txn_param_init().
 */
enum {
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
519
520 /*
521  * Concurrency: no concurrent access is possible that late in object
522  * life-cycle.
523  */
524 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
525 {
526         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
527         struct osd_device      *osd = osd_obj2dev(obj);
528         struct osd_thread_info *oti = osd_oti_get(env);
529         struct txn_param       *prm = &oti->oti_txn;
530         struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
531         struct thandle         *th;
532         int result;
533
534         lu_env_init(env_del_obj, LCT_DT_THREAD);
535         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
536                             OSD_TXN_INODE_DELETE_CREDITS);
537         th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
538         if (!IS_ERR(th)) {
539                 result = osd_oi_delete(osd_oti_get(env_del_obj),
540                                        &osd->od_oi, fid, th);
541                 osd_trans_stop(env_del_obj, th);
542         } else
543                 result = PTR_ERR(th);
544
545         lu_env_fini(env_del_obj);
546         return result;
547 }
548
549 /*
550  * Called just before object is freed. Releases all resources except for
551  * object itself (that is released by osd_object_free()).
552  *
553  * Concurrency: no concurrent access is possible that late in object
554  * life-cycle.
555  */
556 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
557 {
558         struct osd_object *obj   = osd_obj(l);
559         struct inode      *inode = obj->oo_inode;
560
561         LINVRNT(osd_invariant(obj));
562
563         /*
564          * If object is unlinked remove fid->ino mapping from object index.
565          */
566
567         osd_index_fini(obj);
568         if (inode != NULL) {
569                 int result;
570
571                 if (osd_inode_unlinked(inode)) {
572                         result = osd_inode_remove(env, obj);
573                         if (result != 0)
574                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
575                                                 "Failed to cleanup: %d\n",
576                                                 result);
577                 }
578
579                 iput(inode);
580                 obj->oo_inode = NULL;
581         }
582 }
583
584 /*
585  * Concurrency: ->loo_object_release() is called under site spin-lock.
586  */
587 static void osd_object_release(const struct lu_env *env,
588                                struct lu_object *l)
589 {
590         struct osd_object *o = osd_obj(l);
591
592         LASSERT(!lu_object_is_dying(l->lo_header));
593         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
594                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
595 }
596
597 /*
598  * Concurrency: shouldn't matter.
599  */
600 static int osd_object_print(const struct lu_env *env, void *cookie,
601                             lu_printer_t p, const struct lu_object *l)
602 {
603         struct osd_object *o = osd_obj(l);
604         struct iam_descr  *d;
605
606         if (o->oo_dir != NULL)
607                 d = o->oo_dir->od_container.ic_descr;
608         else
609                 d = NULL;
610         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
611                     o, o->oo_inode,
612                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
613                     o->oo_inode ? o->oo_inode->i_generation : 0,
614                     d ? d->id_ops->id_name : "plain");
615 }
616
617 /*
618  * Concurrency: shouldn't matter.
619  */
620 int osd_statfs(const struct lu_env *env, struct dt_device *d,
621                struct kstatfs *sfs)
622 {
623         struct osd_device *osd = osd_dt_dev(d);
624         struct super_block *sb = osd_sb(osd);
625         int result = 0;
626
627         spin_lock(&osd->od_osfs_lock);
628         /* cache 1 second */
629         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
630                 result = ll_do_statfs(sb, &osd->od_kstatfs);
631                 if (likely(result == 0)) /* N.B. statfs can't really fail */
632                         osd->od_osfs_age = cfs_time_current_64();
633         }
634
635         if (likely(result == 0))
636                 *sfs = osd->od_kstatfs;
637         spin_unlock(&osd->od_osfs_lock);
638
639         return result;
640 }
641
642 /*
643  * Concurrency: doesn't access mutable data.
644  */
645 static void osd_conf_get(const struct lu_env *env,
646                          const struct dt_device *dev,
647                          struct dt_device_param *param)
648 {
649         /*
650          * XXX should be taken from not-yet-existing fs abstraction layer.
651          */
652         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
653         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
654         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
655 }
656
657 /**
658  * Helper function to get and fill the buffer with input values.
659  */
660 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
661 {
662         struct lu_buf *buf;
663
664         buf = &osd_oti_get(env)->oti_buf;
665         buf->lb_buf = area;
666         buf->lb_len = len;
667         return buf;
668 }
669
670 /*
671  * Journal
672  */
673
674 /*
675  * Concurrency: doesn't access mutable data.
676  */
677 static int osd_param_is_sane(const struct osd_device *dev,
678                              const struct txn_param *param)
679 {
680         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
681 }
682
683 /*
684  * Concurrency: shouldn't matter.
685  */
686 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
687 {
688         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
689         struct thandle     *th  = &oh->ot_super;
690         struct dt_device   *dev = th->th_dev;
691         struct lu_device   *lud = &dev->dd_lu_dev;
692
693         LASSERT(dev != NULL);
694         LASSERT(oh->ot_handle == NULL);
695
696         if (error) {
697                 CERROR("transaction @0x%p commit error: %d\n", th, error);
698         } else {
699                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
700                 /*
701                  * This od_env_for_commit is only for commit usage.  see
702                  * "struct dt_device"
703                  */
704                 lu_context_enter(&env->le_ctx);
705                 dt_txn_hook_commit(env, th);
706                 lu_context_exit(&env->le_ctx);
707         }
708
709         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
710         lu_device_put(lud);
711         th->th_dev = NULL;
712
713         lu_context_exit(&th->th_ctx);
714         lu_context_fini(&th->th_ctx);
715         OBD_FREE_PTR(oh);
716 }
717
718 /*
719  * Concurrency: shouldn't matter.
720  */
721 static struct thandle *osd_trans_start(const struct lu_env *env,
722                                        struct dt_device *d,
723                                        struct txn_param *p)
724 {
725         struct osd_device  *dev = osd_dt_dev(d);
726         handle_t           *jh;
727         struct osd_thandle *oh;
728         struct thandle     *th;
729         int hook_res;
730
731         ENTRY;
732
733         hook_res = dt_txn_hook_start(env, d, p);
734         if (hook_res != 0)
735                 RETURN(ERR_PTR(hook_res));
736
737         if (osd_param_is_sane(dev, p)) {
738                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
739                 if (oh != NULL) {
740                         struct osd_thread_info *oti = osd_oti_get(env);
741
742                         /*
743                          * XXX temporary stuff. Some abstraction layer should
744                          * be used.
745                          */
746
747                         jh = journal_start(osd_journal(dev), p->tp_credits);
748                         if (!IS_ERR(jh)) {
749                                 oh->ot_handle = jh;
750                                 th = &oh->ot_super;
751                                 th->th_dev = d;
752                                 th->th_result = 0;
753                                 jh->h_sync = p->tp_sync;
754                                 lu_device_get(&d->dd_lu_dev);
755                                 oh->ot_dev_link = lu_ref_add
756                                         (&d->dd_lu_dev.ld_reference,
757                                          "osd-tx", th);
758                                 /* add commit callback */
759                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
760                                 lu_context_enter(&th->th_ctx);
761                                 journal_callback_set(jh, osd_trans_commit_cb,
762                                                      (struct journal_callback *)&oh->ot_jcb);
763                                         LASSERT(oti->oti_txns == 0);
764                                         LASSERT(oti->oti_r_locks == 0);
765                                         LASSERT(oti->oti_w_locks == 0);
766                                         oti->oti_txns++;
767                         } else {
768                                 OBD_FREE_PTR(oh);
769                                 th = (void *)jh;
770                         }
771                 } else
772                         th = ERR_PTR(-ENOMEM);
773         } else {
774                 CERROR("Invalid transaction parameters\n");
775                 th = ERR_PTR(-EINVAL);
776         }
777
778         RETURN(th);
779 }
780
781 /*
782  * Concurrency: shouldn't matter.
783  */
784 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
785 {
786         int result;
787         struct osd_thandle *oh;
788         struct osd_thread_info *oti = osd_oti_get(env);
789
790         ENTRY;
791
792         oh = container_of0(th, struct osd_thandle, ot_super);
793         if (oh->ot_handle != NULL) {
794                 handle_t *hdl = oh->ot_handle;
795
796                 LASSERT(oti->oti_txns == 1);
797                 oti->oti_txns--;
798                 LASSERT(oti->oti_r_locks == 0);
799                 LASSERT(oti->oti_w_locks == 0);
800                 result = dt_txn_hook_stop(env, th);
801                 if (result != 0)
802                         CERROR("Failure in transaction hook: %d\n", result);
803                 oh->ot_handle = NULL;
804                 result = journal_stop(hdl);
805                 if (result != 0)
806                         CERROR("Failure to stop transaction: %d\n", result);
807         }
808         EXIT;
809 }
810
811 /*
812  * Concurrency: shouldn't matter.
813  */
814 static int osd_sync(const struct lu_env *env, struct dt_device *d)
815 {
816         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
817         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
818 }
819
820 /**
821  * Start commit for OSD device.
822  *
823  * An implementation of dt_commit_async method for OSD device.
824  * Asychronously starts underlayng fs sync and thereby a transaction
825  * commit.
826  *
827  * \param env environment
828  * \param d dt device
829  *
830  * \see dt_device_operations
831  */
832 static int osd_commit_async(const struct lu_env *env,
833                             struct dt_device *d)
834 {
835         struct super_block *s = osd_sb(osd_dt_dev(d));
836         ENTRY;
837
838         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
839         RETURN(s->s_op->sync_fs(s, 0));
840 }
841
842 /*
843  * Concurrency: shouldn't matter.
844  */
845 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
846
847 static void osd_ro(const struct lu_env *env, struct dt_device *d)
848 {
849         ENTRY;
850
851         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
852
853         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
854                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
855         EXIT;
856 }
857
858
859 /*
860  * Concurrency: serialization provided by callers.
861  */
862 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
863                               int mode, unsigned long timeout, __u32 alg,
864                               struct lustre_capa_key *keys)
865 {
866         struct osd_device *dev = osd_dt_dev(d);
867         ENTRY;
868
869         dev->od_fl_capa = mode;
870         dev->od_capa_timeout = timeout;
871         dev->od_capa_alg = alg;
872         dev->od_capa_keys = keys;
873         RETURN(0);
874 }
875
876 /**
877  * Concurrency: serialization provided by callers.
878  */
879 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
880                                struct dt_quota_ctxt *ctxt, void *data)
881 {
882         struct obd_device *obd = (void *)ctxt;
883         struct vfsmount *mnt = (struct vfsmount *)data;
884         ENTRY;
885
886         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
887         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
888         obd->obd_lvfs_ctxt.pwdmnt = mnt;
889         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
890         obd->obd_lvfs_ctxt.fs = get_ds();
891
892         EXIT;
893 }
894
895 /**
896  * Note: we do not count into QUOTA here.
897  * If we mount with --data_journal we may need more.
898  */
899 static const int osd_dto_credits_noquota[DTO_NR] = {
900         /**
901          * Insert/Delete.
902          * INDEX_EXTRA_TRANS_BLOCKS(8) +
903          * SINGLEDATA_TRANS_BLOCKS(8)
904          * XXX Note: maybe iam need more, since iam have more level than
905          *           EXT3 htree.
906          */
907         [DTO_INDEX_INSERT]  = 16,
908         [DTO_INDEX_DELETE]  = 16,
909         /**
910          * Unused now
911          */
912         [DTO_IDNEX_UPDATE]  = 16,
913         /**
914          * Create a object. The same as create object in EXT3.
915          * DATA_TRANS_BLOCKS(14) +
916          * INDEX_EXTRA_BLOCKS(8) +
917          * 3(inode bits, groups, GDT)
918          */
919         [DTO_OBJECT_CREATE] = 25,
920         /**
921          * Unused now
922          */
923         [DTO_OBJECT_DELETE] = 25,
924         /**
925          * Attr set credits.
926          * 3(inode bits, group, GDT)
927          */
928         [DTO_ATTR_SET_BASE] = 3,
929         /**
930          * Xattr set. The same as xattr of EXT3.
931          * DATA_TRANS_BLOCKS(14)
932          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
933          *           also counted in. Do not know why?
934          */
935         [DTO_XATTR_SET]     = 14,
936         [DTO_LOG_REC]       = 14,
937         /**
938          * creadits for inode change during write.
939          */
940         [DTO_WRITE_BASE]    = 3,
941         /**
942          * credits for single block write.
943          */
944         [DTO_WRITE_BLOCK]   = 14,
945         /**
946          * Attr set credits for chown.
947          * 3 (inode bit, group, GDT)
948          */
949         [DTO_ATTR_SET_CHOWN]= 3
950 };
951
952 /**
953  * Note: we count into QUOTA here.
954  * If we mount with --data_journal we may need more.
955  */
956 static const int osd_dto_credits_quota[DTO_NR] = {
957         /**
958          * INDEX_EXTRA_TRANS_BLOCKS(8) +
959          * SINGLEDATA_TRANS_BLOCKS(8) +
960          * 2 * QUOTA_TRANS_BLOCKS(2)
961          */
962         [DTO_INDEX_INSERT]  = 20,
963         /**
964          * INDEX_EXTRA_TRANS_BLOCKS(8) +
965          * SINGLEDATA_TRANS_BLOCKS(8) +
966          * 2 * QUOTA_TRANS_BLOCKS(2)
967          */
968         [DTO_INDEX_DELETE]  = 20,
969         /**
970          * Unused now.
971          */
972         [DTO_IDNEX_UPDATE]  = 16,
973         /*
974          * Create a object. Same as create object in EXT3 filesystem.
975          * DATA_TRANS_BLOCKS(16) +
976          * INDEX_EXTRA_BLOCKS(8) +
977          * 3(inode bits, groups, GDT) +
978          * 2 * QUOTA_INIT_BLOCKS(25)
979          */
980         [DTO_OBJECT_CREATE] = 77,
981         /*
982          * Unused now.
983          * DATA_TRANS_BLOCKS(16) +
984          * INDEX_EXTRA_BLOCKS(8) +
985          * 3(inode bits, groups, GDT) +
986          * QUOTA(?)
987          */
988         [DTO_OBJECT_DELETE] = 27,
989         /**
990          * Attr set credits.
991          * 3 (inode bit, group, GDT) +
992          */
993         [DTO_ATTR_SET_BASE] = 3,
994         /**
995          * Xattr set. The same as xattr of EXT3.
996          * DATA_TRANS_BLOCKS(16)
997          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
998          *           also counted in. Do not know why?
999          */
1000         [DTO_XATTR_SET]     = 16,
1001         [DTO_LOG_REC]       = 16,
1002         /**
1003          * creadits for inode change during write.
1004          */
1005         [DTO_WRITE_BASE]    = 3,
1006         /**
1007          * credits for single block write.
1008          */
1009         [DTO_WRITE_BLOCK]   = 16,
1010         /**
1011          * Attr set credits for chown.
1012          * 3 (inode bit, group, GDT) +
1013          * 2 * QUOTA_INIT_BLOCKS(25) +
1014          * 2 * QUOTA_DEL_BLOCKS(9)
1015          */
1016         [DTO_ATTR_SET_CHOWN]= 71
1017 };
1018
1019 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
1020                           enum dt_txn_op op)
1021 {
1022         LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
1023                 ARRAY_SIZE(osd_dto_credits_quota));
1024         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
1025 #ifdef HAVE_QUOTA_SUPPORT
1026         if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
1027                 return osd_dto_credits_quota[op];
1028         else
1029 #endif
1030                 return osd_dto_credits_noquota[op];
1031 }
1032
1033 static const struct dt_device_operations osd_dt_ops = {
1034         .dt_root_get       = osd_root_get,
1035         .dt_statfs         = osd_statfs,
1036         .dt_trans_start    = osd_trans_start,
1037         .dt_trans_stop     = osd_trans_stop,
1038         .dt_conf_get       = osd_conf_get,
1039         .dt_sync           = osd_sync,
1040         .dt_ro             = osd_ro,
1041         .dt_commit_async   = osd_commit_async,
1042         .dt_credit_get     = osd_credit_get,
1043         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1044         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1045 };
1046
1047 static void osd_object_read_lock(const struct lu_env *env,
1048                                  struct dt_object *dt, unsigned role)
1049 {
1050         struct osd_object *obj = osd_dt_obj(dt);
1051         struct osd_thread_info *oti = osd_oti_get(env);
1052
1053         LINVRNT(osd_invariant(obj));
1054
1055         LASSERT(obj->oo_owner != env);
1056         down_read_nested(&obj->oo_sem, role);
1057
1058         LASSERT(obj->oo_owner == NULL);
1059         oti->oti_r_locks++;
1060 }
1061
1062 static void osd_object_write_lock(const struct lu_env *env,
1063                                   struct dt_object *dt, unsigned role)
1064 {
1065         struct osd_object *obj = osd_dt_obj(dt);
1066         struct osd_thread_info *oti = osd_oti_get(env);
1067
1068         LINVRNT(osd_invariant(obj));
1069
1070         LASSERT(obj->oo_owner != env);
1071         down_write_nested(&obj->oo_sem, role);
1072
1073         LASSERT(obj->oo_owner == NULL);
1074         obj->oo_owner = env;
1075         oti->oti_w_locks++;
1076 }
1077
1078 static void osd_object_read_unlock(const struct lu_env *env,
1079                                    struct dt_object *dt)
1080 {
1081         struct osd_object *obj = osd_dt_obj(dt);
1082         struct osd_thread_info *oti = osd_oti_get(env);
1083
1084         LINVRNT(osd_invariant(obj));
1085
1086         LASSERT(oti->oti_r_locks > 0);
1087         oti->oti_r_locks--;
1088         up_read(&obj->oo_sem);
1089 }
1090
1091 static void osd_object_write_unlock(const struct lu_env *env,
1092                                     struct dt_object *dt)
1093 {
1094         struct osd_object *obj = osd_dt_obj(dt);
1095         struct osd_thread_info *oti = osd_oti_get(env);
1096
1097         LINVRNT(osd_invariant(obj));
1098
1099         LASSERT(obj->oo_owner == env);
1100         LASSERT(oti->oti_w_locks > 0);
1101         oti->oti_w_locks--;
1102         obj->oo_owner = NULL;
1103         up_write(&obj->oo_sem);
1104 }
1105
1106 static int osd_object_write_locked(const struct lu_env *env,
1107                                    struct dt_object *dt)
1108 {
1109         struct osd_object *obj = osd_dt_obj(dt);
1110
1111         LINVRNT(osd_invariant(obj));
1112
1113         return obj->oo_owner == env;
1114 }
1115
1116 static int capa_is_sane(const struct lu_env *env,
1117                         struct osd_device *dev,
1118                         struct lustre_capa *capa,
1119                         struct lustre_capa_key *keys)
1120 {
1121         struct osd_thread_info *oti = osd_oti_get(env);
1122         struct lustre_capa *tcapa = &oti->oti_capa;
1123         struct obd_capa *oc;
1124         int i, rc = 0;
1125         ENTRY;
1126
1127         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1128         if (oc) {
1129                 if (capa_is_expired(oc)) {
1130                         DEBUG_CAPA(D_ERROR, capa, "expired");
1131                         rc = -ESTALE;
1132                 }
1133                 capa_put(oc);
1134                 RETURN(rc);
1135         }
1136
1137         if (capa_is_expired_sec(capa)) {
1138                 DEBUG_CAPA(D_ERROR, capa, "expired");
1139                 RETURN(-ESTALE);
1140         }
1141
1142         spin_lock(&capa_lock);
1143         for (i = 0; i < 2; i++) {
1144                 if (keys[i].lk_keyid == capa->lc_keyid) {
1145                         oti->oti_capa_key = keys[i];
1146                         break;
1147                 }
1148         }
1149         spin_unlock(&capa_lock);
1150
1151         if (i == 2) {
1152                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1153                 RETURN(-ESTALE);
1154         }
1155
1156         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1157         if (rc)
1158                 RETURN(rc);
1159
1160         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1161                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1162                 RETURN(-EACCES);
1163         }
1164
1165         oc = capa_add(dev->od_capa_hash, capa);
1166         capa_put(oc);
1167
1168         RETURN(0);
1169 }
1170
1171 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1172                            struct lustre_capa *capa, __u64 opc)
1173 {
1174         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1175         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1176         struct md_capainfo *ci;
1177         int rc;
1178
1179         if (!dev->od_fl_capa)
1180                 return 0;
1181
1182         if (capa == BYPASS_CAPA)
1183                 return 0;
1184
1185         ci = md_capainfo(env);
1186         if (unlikely(!ci))
1187                 return 0;
1188
1189         if (ci->mc_auth == LC_ID_NONE)
1190                 return 0;
1191
1192         if (!capa) {
1193                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1194                 return -EACCES;
1195         }
1196
1197         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1198                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1199                            PFID(fid));
1200                 return -EACCES;
1201         }
1202
1203         if (!capa_opc_supported(capa, opc)) {
1204                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1205                 return -EACCES;
1206         }
1207
1208         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1209                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1210                 return -EACCES;
1211         }
1212
1213         return 0;
1214 }
1215
1216 static int osd_attr_get(const struct lu_env *env,
1217                         struct dt_object *dt,
1218                         struct lu_attr *attr,
1219                         struct lustre_capa *capa)
1220 {
1221         struct osd_object *obj = osd_dt_obj(dt);
1222
1223         LASSERT(dt_object_exists(dt));
1224         LINVRNT(osd_invariant(obj));
1225
1226         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1227                 return -EACCES;
1228
1229         spin_lock(&obj->oo_guard);
1230         osd_inode_getattr(env, obj->oo_inode, attr);
1231         spin_unlock(&obj->oo_guard);
1232         return 0;
1233 }
1234
1235 static int osd_attr_set(const struct lu_env *env,
1236                         struct dt_object *dt,
1237                         const struct lu_attr *attr,
1238                         struct thandle *handle,
1239                         struct lustre_capa *capa)
1240 {
1241         struct osd_object *obj = osd_dt_obj(dt);
1242         int rc;
1243
1244         LASSERT(handle != NULL);
1245         LASSERT(dt_object_exists(dt));
1246         LASSERT(osd_invariant(obj));
1247
1248         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1249                 return -EACCES;
1250
1251         spin_lock(&obj->oo_guard);
1252         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1253         spin_unlock(&obj->oo_guard);
1254
1255         if (!rc)
1256                 mark_inode_dirty(obj->oo_inode);
1257         return rc;
1258 }
1259
1260 static struct timespec *osd_inode_time(const struct lu_env *env,
1261                                        struct inode *inode, __u64 seconds)
1262 {
1263         struct osd_thread_info *oti = osd_oti_get(env);
1264         struct timespec        *t   = &oti->oti_time;
1265
1266         t->tv_sec  = seconds;
1267         t->tv_nsec = 0;
1268         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1269         return t;
1270 }
1271
1272 static int osd_inode_setattr(const struct lu_env *env,
1273                              struct inode *inode, const struct lu_attr *attr)
1274 {
1275         __u64 bits;
1276
1277         bits = attr->la_valid;
1278
1279         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1280
1281 #ifdef HAVE_QUOTA_SUPPORT
1282         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1283             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1284                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1285                 struct iattr iattr;
1286                 int rc;
1287
1288                 iattr.ia_valid = 0;
1289                 if (bits & LA_UID)
1290                         iattr.ia_valid |= ATTR_UID;
1291                 if (bits & LA_GID)
1292                         iattr.ia_valid |= ATTR_GID;
1293                 iattr.ia_uid = attr->la_uid;
1294                 iattr.ia_gid = attr->la_gid;
1295                 osd_push_ctxt(env, save);
1296                 rc = DQUOT_TRANSFER(inode, &iattr) ? -EDQUOT : 0;
1297                 osd_pop_ctxt(save);
1298                 if (rc != 0)
1299                         return rc;
1300         }
1301 #endif
1302
1303         if (bits & LA_ATIME)
1304                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1305         if (bits & LA_CTIME)
1306                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1307         if (bits & LA_MTIME)
1308                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1309         if (bits & LA_SIZE) {
1310                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1311                 i_size_write(inode, attr->la_size);
1312         }
1313
1314         /* OSD should not change "i_blocks" which is used by quota.
1315          * "i_blocks" should be changed by ldiskfs only.
1316          * Enable this assignment for SOM purpose now, until it is
1317          * stored in SOM EA. */
1318         if (bits & LA_BLOCKS)
1319                 inode->i_blocks = attr->la_blocks;
1320
1321         if (bits & LA_MODE)
1322                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1323                         (attr->la_mode & ~S_IFMT);
1324         if (bits & LA_UID)
1325                 inode->i_uid    = attr->la_uid;
1326         if (bits & LA_GID)
1327                 inode->i_gid    = attr->la_gid;
1328         if (bits & LA_NLINK)
1329                 inode->i_nlink  = attr->la_nlink;
1330         if (bits & LA_RDEV)
1331                 inode->i_rdev   = attr->la_rdev;
1332
1333         if (bits & LA_FLAGS) {
1334                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1335
1336                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1337                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1338         }
1339         return 0;
1340 }
1341
1342 /*
1343  * Object creation.
1344  *
1345  * XXX temporary solution.
1346  */
1347
/**
 * Hook called by __osd_object_create() before the type-specific creation
 * routine.  Currently does nothing.
 *
 * \retval 0 always
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        return 0;
}
1353
/**
 * Hook called by __osd_object_create() after the type-specific creation
 * routine succeeded; finishes object set-up through osd_object_init0().
 *
 * \retval 0 always
 */
static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
                           struct lu_attr *attr, struct thandle *th)
{
        osd_object_init0(obj);

        return 0;
}
1360
1361 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1362                                           struct inode * dir, int mode);
1363 extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry,
1364                              struct inode *inode);
1365 extern int ldiskfs_delete_entry(handle_t *handle,
1366                                 struct inode * dir,
1367                                 struct ldiskfs_dir_entry_2 * de_del,
1368                                 struct buffer_head * bh);
1369 extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry,
1370                                                struct ldiskfs_dir_entry_2
1371                                                ** res_dir);
1372 extern int ldiskfs_add_dot_dotdot(handle_t *handle, struct inode *dir,
1373                                   struct inode *inode);
1374
1375 extern int ldiskfs_xattr_set_handle(handle_t *handle, struct inode *inode,
1376                                     int name_index, const char *name,
1377                                     const void *value, size_t value_len,
1378                                     int flags);
1379
1380 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1381                                             struct osd_object *obj,
1382                                             const char *name,
1383                                             const int namelen)
1384 {
1385         struct osd_thread_info *info   = osd_oti_get(env);
1386         struct dentry *child_dentry = &info->oti_child_dentry;
1387         struct dentry *obj_dentry = &info->oti_obj_dentry;
1388
1389         obj_dentry->d_inode = obj->oo_inode;
1390         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1391         obj_dentry->d_name.hash = 0;
1392
1393         child_dentry->d_name.hash = 0;
1394         child_dentry->d_parent = obj_dentry;
1395         child_dentry->d_name.name = name;
1396         child_dentry->d_name.len = namelen;
1397         return child_dentry;
1398 }
1399
1400
1401 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1402                       umode_t mode,
1403                       struct dt_allocation_hint *hint,
1404                       struct thandle *th)
1405 {
1406         int result;
1407         struct osd_device  *osd = osd_obj2dev(obj);
1408         struct osd_thandle *oth;
1409         struct dt_object   *parent;
1410         struct inode       *inode;
1411 #ifdef HAVE_QUOTA_SUPPORT
1412         struct osd_ctxt    *save = &info->oti_ctxt;
1413 #endif
1414
1415         LINVRNT(osd_invariant(obj));
1416         LASSERT(obj->oo_inode == NULL);
1417
1418         oth = container_of(th, struct osd_thandle, ot_super);
1419         LASSERT(oth->ot_handle->h_transaction != NULL);
1420
1421         if (hint && hint->dah_parent)
1422                 parent = hint->dah_parent;
1423         else
1424                 parent = osd->od_obj_area;
1425
1426         LASSERT(parent != NULL);
1427         LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1428
1429 #ifdef HAVE_QUOTA_SUPPORT
1430         osd_push_ctxt(info->oti_env, save);
1431 #endif
1432         inode = ldiskfs_create_inode(oth->ot_handle,
1433                                      osd_dt_obj(parent)->oo_inode, mode);
1434 #ifdef HAVE_QUOTA_SUPPORT
1435         osd_pop_ctxt(save);
1436 #endif
1437         if (!IS_ERR(inode)) {
1438                 obj->oo_inode = inode;
1439                 result = 0;
1440         } else
1441                 result = PTR_ERR(inode);
1442         LINVRNT(osd_invariant(obj));
1443         return result;
1444 }
1445
1446
1447 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1448                            int recsize, handle_t *handle);
1449
1450 extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize,
1451                            int recsize, handle_t *handle);
1452
1453
enum {
        /** key size handed to iam_lvar_create() by osd_mkdir() */
        OSD_NAME_LEN = 255,
};
1457
1458 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1459                      struct lu_attr *attr,
1460                      struct dt_allocation_hint *hint,
1461                      struct dt_object_format *dof,
1462                      struct thandle *th)
1463 {
1464         int result;
1465         struct osd_thandle *oth;
1466         struct osd_device *osd = osd_obj2dev(obj);
1467         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1468
1469         LASSERT(S_ISDIR(attr->la_mode));
1470
1471         oth = container_of(th, struct osd_thandle, ot_super);
1472         LASSERT(oth->ot_handle->h_transaction != NULL);
1473         result = osd_mkfile(info, obj, mode, hint, th);
1474         if (result == 0 && osd->od_iop_mode == 0) {
1475                 LASSERT(obj->oo_inode != NULL);
1476                 /*
1477                  * XXX uh-oh... call low-level iam function directly.
1478                  */
1479
1480                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1481                                          sizeof (struct lu_fid_pack),
1482                                          oth->ot_handle);
1483         }
1484         return result;
1485 }
1486
1487 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1488                         struct lu_attr *attr,
1489                         struct dt_allocation_hint *hint,
1490                         struct dt_object_format *dof,
1491                         struct thandle *th)
1492 {
1493         int result;
1494         struct osd_thandle *oth;
1495         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1496
1497         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1498
1499         LASSERT(S_ISREG(attr->la_mode));
1500
1501         oth = container_of(th, struct osd_thandle, ot_super);
1502         LASSERT(oth->ot_handle->h_transaction != NULL);
1503
1504         result = osd_mkfile(info, obj, mode, hint, th);
1505         if (result == 0) {
1506                 LASSERT(obj->oo_inode != NULL);
1507                 if (feat->dif_flags & DT_IND_VARKEY)
1508                         result = iam_lvar_create(obj->oo_inode,
1509                                                  feat->dif_keysize_max,
1510                                                  feat->dif_ptrsize,
1511                                                  feat->dif_recsize_max,
1512                                                  oth->ot_handle);
1513                 else
1514                         result = iam_lfix_create(obj->oo_inode,
1515                                                  feat->dif_keysize_max,
1516                                                  feat->dif_ptrsize,
1517                                                  feat->dif_recsize_max,
1518                                                  oth->ot_handle);
1519
1520         }
1521         return result;
1522 }
1523
1524 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1525                      struct lu_attr *attr,
1526                      struct dt_allocation_hint *hint,
1527                      struct dt_object_format *dof,
1528                      struct thandle *th)
1529 {
1530         LASSERT(S_ISREG(attr->la_mode));
1531         return osd_mkfile(info, obj, (attr->la_mode &
1532                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1533 }
1534
1535 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1536                      struct lu_attr *attr,
1537                      struct dt_allocation_hint *hint,
1538                      struct dt_object_format *dof,
1539                      struct thandle *th)
1540 {
1541         LASSERT(S_ISLNK(attr->la_mode));
1542         return osd_mkfile(info, obj, (attr->la_mode &
1543                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1544 }
1545
1546 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1547                      struct lu_attr *attr,
1548                      struct dt_allocation_hint *hint,
1549                      struct dt_object_format *dof,
1550                      struct thandle *th)
1551 {
1552         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1553         int result;
1554
1555         LINVRNT(osd_invariant(obj));
1556         LASSERT(obj->oo_inode == NULL);
1557         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1558                 S_ISFIFO(mode) || S_ISSOCK(mode));
1559
1560         result = osd_mkfile(info, obj, mode, hint, th);
1561         if (result == 0) {
1562                 LASSERT(obj->oo_inode != NULL);
1563                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1564         }
1565         LINVRNT(osd_invariant(obj));
1566         return result;
1567 }
1568
/**
 * Signature shared by the per-type creation routines (osd_mkdir(),
 * osd_mkreg(), osd_mksym(), osd_mknod(), osd_mk_index()).
 */
typedef int (*osd_obj_type_f)(struct osd_thread_info *info,
                              struct osd_object *obj,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
                              struct dt_object_format *dof,
                              struct thandle *th);
1574
1575 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1576 {
1577         osd_obj_type_f result;
1578
1579         switch (type) {
1580         case DFT_DIR:
1581                 result = osd_mkdir;
1582                 break;
1583         case DFT_REGULAR:
1584                 result = osd_mkreg;
1585                 break;
1586         case DFT_SYM:
1587                 result = osd_mksym;
1588                 break;
1589         case DFT_NODE:
1590                 result = osd_mknod;
1591                 break;
1592         case DFT_INDEX:
1593                 result = osd_mk_index;
1594                 break;
1595
1596         default:
1597                 LBUG();
1598                 break;
1599         }
1600         return result;
1601 }
1602
1603
1604 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1605                         struct dt_object *parent, umode_t child_mode)
1606 {
1607         LASSERT(ah);
1608
1609         memset(ah, 0, sizeof(*ah));
1610         ah->dah_parent = parent;
1611         ah->dah_mode = child_mode;
1612 }
1613
1614 /**
1615  * Helper function for osd_object_create()
1616  *
1617  * \retval 0, on success
1618  */
1619 static int __osd_object_create(struct osd_thread_info *info,
1620                                struct osd_object *obj, struct lu_attr *attr,
1621                                struct dt_allocation_hint *hint,
1622                                struct dt_object_format *dof,
1623                                struct thandle *th)
1624 {
1625
1626         int result;
1627
1628         result = osd_create_pre(info, obj, attr, th);
1629         if (result == 0) {
1630                 result = osd_create_type_f(dof->dof_type)(info, obj,
1631                                            attr, hint, dof, th);
1632                 if (result == 0)
1633                         result = osd_create_post(info, obj, attr, th);
1634         }
1635         return result;
1636 }
1637
1638 /**
1639  * Helper function for osd_object_create()
1640  *
1641  * \retval 0, on success
1642  */
1643 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1644                            const struct lu_fid *fid, struct thandle *th)
1645 {
1646         struct osd_thread_info *info = osd_oti_get(env);
1647         struct osd_inode_id    *id   = &info->oti_id;
1648         struct osd_device      *osd  = osd_obj2dev(obj);
1649         struct md_ucred        *uc   = md_ucred(env);
1650
1651         LASSERT(obj->oo_inode != NULL);
1652         LASSERT(uc != NULL);
1653
1654         id->oii_ino = obj->oo_inode->i_ino;
1655         id->oii_gen = obj->oo_inode->i_generation;
1656
1657         return osd_oi_insert(info, &osd->od_oi, fid, id, th,
1658                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1659 }
1660
1661 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1662                              struct lu_attr *attr,
1663                              struct dt_allocation_hint *hint,
1664                              struct dt_object_format *dof,
1665                              struct thandle *th)
1666 {
1667         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1668         struct osd_object      *obj    = osd_dt_obj(dt);
1669         struct osd_thread_info *info   = osd_oti_get(env);
1670         int result;
1671
1672         ENTRY;
1673
1674         LINVRNT(osd_invariant(obj));
1675         LASSERT(!dt_object_exists(dt));
1676         LASSERT(osd_write_locked(env, obj));
1677         LASSERT(th != NULL);
1678
1679         result = __osd_object_create(info, obj, attr, hint, dof, th);
1680         if (result == 0)
1681                 result = __osd_oi_insert(env, obj, fid, th);
1682
1683         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1684         LASSERT(osd_invariant(obj));
1685         RETURN(result);
1686 }
1687
1688 /**
1689  * Helper function for osd_xattr_set()
1690  */
1691 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1692                            const struct lu_buf *buf, const char *name, int fl)
1693 {
1694         struct osd_object      *obj      = osd_dt_obj(dt);
1695         struct inode           *inode    = obj->oo_inode;
1696         struct osd_thread_info *info     = osd_oti_get(env);
1697         struct dentry          *dentry   = &info->oti_child_dentry;
1698         struct timespec        *t        = &info->oti_time;
1699         int                     fs_flags = 0;
1700         int  rc;
1701
1702         LASSERT(dt_object_exists(dt));
1703         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1704         LASSERT(osd_write_locked(env, obj));
1705
1706         if (fl & LU_XATTR_REPLACE)
1707                 fs_flags |= XATTR_REPLACE;
1708
1709         if (fl & LU_XATTR_CREATE)
1710                 fs_flags |= XATTR_CREATE;
1711
1712         dentry->d_inode = inode;
1713         *t = inode->i_ctime;
1714         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
1715                                    buf->lb_len, fs_flags);
1716         /* ctime should not be updated with server-side time. */
1717         spin_lock(&obj->oo_guard);
1718         inode->i_ctime = *t;
1719         spin_unlock(&obj->oo_guard);
1720         mark_inode_dirty(inode);
1721         return rc;
1722 }
1723
1724 /**
1725  * Put the fid into lustre_mdt_attrs, and then place the structure
1726  * inode's ea. This fid should not be altered during the life time
1727  * of the inode.
1728  *
1729  * \retval +ve, on success
1730  * \retval -ve, on error
1731  *
1732  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
1733  */
1734 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
1735                           const struct lu_fid *fid)
1736 {
1737         struct osd_thread_info  *info      = osd_oti_get(env);
1738         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1739
1740         fid_cpu_to_be(&mdt_attrs->lma_self_fid, fid);
1741
1742         return __osd_xattr_set(env, dt,
1743                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
1744                                XATTR_NAME_LMA, LU_XATTR_CREATE);
1745
1746 }
1747
1748 /**
1749  * Helper function to form igif
1750  */
1751 static inline void osd_igif_get(const struct lu_env *env, struct dentry *dentry,
1752                                 struct lu_fid *fid)
1753 {
1754         struct inode  *inode = dentry->d_inode;
1755         lu_igif_build(fid, inode->i_ino, inode->i_generation);
1756 }
1757
1758 /**
1759  * Helper function to pack the fid
1760  */
1761 static inline void osd_fid_pack(const struct lu_env *env, const struct lu_fid *fid,
1762                                 struct dt_rec *pack)
1763 {
1764         fid_pack((struct lu_fid_pack *)pack, fid, &osd_oti_get(env)->oti_fid);
1765 }
1766
1767 /**
1768  * Try to read the fid from inode ea into dt_rec, if return value
1769  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
1770  *
1771  * \param fid, object fid.
1772  *
1773  * \retval 0, on success
1774  */
1775 static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry,
1776                           struct lu_fid *fid)
1777 {
1778         struct inode            *inode     = dentry->d_inode;
1779         struct osd_thread_info  *info      = osd_oti_get(env);
1780         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1781         int rc;
1782
1783         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1784
1785         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
1786                                    sizeof *mdt_attrs);
1787
1788         /* Check LMA compatibility */
1789         if (rc > 0 &&
1790             (mdt_attrs->lma_incompat & ~cpu_to_be32(LMA_INCOMPAT_SUPP))) {
1791                 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
1792                       inode->i_ino, be32_to_cpu(mdt_attrs->lma_incompat) &
1793                       ~LMA_INCOMPAT_SUPP);
1794                 return -ENOSYS;
1795         }
1796
1797         if (rc > 0) {
1798                 fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid);
1799                 rc = 0;
1800         } else if (rc == -ENODATA) {
1801                 osd_igif_get(env, dentry, fid);
1802                 rc = 0;
1803         }
1804
1805         return rc;
1806 }
1807
1808 /**
1809  * OSD layer object create function for interoperability mode (b11826).
1810  * This is mostly similar to osd_object_create(). Only difference being, fid is
1811  * inserted into inode ea here.
1812  *
1813  * \retval   0, on success
1814  * \retval -ve, on error
1815  */
1816 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
1817                              struct lu_attr *attr,
1818                              struct dt_allocation_hint *hint,
1819                              struct dt_object_format *dof,
1820                              struct thandle *th)
1821 {
1822         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1823         struct osd_object      *obj    = osd_dt_obj(dt);
1824         struct osd_thread_info *info   = osd_oti_get(env);
1825         int result;
1826         int is_root = 0;
1827
1828         ENTRY;
1829
1830         LASSERT(osd_invariant(obj));
1831         LASSERT(!dt_object_exists(dt));
1832         LASSERT(osd_write_locked(env, obj));
1833         LASSERT(th != NULL);
1834
1835         result = __osd_object_create(info, obj, attr, hint, dof, th);
1836
1837         if (hint && hint->dah_parent)
1838                 is_root = osd_object_is_root(osd_dt_obj(hint->dah_parent));
1839
1840         /* objects under osd root shld have igif fid, so dont add fid EA */
1841         if (result == 0 && is_root == 0)
1842                 result = osd_ea_fid_set(env, dt, fid);
1843
1844         if (result == 0)
1845                 result = __osd_oi_insert(env, obj, fid, th);
1846
1847         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1848         LINVRNT(osd_invariant(obj));
1849         RETURN(result);
1850 }
1851
1852 /*
1853  * Concurrency: @dt is write locked.
1854  */
1855 static void osd_object_ref_add(const struct lu_env *env,
1856                                struct dt_object *dt,
1857                                struct thandle *th)
1858 {
1859         struct osd_object *obj = osd_dt_obj(dt);
1860         struct inode *inode = obj->oo_inode;
1861
1862         LINVRNT(osd_invariant(obj));
1863         LASSERT(dt_object_exists(dt));
1864         LASSERT(osd_write_locked(env, obj));
1865         LASSERT(th != NULL);
1866
1867         spin_lock(&obj->oo_guard);
1868         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1869         inode->i_nlink++;
1870         spin_unlock(&obj->oo_guard);
1871         mark_inode_dirty(inode);
1872         LINVRNT(osd_invariant(obj));
1873 }
1874
1875 /*
1876  * Concurrency: @dt is write locked.
1877  */
1878 static void osd_object_ref_del(const struct lu_env *env,
1879                                struct dt_object *dt,
1880                                struct thandle *th)
1881 {
1882         struct osd_object *obj = osd_dt_obj(dt);
1883         struct inode *inode = obj->oo_inode;
1884
1885         LINVRNT(osd_invariant(obj));
1886         LASSERT(dt_object_exists(dt));
1887         LASSERT(osd_write_locked(env, obj));
1888         LASSERT(th != NULL);
1889
1890         spin_lock(&obj->oo_guard);
1891         LASSERT(inode->i_nlink > 0);
1892         inode->i_nlink--;
1893         spin_unlock(&obj->oo_guard);
1894         mark_inode_dirty(inode);
1895         LINVRNT(osd_invariant(obj));
1896 }
1897
1898 /*
1899  * Concurrency: @dt is read locked.
1900  */
1901 static int osd_xattr_get(const struct lu_env *env,
1902                          struct dt_object *dt,
1903                          struct lu_buf *buf,
1904                          const char *name,
1905                          struct lustre_capa *capa)
1906 {
1907         struct osd_object      *obj    = osd_dt_obj(dt);
1908         struct inode           *inode  = obj->oo_inode;
1909         struct osd_thread_info *info   = osd_oti_get(env);
1910         struct dentry          *dentry = &info->oti_obj_dentry;
1911
1912         LASSERT(dt_object_exists(dt));
1913         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1914         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1915
1916         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1917                 return -EACCES;
1918
1919         dentry->d_inode = inode;
1920         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1921 }
1922
1923
1924 /*
1925  * Concurrency: @dt is write locked.
1926  */
1927 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1928                          const struct lu_buf *buf, const char *name, int fl,
1929                          struct thandle *handle, struct lustre_capa *capa)
1930 {
1931         LASSERT(handle != NULL);
1932
1933         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1934                 return -EACCES;
1935
1936         return __osd_xattr_set(env, dt, buf, name, fl);
1937 }
1938
1939 /*
1940  * Concurrency: @dt is read locked.
1941  */
1942 static int osd_xattr_list(const struct lu_env *env,
1943                           struct dt_object *dt,
1944                           struct lu_buf *buf,
1945                           struct lustre_capa *capa)
1946 {
1947         struct osd_object      *obj    = osd_dt_obj(dt);
1948         struct inode           *inode  = obj->oo_inode;
1949         struct osd_thread_info *info   = osd_oti_get(env);
1950         struct dentry          *dentry = &info->oti_obj_dentry;
1951
1952         LASSERT(dt_object_exists(dt));
1953         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1954         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1955
1956         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1957                 return -EACCES;
1958
1959         dentry->d_inode = inode;
1960         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1961 }
1962
1963 /*
1964  * Concurrency: @dt is write locked.
1965  */
1966 static int osd_xattr_del(const struct lu_env *env,
1967                          struct dt_object *dt,
1968                          const char *name,
1969                          struct thandle *handle,
1970                          struct lustre_capa *capa)
1971 {
1972         struct osd_object      *obj    = osd_dt_obj(dt);
1973         struct inode           *inode  = obj->oo_inode;
1974         struct osd_thread_info *info   = osd_oti_get(env);
1975         struct dentry          *dentry = &info->oti_obj_dentry;
1976         struct timespec        *t      = &info->oti_time;
1977         int                     rc;
1978
1979         LASSERT(dt_object_exists(dt));
1980         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1981         LASSERT(osd_write_locked(env, obj));
1982         LASSERT(handle != NULL);
1983
1984         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1985                 return -EACCES;
1986
1987         dentry->d_inode = inode;
1988         *t = inode->i_ctime;
1989         rc = inode->i_op->removexattr(dentry, name);
1990         /* ctime should not be updated with server-side time. */
1991         spin_lock(&obj->oo_guard);
1992         inode->i_ctime = *t;
1993         spin_unlock(&obj->oo_guard);
1994         mark_inode_dirty(inode);
1995         return rc;
1996 }
1997
1998 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1999                                      struct dt_object *dt,
2000                                      struct lustre_capa *old,
2001                                      __u64 opc)
2002 {
2003         struct osd_thread_info *info = osd_oti_get(env);
2004         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2005         struct osd_object *obj = osd_dt_obj(dt);
2006         struct osd_device *dev = osd_obj2dev(obj);
2007         struct lustre_capa_key *key = &info->oti_capa_key;
2008         struct lustre_capa *capa = &info->oti_capa;
2009         struct obd_capa *oc;
2010         struct md_capainfo *ci;
2011         int rc;
2012         ENTRY;
2013
2014         if (!dev->od_fl_capa)
2015                 RETURN(ERR_PTR(-ENOENT));
2016
2017         LASSERT(dt_object_exists(dt));
2018         LINVRNT(osd_invariant(obj));
2019
2020         /* renewal sanity check */
2021         if (old && osd_object_auth(env, dt, old, opc))
2022                 RETURN(ERR_PTR(-EACCES));
2023
2024         ci = md_capainfo(env);
2025         if (unlikely(!ci))
2026                 RETURN(ERR_PTR(-ENOENT));
2027
2028         switch (ci->mc_auth) {
2029         case LC_ID_NONE:
2030                 RETURN(NULL);
2031         case LC_ID_PLAIN:
2032                 capa->lc_uid = obj->oo_inode->i_uid;
2033                 capa->lc_gid = obj->oo_inode->i_gid;
2034                 capa->lc_flags = LC_ID_PLAIN;
2035                 break;
2036         case LC_ID_CONVERT: {
2037                 __u32 d[4], s[4];
2038
2039                 s[0] = obj->oo_inode->i_uid;
2040                 get_random_bytes(&(s[1]), sizeof(__u32));
2041                 s[2] = obj->oo_inode->i_gid;
2042                 get_random_bytes(&(s[3]), sizeof(__u32));
2043                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2044                 if (unlikely(rc))
2045                         RETURN(ERR_PTR(rc));
2046
2047                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2048                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2049                 capa->lc_flags = LC_ID_CONVERT;
2050                 break;
2051         }
2052         default:
2053                 RETURN(ERR_PTR(-EINVAL));
2054         }
2055
2056         capa->lc_fid = *fid;
2057         capa->lc_opc = opc;
2058         capa->lc_flags |= dev->od_capa_alg << 24;
2059         capa->lc_timeout = dev->od_capa_timeout;
2060         capa->lc_expiry = 0;
2061
2062         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2063         if (oc) {
2064                 LASSERT(!capa_is_expired(oc));
2065                 RETURN(oc);
2066         }
2067
2068         spin_lock(&capa_lock);
2069         *key = dev->od_capa_keys[1];
2070         spin_unlock(&capa_lock);
2071
2072         capa->lc_keyid = key->lk_keyid;
2073         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2074
2075         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2076         if (rc) {
2077                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2078                 RETURN(ERR_PTR(rc));
2079         }
2080
2081         oc = capa_add(dev->od_capa_hash, capa);
2082         RETURN(oc);
2083 }
2084
2085 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2086 {
2087         int rc;
2088         struct osd_object      *obj    = osd_dt_obj(dt);
2089         struct inode           *inode  = obj->oo_inode;
2090         struct osd_thread_info *info   = osd_oti_get(env);
2091         struct dentry          *dentry = &info->oti_obj_dentry;
2092         struct file            *file   = &info->oti_file;
2093         ENTRY;
2094
2095         dentry->d_inode = inode;
2096         file->f_dentry = dentry;
2097         file->f_mapping = inode->i_mapping;
2098         file->f_op = inode->i_fop;
2099         LOCK_INODE_MUTEX(inode);
2100         rc = file->f_op->fsync(file, dentry, 0);
2101         UNLOCK_INODE_MUTEX(inode);
2102         RETURN(rc);
2103 }
2104
2105 /*
2106  * Get the 64-bit version for an inode.
2107  */
2108 static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
2109                                                struct dt_object *dt)
2110 {
2111         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2112
2113         CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
2114                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2115         return LDISKFS_I(inode)->i_fs_version;
2116 }
2117
2118 /*
2119  * Set the 64-bit version and return the old version.
2120  */
2121 static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
2122                                    dt_obj_version_t new_version)
2123 {
2124         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2125
2126         CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2127                new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2128         LDISKFS_I(inode)->i_fs_version = new_version;
2129         /** Version is set after all inode operations are finished,
2130          *  so we should mark it dirty here */
2131         inode->i_sb->s_op->dirty_inode(inode);
2132 }
2133
2134 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2135                         void **data)
2136 {
2137         struct osd_object *obj = osd_dt_obj(dt);
2138         ENTRY;
2139
2140         *data = (void *)obj->oo_inode;
2141         RETURN(0);
2142 }
2143
2144 static const struct dt_object_operations osd_obj_ops = {
2145         .do_read_lock    = osd_object_read_lock,
2146         .do_write_lock   = osd_object_write_lock,
2147         .do_read_unlock  = osd_object_read_unlock,
2148         .do_write_unlock = osd_object_write_unlock,
2149         .do_write_locked = osd_object_write_locked,
2150         .do_attr_get     = osd_attr_get,
2151         .do_attr_set     = osd_attr_set,
2152         .do_ah_init      = osd_ah_init,
2153         .do_create       = osd_object_create,
2154         .do_index_try    = osd_index_try,
2155         .do_ref_add      = osd_object_ref_add,
2156         .do_ref_del      = osd_object_ref_del,
2157         .do_xattr_get    = osd_xattr_get,
2158         .do_xattr_set    = osd_xattr_set,
2159         .do_xattr_del    = osd_xattr_del,
2160         .do_xattr_list   = osd_xattr_list,
2161         .do_capa_get     = osd_capa_get,
2162         .do_object_sync  = osd_object_sync,
2163         .do_version_get  = osd_object_version_get,
2164         .do_version_set  = osd_object_version_set,
2165         .do_data_get     = osd_data_get,
2166 };
2167
2168 /**
2169  * dt_object_operations for interoperability mode
2170  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2171  */
2172 static const struct dt_object_operations osd_obj_ea_ops = {
2173         .do_read_lock    = osd_object_read_lock,
2174         .do_write_lock   = osd_object_write_lock,
2175         .do_read_unlock  = osd_object_read_unlock,
2176         .do_write_unlock = osd_object_write_unlock,
2177         .do_write_locked = osd_object_write_locked,
2178         .do_attr_get     = osd_attr_get,
2179         .do_attr_set     = osd_attr_set,
2180         .do_ah_init      = osd_ah_init,
2181         .do_create       = osd_object_ea_create,
2182         .do_index_try    = osd_index_try,
2183         .do_ref_add      = osd_object_ref_add,
2184         .do_ref_del      = osd_object_ref_del,
2185         .do_xattr_get    = osd_xattr_get,
2186         .do_xattr_set    = osd_xattr_set,
2187         .do_xattr_del    = osd_xattr_del,
2188         .do_xattr_list   = osd_xattr_list,
2189         .do_capa_get     = osd_capa_get,
2190         .do_object_sync  = osd_object_sync,
2191         .do_version_get  = osd_object_version_get,
2192         .do_version_set  = osd_object_version_set,
2193         .do_data_get     = osd_data_get,
2194 };
2195
2196 /*
2197  * Body operations.
2198  */
2199
2200 /*
2201  * XXX: Another layering violation for now.
2202  *
2203  * We don't want to use ->f_op->read methods, because generic file write
2204  *
2205  *         - serializes on ->i_sem, and
2206  *
2207  *         - does a lot of extra work like balance_dirty_pages(),
2208  *
2209  * which doesn't work for globally shared files like /last-received.
2210  */
2211 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
2212 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
2213                                 loff_t *offs, handle_t *handle);
2214
2215 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
2216                         struct lu_buf *buf, loff_t *pos,
2217                         struct lustre_capa *capa)
2218 {
2219         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2220
2221         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2222                 RETURN(-EACCES);
2223
2224         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
2225 }
2226
2227 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
2228                          const struct lu_buf *buf, loff_t *pos,
2229                          struct thandle *handle, struct lustre_capa *capa,
2230                          int ignore_quota)
2231 {
2232         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
2233         struct osd_thandle *oh;
2234         ssize_t             result;
2235 #ifdef HAVE_QUOTA_SUPPORT
2236         cfs_cap_t           save = current->cap_effective;
2237 #endif
2238
2239         LASSERT(handle != NULL);
2240
2241         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
2242                 RETURN(-EACCES);
2243
2244         oh = container_of(handle, struct osd_thandle, ot_super);
2245         LASSERT(oh->ot_handle->h_transaction != NULL);
2246 #ifdef HAVE_QUOTA_SUPPORT
2247         if (ignore_quota)
2248                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2249         else
2250                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2251 #endif
2252         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
2253                                              pos, oh->ot_handle);
2254 #ifdef HAVE_QUOTA_SUPPORT
2255         current->cap_effective = save;
2256 #endif
2257         if (result == 0)
2258                 result = buf->lb_len;
2259         return result;
2260 }
2261
2262 static const struct dt_body_operations osd_body_ops = {
2263         .dbo_read  = osd_read,
2264         .dbo_write = osd_write
2265 };
2266
2267 /*
2268  * Index operations.
2269  */
2270
2271 static int osd_object_is_root(const struct osd_object *obj)
2272 {
2273         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
2274 }
2275
2276 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2277                            const struct dt_index_features *feat)
2278 {
2279         struct iam_descr *descr;
2280         struct dt_object *dt = &o->oo_dt;
2281
2282         if (osd_object_is_root(o))
2283                 return feat == &dt_directory_features;
2284
2285         LASSERT(o->oo_dir != NULL);
2286
2287         descr = o->oo_dir->od_container.ic_descr;
2288         if (feat == &dt_directory_features) {
2289                 if (descr->id_rec_size == sizeof(struct lu_fid_pack))
2290                         return 1;
2291
2292                 if (descr == &iam_htree_compat_param) {
2293                         /* if it is a HTREE dir then there is good chance that,
2294                          * we dealing with ext3 directory here with no FIDs. */
2295
2296                         if (descr->id_rec_size ==
2297                             sizeof ((struct ldiskfs_dir_entry_2 *)NULL)->inode) {
2298
2299                                 dt->do_index_ops = &osd_index_ea_ops;
2300                                 return 1;
2301                         }
2302                 }
2303                 return 0;
2304         } else {
2305                 return
2306                         feat->dif_keysize_min <= descr->id_key_size &&
2307                         descr->id_key_size <= feat->dif_keysize_max &&
2308                         feat->dif_recsize_min <= descr->id_rec_size &&
2309                         descr->id_rec_size <= feat->dif_recsize_max &&
2310                         !(feat->dif_flags & (DT_IND_VARKEY |
2311                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2312                         ergo(feat->dif_flags & DT_IND_UPDATE,
2313                              1 /* XXX check that object (and file system) is
2314                                 * writable */);
2315         }
2316 }
2317
2318 static int osd_iam_container_init(const struct lu_env *env,
2319                                   struct osd_object *obj,
2320                                   struct osd_directory *dir)
2321 {
2322         int result;
2323         struct iam_container *bag;
2324
2325         bag    = &dir->od_container;
2326         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2327         if (result == 0) {
2328                 result = iam_container_setup(bag);
2329                 if (result == 0)
2330                         obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2331                 else
2332                         iam_container_fini(bag);
2333         }
2334         return result;
2335 }
2336
2337 /*
2338  * Concurrency: no external locking is necessary.
2339  */
2340 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2341                          const struct dt_index_features *feat)
2342 {
2343         int result;
2344         int ea_dir = 0;
2345         struct osd_object *obj = osd_dt_obj(dt);
2346         struct osd_device *osd = osd_obj2dev(obj);
2347
2348         LINVRNT(osd_invariant(obj));
2349         LASSERT(dt_object_exists(dt));
2350
2351         if (osd_object_is_root(obj)) {
2352                 dt->do_index_ops = &osd_index_ea_ops;
2353                 result = 0;
2354         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2355                 dt->do_index_ops = &osd_index_ea_ops;
2356                 if (S_ISDIR(obj->oo_inode->i_mode))
2357                         result = 0;
2358                 else
2359                         result = -ENOTDIR;
2360                 ea_dir = 1;
2361         } else if (!osd_has_index(obj)) {
2362                 struct osd_directory *dir;
2363
2364                 OBD_ALLOC_PTR(dir);
2365                 if (dir != NULL) {
2366
2367                         spin_lock(&obj->oo_guard);
2368                         if (obj->oo_dir == NULL)
2369                                 obj->oo_dir = dir;
2370                         else
2371                                 /*
2372                                  * Concurrent thread allocated container data.
2373                                  */
2374                                 OBD_FREE_PTR(dir);
2375                         spin_unlock(&obj->oo_guard);
2376                         /*
2377                          * Now, that we have container data, serialize its
2378                          * initialization.
2379                          */
2380                         down_write(&obj->oo_ext_idx_sem);
2381                         /*
2382                          * recheck under lock.
2383                          */
2384                         if (!osd_has_index(obj))
2385                                 result = osd_iam_container_init(env, obj, dir);
2386                         else
2387                                 result = 0;
2388                         up_write(&obj->oo_ext_idx_sem);
2389                 } else
2390                         result = -ENOMEM;
2391         } else
2392                 result = 0;
2393
2394         if (result == 0 && ea_dir == 0) {
2395                 if (!osd_iam_index_probe(env, obj, feat))
2396                         result = -ENOTDIR;
2397         }
2398         LINVRNT(osd_invariant(obj));
2399
2400         return result;
2401 }
2402
2403 /**
2404  *      delete a (key, value) pair from index \a dt specified by \a key
2405  *
2406  *      \param  dt_object      osd index object
2407  *      \param  key     key for index
2408  *      \param  rec     record reference
2409  *      \param  handle  transaction handler
2410  *
2411  *      \retval  0  success
2412  *      \retval -ve   failure
2413  */
2414
2415 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2416                                 const struct dt_key *key, struct thandle *handle,
2417                                 struct lustre_capa *capa)
2418 {
2419         struct osd_object     *obj = osd_dt_obj(dt);
2420         struct osd_thandle    *oh;
2421         struct iam_path_descr *ipd;
2422         struct iam_container  *bag = &obj->oo_dir->od_container;
2423         int rc;
2424
2425         ENTRY;
2426
2427         LINVRNT(osd_invariant(obj));
2428         LASSERT(dt_object_exists(dt));
2429         LASSERT(bag->ic_object == obj->oo_inode);
2430         LASSERT(handle != NULL);
2431
2432         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2433                 RETURN(-EACCES);
2434
2435         ipd = osd_idx_ipd_get(env, bag);
2436         if (unlikely(ipd == NULL))
2437                 RETURN(-ENOMEM);
2438
2439         oh = container_of0(handle, struct osd_thandle, ot_super);
2440         LASSERT(oh->ot_handle != NULL);
2441         LASSERT(oh->ot_handle->h_transaction != NULL);
2442
2443         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2444         osd_ipd_put(env, bag, ipd);
2445         LINVRNT(osd_invariant(obj));
2446         RETURN(rc);
2447 }
2448
2449 /**
2450  * Index delete function for interoperability mode (b11826).
2451  * It will remove the directory entry added by osd_index_ea_insert().
2452  * This entry is needed to maintain name->fid mapping.
2453  *
2454  * \param key,  key i.e. file entry to be deleted
2455  *
2456  * \retval   0, on success
2457  * \retval -ve, on error
2458  */
2459 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
2460                                const struct dt_key *key, struct thandle *handle,
2461                                struct lustre_capa *capa)
2462 {
2463         struct osd_object          *obj    = osd_dt_obj(dt);
2464         struct inode               *dir    = obj->oo_inode;
2465         struct dentry              *dentry;
2466         struct osd_thandle         *oh;
2467         struct ldiskfs_dir_entry_2 *de;
2468         struct buffer_head         *bh;
2469
2470         int rc;
2471
2472         ENTRY;
2473
2474         LINVRNT(osd_invariant(obj));
2475         LASSERT(dt_object_exists(dt));
2476         LASSERT(handle != NULL);
2477
2478         oh = container_of(handle, struct osd_thandle, ot_super);
2479         LASSERT(oh->ot_handle != NULL);
2480         LASSERT(oh->ot_handle->h_transaction != NULL);
2481
2482         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2483                 RETURN(-EACCES);
2484
2485         dentry = osd_child_dentry_get(env, obj,
2486                                       (char *)key, strlen((char *)key));
2487
2488         down_write(&obj->oo_ext_idx_sem);
2489         bh = ldiskfs_find_entry(dentry, &de);
2490         if (bh) {
2491                 struct osd_thread_info *oti = osd_oti_get(env);
2492                 struct timespec *ctime = &oti->oti_time;
2493                 struct timespec *mtime = &oti->oti_time2;
2494
2495                 *ctime = dir->i_ctime;
2496                 *mtime = dir->i_mtime;
2497                 rc = ldiskfs_delete_entry(oh->ot_handle,
2498                                 dir, de, bh);
2499                 /* xtime should not be updated with server-side time. */
2500                 spin_lock(&obj->oo_guard);
2501                 dir->i_ctime = *ctime;
2502                 dir->i_mtime = *mtime;
2503                 spin_unlock(&obj->oo_guard);
2504                 mark_inode_dirty(dir);
2505                 brelse(bh);
2506         } else
2507                 rc = -ENOENT;
2508
2509         up_write(&obj->oo_ext_idx_sem);
2510         LASSERT(osd_invariant(obj));
2511         RETURN(rc);
2512 }
2513
2514 /**
2515  *      Lookup index for \a key and copy record to \a rec.
2516  *
2517  *      \param  dt_object      osd index object
2518  *      \param  key     key for index
2519  *      \param  rec     record reference
2520  *
2521  *      \retval  +ve  success : exact mach
2522  *      \retval  0    return record with key not greater than \a key
2523  *      \retval -ve   failure
2524  */
2525 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
2526                                 struct dt_rec *rec, const struct dt_key *key,
2527                                 struct lustre_capa *capa)
2528 {
2529         struct osd_object     *obj = osd_dt_obj(dt);
2530         struct iam_path_descr *ipd;
2531         struct iam_container  *bag = &obj->oo_dir->od_container;
2532         struct osd_thread_info *oti = osd_oti_get(env);
2533         struct iam_iterator    *it = &oti->oti_idx_it;
2534         int rc;
2535         ENTRY;
2536
2537         LASSERT(osd_invariant(obj));
2538         LASSERT(dt_object_exists(dt));
2539         LASSERT(bag->ic_object == obj->oo_inode);
2540
2541         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2542                 RETURN(-EACCES);
2543
2544         ipd = osd_idx_ipd_get(env, bag);
2545         if (IS_ERR(ipd))
2546                 RETURN(-ENOMEM);
2547
2548         /* got ipd now we can start iterator. */
2549         iam_it_init(it, bag, 0, ipd);
2550
2551         rc = iam_it_get(it, (struct iam_key *)key);
2552         if (rc >= 0)
2553                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec);
2554
2555         iam_it_put(it);
2556         iam_it_fini(it);
2557         osd_ipd_put(env, bag, ipd);
2558
2559         LINVRNT(osd_invariant(obj));
2560
2561         RETURN(rc);
2562 }
2563
2564 /**
2565  *      Inserts (key, value) pair in \a dt index object.
2566  *
2567  *      \param  dt      osd index object
2568  *      \param  key     key for index
2569  *      \param  rec     record reference
2570  *      \param  th      transaction handler
2571  *
2572  *      \retval  0  success
2573  *      \retval -ve failure
2574  */
2575 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
2576                                 const struct dt_rec *rec, const struct dt_key *key,
2577                                 struct thandle *th, struct lustre_capa *capa,
2578                                 int ignore_quota)
2579 {
2580         struct osd_object     *obj = osd_dt_obj(dt);
2581         struct iam_path_descr *ipd;
2582         struct osd_thandle    *oh;
2583         struct iam_container  *bag = &obj->oo_dir->od_container;
2584 #ifdef HAVE_QUOTA_SUPPORT
2585         cfs_cap_t              save = current->cap_effective;
2586 #endif
2587         int rc;
2588
2589         ENTRY;
2590
2591         LINVRNT(osd_invariant(obj));
2592         LASSERT(dt_object_exists(dt));
2593         LASSERT(bag->ic_object == obj->oo_inode);
2594         LASSERT(th != NULL);
2595
2596         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2597                 return -EACCES;
2598
2599         ipd = osd_idx_ipd_get(env, bag);
2600         if (unlikely(ipd == NULL))
2601                 RETURN(-ENOMEM);
2602
2603         oh = container_of0(th, struct osd_thandle, ot_super);
2604         LASSERT(oh->ot_handle != NULL);
2605         LASSERT(oh->ot_handle->h_transaction != NULL);
2606 #ifdef HAVE_QUOTA_SUPPORT
2607         if (ignore_quota)
2608                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2609         else
2610                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2611 #endif
2612         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2613                         (struct iam_rec *)rec, ipd);
2614 #ifdef HAVE_QUOTA_SUPPORT
2615         current->cap_effective = save;
2616 #endif
2617         osd_ipd_put(env, bag, ipd);
2618         LINVRNT(osd_invariant(obj));
2619         RETURN(rc);
2620 }
2621
2622 /**
2623  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
2624  * into the directory.Also sets flags into osd object to
2625  * indicate dot and dotdot are created. This is required for
2626  * interoperability mode (b11826)
2627  *
2628  * \param dir   directory for dot and dotdot fixup.
2629  * \param obj   child object for linking
2630  *
2631  * \retval   0, on success
2632  * \retval -ve, on error
2633  */
2634 static int osd_add_dot_dotdot(struct osd_thread_info *info,
2635                               struct osd_object *dir,
2636                               struct osd_object *obj, const char *name,
2637                               struct thandle *th)
2638 {
2639         struct inode            *parent_dir   = obj->oo_inode;
2640         struct inode            *inode  = dir->oo_inode;
2641         struct osd_thandle      *oth;
2642         int result = 0;
2643
2644         oth = container_of(th, struct osd_thandle, ot_super);
2645         LASSERT(oth->ot_handle->h_transaction != NULL);
2646         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
2647
2648         if (strcmp(name, dot) == 0) {
2649                 if (dir->oo_compat_dot_created) {
2650                         result = -EEXIST;
2651                 } else {
2652                         LASSERT(obj == dir);
2653                         dir->oo_compat_dot_created = 1;
2654                         result = 0;
2655                 }
2656         } else if(strcmp(name, dotdot) == 0) {
2657                 if (!dir->oo_compat_dot_created)
2658                         return -EINVAL;
2659                 if (dir->oo_compat_dotdot_created)
2660                         return __osd_ea_add_rec(info, dir, obj, name, th);
2661
2662                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode);
2663                 if (result == 0)
2664                        dir->oo_compat_dotdot_created = 1;
2665         }
2666
2667         return result;
2668 }
2669
2670 /**
2671  * Calls ldiskfs_add_entry() to add directory entry
2672  * into the directory. This is required for
2673  * interoperability mode (b11826)
2674  *
2675  * \retval   0, on success
2676  * \retval -ve, on error
2677  */
2678 static int __osd_ea_add_rec(struct osd_thread_info *info,
2679                             struct osd_object *pobj,
2680                             struct osd_object *cobj,
2681                             const char *name,
2682                             struct thandle *th)
2683 {
2684         struct dentry      *child;
2685         struct osd_thandle *oth;
2686         struct inode       *cinode  = cobj->oo_inode;
2687         int rc;
2688
2689         oth = container_of(th, struct osd_thandle, ot_super);
2690         LASSERT(oth->ot_handle != NULL);
2691         LASSERT(oth->ot_handle->h_transaction != NULL);
2692
2693         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
2694         rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
2695
2696         RETURN(rc);
2697 }
2698
/**
 * Dispatch a directory-entry insertion: the names "." and ".." go to
 * osd_add_dot_dotdot(), every other name goes to __osd_ea_add_rec().
 * The value of the selected function is returned as is.
 */
static int osd_ea_add_rec(const struct lu_env *env,
                          struct osd_object *pobj,
                          struct osd_object *cobj,
                          const char *name,
                          struct thandle *th)
{
        struct osd_thread_info *info = osd_oti_get(env);
        int is_dot    = name[0] == '.' && name[1] == '\0';
        int is_dotdot = name[0] == '.' && name[1] == '.' && name[2] == '\0';

        if (is_dot || is_dotdot)
                return osd_add_dot_dotdot(info, pobj, cobj, name, th);

        return __osd_ea_add_rec(info, pobj, cobj, name, th);
}
2720
2721 /**
2722  * Calls ->lookup() to find dentry. From dentry get inode and
2723  * read inode's ea to get fid. This is required for  interoperability
2724  * mode (b11826)
2725  *
2726  * \retval   0, on success
2727  * \retval -ve, on error
2728  */
2729 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
2730                              struct dt_rec *rec, const struct dt_key *key)
2731 {
2732         struct inode            *dir    = obj->oo_inode;
2733         struct osd_thread_info  *info   = osd_oti_get(env);
2734         struct dentry           *dentry;
2735         struct osd_device      *dev = osd_dev(obj->oo_dt.do_lu.lo_dev);
2736         struct osd_inode_id    *id     = &info->oti_id;
2737         struct ldiskfs_dir_entry_2 *de;
2738         struct buffer_head         *bh;
2739         struct lu_fid * fid = &info->oti_fid;
2740         struct inode *inode;
2741         int ino;
2742         int rc;
2743
2744         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2745
2746         dentry = osd_child_dentry_get(env, obj,
2747                                       (char *)key, strlen((char *)key));
2748
2749         down_read(&obj->oo_ext_idx_sem);
2750         bh = ldiskfs_find_entry(dentry, &de);
2751         if (bh) {
2752                 ino = le32_to_cpu(de->inode);
2753                 brelse(bh);
2754                 id->oii_ino = ino;
2755                 id->oii_gen = OSD_OII_NOGEN;
2756
2757                 inode = osd_iget(info, dev, id);
2758                 if (!IS_ERR(inode)) {
2759                         dentry->d_inode = inode;
2760
2761                         rc = osd_ea_fid_get(env, dentry, fid);
2762                         if (rc == 0)
2763                                 osd_fid_pack(env, fid, rec);
2764
2765                         iput(inode);
2766                 } else
2767                         rc = PTR_ERR(inode);
2768         } else
2769                 rc = -ENOENT;
2770
2771         up_read(&obj->oo_ext_idx_sem);
2772         RETURN (rc);
2773 }
2774
2775 /**
2776  * Find the osd object for given fid.
2777  *
2778  * \param fid, need to find the osd object having this fid
2779  *
2780  * \retval osd_object, on success
2781  * \retval        -ve, on error
2782  */
2783 struct osd_object *osd_object_find(const struct lu_env *env,
2784                                    struct dt_object *dt,
2785                                    const struct lu_fid *fid)
2786 {
2787         struct lu_device         *ludev = dt->do_lu.lo_dev;
2788         struct osd_object        *child = NULL;
2789         struct lu_object         *luch;
2790         struct lu_object         *lo;
2791
2792         luch = lu_object_find(env, ludev, fid, NULL);
2793         if (!IS_ERR(luch)) {
2794                 if (lu_object_exists(luch)) {
2795                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
2796                         if (lo != NULL)
2797                                 child = osd_obj(lo);
2798                         else
2799                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
2800                                                 "lu_object can't be located"
2801                                                 ""DFID"\n", PFID(fid));
2802
2803                         if (child == NULL) {
2804                                 lu_object_put(env, luch);
2805                                 CERROR("Unable to get osd_object\n");
2806                                 child = ERR_PTR(-ENOENT);
2807                         }
2808                 } else {
2809                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
2810                                         "lu_object does not exists "DFID"\n",
2811                                         PFID(fid));
2812                         child = ERR_PTR(-ENOENT);
2813                 }
2814         } else
2815                 child = (void *)luch;
2816
2817         return child;
2818 }
2819
2820 /**
2821  * Put the osd object once done with it.
2822  *
2823  * \param obj, osd object that needs to be put
2824  */
2825 static inline void osd_object_put(const struct lu_env *env,
2826                                   struct osd_object *obj)
2827 {
2828         lu_object_put(env, &obj->oo_dt.do_lu);
2829 }
2830
2831 /**
2832  * Index add function for interoperability mode (b11826).
2833  * It will add the directory entry.This entry is needed to
2834  * maintain name->fid mapping.
2835  *
2836  * \param key, it is key i.e. file entry to be inserted
2837  * \param rec, it is value of given key i.e. fid
2838  *
2839  * \retval   0, on success
2840  * \retval -ve, on error
2841  */
2842 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
2843                                const struct dt_rec *rec,
2844                                const struct dt_key *key, struct thandle *th,
2845                                struct lustre_capa *capa, int ignore_quota)
2846 {
2847         struct osd_object        *obj   = osd_dt_obj(dt);
2848         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2849         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2850         const char               *name  = (const char *)key;
2851         struct osd_object        *child;
2852 #ifdef HAVE_QUOTA_SUPPORT
2853         cfs_cap_t                 save  = current->cap_effective;
2854 #endif
2855         int rc;
2856
2857         ENTRY;
2858
2859         LASSERT(osd_invariant(obj));
2860         LASSERT(dt_object_exists(dt));
2861         LASSERT(th != NULL);
2862
2863         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2864                 RETURN(-EACCES);
2865
2866         rc = fid_unpack(pack, fid);
2867         if (rc != 0)
2868                 RETURN(rc);
2869         child = osd_object_find(env, dt, fid);
2870         if (!IS_ERR(child)) {
2871                 struct inode *inode = obj->oo_inode;
2872                 struct osd_thread_info *oti = osd_oti_get(env);
2873                 struct timespec *ctime = &oti->oti_time;
2874                 struct timespec *mtime = &oti->oti_time2;
2875
2876                 *ctime = inode->i_ctime;
2877                 *mtime = inode->i_mtime;
2878 #ifdef HAVE_QUOTA_SUPPORT
2879                 if (ignore_quota)
2880                         current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2881                 else
2882                         current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2883 #endif
2884                 down_write(&obj->oo_ext_idx_sem);
2885                 rc = osd_ea_add_rec(env, obj, child, name, th);
2886                 up_write(&obj->oo_ext_idx_sem);
2887 #ifdef HAVE_QUOTA_SUPPORT
2888                 current->cap_effective = save;
2889 #endif
2890                 osd_object_put(env, child);
2891                 /* xtime should not be updated with server-side time. */
2892                 spin_lock(&obj->oo_guard);
2893                 inode->i_ctime = *ctime;
2894                 inode->i_mtime = *mtime;
2895                 spin_unlock(&obj->oo_guard);
2896                 mark_inode_dirty(inode);
2897         } else {
2898                 rc = PTR_ERR(child);
2899         }
2900
2901         LASSERT(osd_invariant(obj));
2902         RETURN(rc);
2903 }
2904
2905 /**
2906  *  Initialize osd Iterator for given osd index object.
2907  *
2908  *  \param  dt      osd index object
2909  */
2910
2911 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
2912                                  struct dt_object *dt,
2913                                  struct lustre_capa *capa)
2914 {
2915         struct osd_it_iam         *it;
2916         struct osd_thread_info *oti = osd_oti_get(env);
2917         struct osd_object     *obj = osd_dt_obj(dt);
2918         struct lu_object      *lo  = &dt->do_lu;
2919         struct iam_path_descr *ipd;
2920         struct iam_container  *bag = &obj->oo_dir->od_container;
2921
2922         LASSERT(lu_object_exists(lo));
2923
2924         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2925                 return ERR_PTR(-EACCES);
2926
2927         it = &oti->oti_it;
2928         ipd = osd_it_ipd_get(env, bag);
2929         if (likely(ipd != NULL)) {
2930                 it->oi_obj = obj;
2931                 it->oi_ipd = ipd;
2932                 lu_object_get(lo);
2933                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
2934                 return (struct dt_it *)it;
2935         }
2936         return ERR_PTR(-ENOMEM);
2937 }
2938
2939 /**
2940  * free given Iterator.
2941  */
2942
2943 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
2944 {
2945         struct osd_it_iam     *it = (struct osd_it_iam *)di;
2946         struct osd_object *obj = it->oi_obj;
2947
2948         iam_it_fini(&it->oi_it);
2949         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
2950         lu_object_put(env, &obj->oo_dt.do_lu);
2951 }
2952
2953 /**
2954  *  Move Iterator to record specified by \a key
2955  *
2956  *  \param  di      osd iterator
2957  *  \param  key     key for index
2958  *
2959  *  \retval +ve  di points to record with least key not larger than key
2960  *  \retval  0   di points to exact matched key
2961  *  \retval -ve  failure
2962  */
2963
2964 static int osd_it_iam_get(const struct lu_env *env,
2965                       struct dt_it *di, const struct dt_key *key)
2966 {
2967         struct osd_it_iam *it = (struct osd_it_iam *)di;
2968
2969         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
2970 }
2971
2972 /**
2973  *  Release Iterator
2974  *
2975  *  \param  di      osd iterator
2976  */
2977
2978 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
2979 {
2980         struct osd_it_iam *it = (struct osd_it_iam *)di;
2981
2982         iam_it_put(&it->oi_it);
2983 }
2984
2985 /**
2986  *  Move iterator by one record
2987  *
2988  *  \param  di      osd iterator
2989  *
2990  *  \retval +1   end of container reached
2991  *  \retval  0   success
2992  *  \retval -ve  failure
2993  */
2994
2995 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
2996 {
2997         struct osd_it_iam *it = (struct osd_it_iam *)di;
2998
2999         return iam_it_next(&it->oi_it);
3000 }
3001
3002 /**
3003  * Return pointer to the key under iterator.
3004  */
3005
3006 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
3007                                  const struct dt_it *di)
3008 {
3009         struct osd_it_iam *it = (struct osd_it_iam *)di;
3010
3011         return (struct dt_key *)iam_it_key_get(&it->oi_it);
3012 }
3013
3014 /**
3015  * Return size of key under iterator (in bytes)
3016  */
3017
3018 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
3019 {
3020         struct osd_it_iam *it = (struct osd_it_iam *)di;
3021
3022         return iam_it_key_size(&it->oi_it);
3023 }
3024
3025 static inline void osd_it_append_attrs(struct lu_dirent*ent,
3026                                        __u32 attr,
3027                                        int len,
3028                                        __u16 type)
3029 {
3030         struct luda_type        *lt;
3031         const unsigned           align = sizeof(struct luda_type) - 1;
3032
3033         /* check if file type is required */
3034         if (attr & LUDA_TYPE) {
3035                         len = (len + align) & ~align;
3036
3037                         lt = (void *) ent->lde_name + len;
3038                         lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
3039                         ent->lde_attrs |= LUDA_TYPE;
3040         }
3041
3042         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
3043 }
3044
3045 /**
3046  * build lu direct from backend fs dirent.
3047  */
3048
3049 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
3050                                       struct lu_fid *fid,
3051                                       __u64 offset,
3052                                       char *name,
3053                                       __u16 namelen,
3054                                       __u16 type,
3055                                       __u32 attr)
3056 {
3057         fid_cpu_to_le(&ent->lde_fid, fid);
3058         ent->lde_attrs = LUDA_FID;
3059
3060         ent->lde_hash = cpu_to_le64(offset);
3061         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
3062
3063         strncpy(ent->lde_name, name, namelen);
3064         ent->lde_namelen = cpu_to_le16(namelen);
3065
3066         /* append lustre attributes */
3067         osd_it_append_attrs(ent, attr, namelen, type);
3068 }
3069
3070 /**
3071  * Return pointer to the record under iterator.
3072  */
3073 static int osd_it_iam_rec(const struct lu_env *env,
3074                           const struct dt_it *di,
3075                           struct lu_dirent *lde,
3076                           __u32 attr)
3077 {
3078         struct osd_it_iam *it        = (struct osd_it_iam *)di;
3079         struct osd_thread_info *info = osd_oti_get(env);
3080         struct lu_fid     *fid       = &info->oti_fid;
3081         const struct lu_fid_pack *rec;
3082         char *name;
3083         int namelen;
3084         __u64 hash;
3085         int rc;
3086
3087         name = (char *)iam_it_key_get(&it->oi_it);
3088         if (IS_ERR(name))
3089                 RETURN(PTR_ERR(name));
3090
3091         namelen = iam_it_key_size(&it->oi_it);
3092
3093         rec = (const struct lu_fid_pack *) iam_it_rec_get(&it->oi_it);
3094         if (IS_ERR(rec))
3095                 RETURN(PTR_ERR(rec));
3096
3097         rc = fid_unpack(rec, fid);
3098         if (rc)
3099                 RETURN(rc);
3100
3101         hash = iam_it_store(&it->oi_it);
3102
3103         /* IAM does not store object type in IAM index (dir) */
3104         osd_it_pack_dirent(lde, fid, hash, name, namelen,
3105                            0, LUDA_FID);
3106
3107         return 0;
3108 }
3109
3110 /**
3111  * Returns cookie for current Iterator position.
3112  */
3113 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
3114 {
3115         struct osd_it_iam *it = (struct osd_it_iam *)di;
3116
3117         return iam_it_store(&it->oi_it);
3118 }
3119
3120 /**
3121  * Restore iterator from cookie.
3122  *
3123  * \param  di      osd iterator
3124  * \param  hash    Iterator location cookie
3125  *
3126  * \retval +ve  di points to record with least key not larger than key.
3127  * \retval  0   di points to exact matched key
3128  * \retval -ve  failure
3129  */
3130
3131 static int osd_it_iam_load(const struct lu_env *env,
3132                        const struct dt_it *di, __u64 hash)
3133 {
3134         struct osd_it_iam *it = (struct osd_it_iam *)di;
3135
3136         return iam_it_load(&it->oi_it, hash);
3137 }
3138
3139 static const struct dt_index_operations osd_index_iam_ops = {
3140         .dio_lookup = osd_index_iam_lookup,
3141         .dio_insert = osd_index_iam_insert,
3142         .dio_delete = osd_index_iam_delete,
3143         .dio_it     = {
3144                 .init     = osd_it_iam_init,
3145                 .fini     = osd_it_iam_fini,
3146                 .get      = osd_it_iam_get,
3147                 .put      = osd_it_iam_put,
3148                 .next     = osd_it_iam_next,
3149                 .key      = osd_it_iam_key,
3150                 .key_size = osd_it_iam_key_size,
3151                 .rec      = osd_it_iam_rec,
3152                 .store    = osd_it_iam_store,
3153                 .load     = osd_it_iam_load
3154         }
3155 };
3156
3157 /**
3158  * Creates or initializes iterator context.
3159  *
3160  * \retval struct osd_it_ea, iterator structure on success
3161  *
3162  */
3163 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
3164                                     struct dt_object *dt,
3165                                     struct lustre_capa *capa)
3166 {
3167         struct osd_object       *obj  = osd_dt_obj(dt);
3168         struct osd_thread_info  *info = osd_oti_get(env);
3169         struct osd_it_ea        *it   = &info->oti_it_ea;
3170         struct lu_object        *lo   = &dt->do_lu;
3171         struct dentry           *obj_dentry = &info->oti_it_dentry;
3172         ENTRY;
3173         LASSERT(lu_object_exists(lo));
3174
3175         obj_dentry->d_inode = obj->oo_inode;
3176         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3177         obj_dentry->d_name.hash = 0;
3178
3179         it->oie_rd_dirent       = 0;
3180         it->oie_it_dirent       = 0;
3181         it->oie_curr_pos        = 0;
3182         it->oie_next_pos        = 0;
3183         it->oie_dirent          = NULL;
3184         it->oie_buf             = info->oti_it_ea_buf;
3185         it->oie_obj             = obj;
3186         it->oie_file.f_dentry   = obj_dentry;
3187         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
3188         it->oie_file.f_op         = obj->oo_inode->i_fop;
3189         it->oie_file.private_data = NULL;
3190         lu_object_get(lo);
3191         RETURN((struct dt_it*) it);
3192 }
3193
3194 /**
3195  * Destroy or finishes iterator context.
3196  *
3197  * \param di, struct osd_it_ea, iterator structure to be destroyed
3198  */
3199 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
3200 {
3201         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3202         struct osd_object    *obj  = it->oie_obj;
3203         struct inode       *inode  = obj->oo_inode;
3204
3205         ENTRY;
3206         it->oie_file.f_op->release(inode, &it->oie_file);
3207         lu_object_put(env, &obj->oo_dt.do_lu);
3208         EXIT;
3209 }
3210
3211 /**
3212  * It position the iterator at given key, so that next lookup continues from
3213  * that key Or it is similar to dio_it->load() but based on a key,
3214  * rather than file position.
3215  *
3216  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
3217  * to the beginning.
3218  *
3219  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
3220  */
3221 static int osd_it_ea_get(const struct lu_env *env,
3222                          struct dt_it *di, const struct dt_key *key)
3223 {
3224         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3225
3226         ENTRY;
3227         LASSERT(((const char *)key)[0] == '\0');
3228         it->oie_curr_pos        = 0;
3229         it->oie_next_pos        = 0;
3230         it->oie_rd_dirent       = 0;
3231         it->oie_it_dirent       = 0;
3232         it->oie_dirent          = NULL;
3233
3234         RETURN(+1);
3235 }
3236
/**
 * Counterpart of osd_it_ea_get().  Intentionally empty: this function has
 * no body, so nothing is released or changed here.
 */
static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
{
        /* no-op */
}
3243
3244 /**
3245  * It is called internally by ->readdir(). It fills the
3246  * iterator's in-memory data structure with required
3247  * information i.e. name, namelen, rec_size etc.
3248  *
3249  * \param buf, in which information to be filled in.
3250  * \param name, name of the file in given dir
3251  *
3252  * \retval 0, on success
3253  * \retval 1, on buffer full
3254  */
3255 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
3256                                loff_t offset, ino_t ino,
3257                                unsigned int d_type)
3258 {
3259         struct osd_it_ea        *it = (struct osd_it_ea *)buf;
3260         struct osd_it_ea_dirent *ent = it->oie_dirent;
3261         ENTRY;
3262
3263         /* this should never happen */
3264         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
3265                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
3266                 RETURN(-EIO);
3267         }
3268
3269         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
3270             OSD_IT_EA_BUFSIZE)
3271                 RETURN(1);
3272
3273         ent->oied_ino     = ino;
3274         ent->oied_off     = offset;
3275         ent->oied_namelen = namelen;
3276         ent->oied_type    = d_type;
3277
3278         memcpy(ent->oied_name, name, namelen);
3279
3280         it->oie_rd_dirent++;
3281         it->oie_dirent = (void *) ent + size_round(sizeof(*ent) + namelen);
3282         RETURN(0);
3283 }
3284
3285 /**
3286  * Calls ->readdir() to load a directory entry at a time
3287  * and stored it in iterator's in-memory data structure.
3288  *
3289  * \param di, struct osd_it_ea, iterator's in memory structure
3290  *
3291  * \retval   0, on success
3292  * \retval -ve, on error
3293  */
3294 static int osd_ldiskfs_it_fill(const struct dt_it *di)
3295 {
3296         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
3297         struct osd_object  *obj   = it->oie_obj;
3298         struct inode       *inode = obj->oo_inode;
3299         int                result = 0;
3300
3301         ENTRY;
3302         it->oie_dirent = it->oie_buf;
3303         it->oie_rd_dirent = 0;
3304         it->oie_file.f_pos = it->oie_curr_pos;
3305
3306         down_read(&obj->oo_ext_idx_sem);
3307         result = inode->i_fop->readdir(&it->oie_file, it,
3308                                        (filldir_t) osd_ldiskfs_filldir);
3309
3310         up_read(&obj->oo_ext_idx_sem);
3311         it->oie_next_pos = it->oie_file.f_pos;
3312
3313         if (it->oie_rd_dirent == 0) {
3314                 result = -EIO;
3315         } else {
3316                 it->oie_dirent = it->oie_buf;
3317                 it->oie_it_dirent = 1;
3318         }
3319
3320         RETURN(result);
3321 }
3322
3323 /**
3324  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3325  * to load a directory entry at a time and stored it in
3326  * iterator's in-memory data structure.
3327  *
3328  * \param di, struct osd_it_ea, iterator's in memory structure
3329  *
3330  * \retval +ve, iterator reached to end
3331  * \retval   0, iterator not reached to end
3332  * \retval -ve, on error
3333  */
3334 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
3335 {
3336         struct osd_it_ea *it = (struct osd_it_ea *)di;
3337         int rc;
3338
3339         ENTRY;
3340
3341         if (it->oie_it_dirent < it->oie_rd_dirent) {
3342                 it->oie_dirent = (void *) it->oie_dirent +
3343                                  size_round(sizeof(struct osd_it_ea_dirent) +
3344                                             it->oie_dirent->oied_namelen);
3345                 it->oie_it_dirent++;
3346                 RETURN(0);
3347         } else {
3348                 it->oie_curr_pos = it->oie_next_pos;
3349
3350                 if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
3351                         rc = +1;
3352