Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
58 /*
59  * XXX temporary stuff: direct access to ldiskfs/jdb. Interface between osd
60  * and file system is not yet specified.
61  */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
85 /* llo_* api support */
86 #include <md_object.h>
87
88 static const char dot[] = ".";
89 static const char dotdot[] = "..";
90 static const char remote_obj_dir[] = "REM_OBJ_DIR";
91
92 struct osd_directory {
93         struct iam_container od_container;
94         struct iam_descr     od_descr;
95         struct semaphore     od_sem;
96 };
97
98 struct osd_object {
99         struct dt_object       oo_dt;
100         /**
101          * Inode for file system object represented by this osd_object. This
102          * inode is pinned for the whole duration of lu_object life.
103          *
104          * Not modified concurrently (either setup early during object
105          * creation, or assigned by osd_object_create() under write lock).
106          */
107         struct inode          *oo_inode;
108         struct rw_semaphore    oo_sem;
109         struct osd_directory  *oo_dir;
110         /** protects inode attributes. */
111         spinlock_t             oo_guard;
112         /**
113          * Following two members are used to indicate the presence of dot and
114          * dotdot in the given directory. This is required for interop mode
115          * (b11826).
116          */
117         int oo_compat_dot_created;
118         int oo_compat_dotdot_created;
119
120         const struct lu_env   *oo_owner;
121 #ifdef CONFIG_LOCKDEP
122         struct lockdep_map     oo_dep_map;
123 #endif
124 };
125
126 static int   osd_root_get      (const struct lu_env *env,
127                                 struct dt_device *dev, struct lu_fid *f);
128
129 static int   lu_device_is_osd  (const struct lu_device *d);
130 static void  osd_mod_exit      (void) __exit;
131 static int   osd_mod_init      (void) __init;
132 static int   osd_type_init     (struct lu_device_type *t);
133 static void  osd_type_fini     (struct lu_device_type *t);
134 static int   osd_object_init   (const struct lu_env *env,
135                                 struct lu_object *l,
136                                 const struct lu_object_conf *_);
137 static void  osd_object_release(const struct lu_env *env,
138                                 struct lu_object *l);
139 static int   osd_object_print  (const struct lu_env *env, void *cookie,
140                                 lu_printer_t p, const struct lu_object *o);
141 static struct lu_device *osd_device_free   (const struct lu_env *env,
142                                 struct lu_device *m);
143 static void *osd_key_init      (const struct lu_context *ctx,
144                                 struct lu_context_key *key);
145 static void  osd_key_fini      (const struct lu_context *ctx,
146                                 struct lu_context_key *key, void *data);
147 static void  osd_key_exit      (const struct lu_context *ctx,
148                                 struct lu_context_key *key, void *data);
149 static int   osd_has_index     (const struct osd_object *obj);
150 static void  osd_object_init0  (struct osd_object *obj);
151 static int   osd_device_init   (const struct lu_env *env,
152                                 struct lu_device *d, const char *,
153                                 struct lu_device *);
154 static int   osd_fid_lookup    (const struct lu_env *env,
155                                 struct osd_object *obj,
156                                 const struct lu_fid *fid);
157 static void  osd_inode_getattr (const struct lu_env *env,
158                                 struct inode *inode, struct lu_attr *attr);
159 static int   osd_inode_setattr (const struct lu_env *env,
160                                 struct inode *inode, const struct lu_attr *attr);
161 static int   osd_param_is_sane (const struct osd_device *dev,
162                                 const struct txn_param *param);
163 static int   osd_index_iam_lookup(const struct lu_env *env,
164                                   struct dt_object *dt,
165                                   struct dt_rec *rec, const struct dt_key *key,
166                                   struct lustre_capa *capa);
167 static int   osd_index_ea_lookup(const struct lu_env *env,
168                                  struct dt_object *dt,
169                                  struct dt_rec *rec, const struct dt_key *key,
170                                  struct lustre_capa *capa);
171 static int   osd_index_iam_insert(const struct lu_env *env,
172                                   struct dt_object *dt,
173                                   const struct dt_rec *rec,
174                                   const struct dt_key *key,
175                                   struct thandle *handle,
176                                   struct lustre_capa *capa,
177                                   int ingore_quota);
178 static int   osd_index_ea_insert (const struct lu_env *env,
179                                   struct dt_object *dt,
180                                   const struct dt_rec *rec,
181                                   const struct dt_key *key,
182                                   struct thandle *handle,
183                                   struct lustre_capa *capa,
184                                   int ingore_quota);
185 static int   osd_index_iam_delete(const struct lu_env *env,
186                                   struct dt_object *dt, const struct dt_key *key,
187                                   struct thandle *handle,
188                                   struct lustre_capa *capa);
189 static int   osd_index_ea_delete (const struct lu_env *env,
190                                   struct dt_object *dt, const struct dt_key *key,
191                                   struct thandle *handle,
192                                   struct lustre_capa *capa);
193
194 static int   osd_iam_index_probe   (const struct lu_env *env,
195                                     struct osd_object *o,
196                                     const struct dt_index_features *feat);
197 static int   osd_index_try     (const struct lu_env *env,
198                                 struct dt_object *dt,
199                                 const struct dt_index_features *feat);
200 static void  osd_index_fini    (struct osd_object *o);
201
202 static void  osd_it_iam_fini       (const struct lu_env *env, struct dt_it *di);
203 static int   osd_it_iam_get        (const struct lu_env *env,
204                                     struct dt_it *di, const struct dt_key *key);
205 static void  osd_it_iam_put        (const struct lu_env *env, struct dt_it *di);
206 static int   osd_it_iam_next       (const struct lu_env *env, struct dt_it *di);
207 static int   osd_it_iam_key_size   (const struct lu_env *env,
208                                     const struct dt_it *di);
209 static void  osd_it_ea_fini    (const struct lu_env *env, struct dt_it *di);
210 static int   osd_it_ea_get     (const struct lu_env *env,
211                                 struct dt_it *di, const struct dt_key *key);
212 static void  osd_it_ea_put     (const struct lu_env *env, struct dt_it *di);
213 static int   osd_it_ea_next    (const struct lu_env *env, struct dt_it *di);
214 static int   osd_it_ea_key_size(const struct lu_env *env,
215                                 const struct dt_it *di);
216
217 static void  osd_conf_get      (const struct lu_env *env,
218                                 const struct dt_device *dev,
219                                 struct dt_device_param *param);
220 static void  osd_trans_stop    (const struct lu_env *env,
221                                 struct thandle *th);
222 static int   osd_object_is_root(const struct osd_object *obj);
223
224 static struct osd_object  *osd_obj          (const struct lu_object *o);
225 static struct osd_device  *osd_dev          (const struct lu_device *d);
226 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
227 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
228 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
229 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
230 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
231                                              struct lu_device *d);
232 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
233                                              struct lu_device_type *t,
234                                              struct lustre_cfg *cfg);
235 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
236                                              const struct lu_object_header *hdr,
237                                              struct lu_device *d);
238 static struct inode       *osd_iget         (struct osd_thread_info *info,
239                                              struct osd_device *dev,
240                                              const struct osd_inode_id *id);
241 static struct super_block *osd_sb           (const struct osd_device *dev);
242 static struct dt_it       *osd_it_iam_init  (const struct lu_env *env,
243                                              struct dt_object *dt,
244                                              struct lustre_capa *capa);
245 static struct dt_key      *osd_it_iam_key   (const struct lu_env *env,
246                                              const struct dt_it *di);
247 static struct dt_rec      *osd_it_iam_rec   (const struct lu_env *env,
248                                              const struct dt_it *di);
249 static struct dt_it       *osd_it_ea_init   (const struct lu_env *env,
250                                              struct dt_object *dt,
251                                              struct lustre_capa *capa);
252 static struct dt_key      *osd_it_ea_key    (const struct lu_env *env,
253                                              const struct dt_it *di);
254 static struct dt_rec      *osd_it_ea_rec    (const struct lu_env *env,
255                                              const struct dt_it *di);
256
257 static struct timespec    *osd_inode_time   (const struct lu_env *env,
258                                              struct inode *inode,
259                                              __u64 seconds);
260 static struct thandle     *osd_trans_start  (const struct lu_env *env,
261                                              struct dt_device *d,
262                                              struct txn_param *p);
263 static journal_t          *osd_journal      (const struct osd_device *dev);
264
265 static int __osd_ea_add_rec(struct osd_thread_info *info,
266                             struct osd_object *pobj,
267                             struct osd_object *cobj,
268                             const char *name,
269                             struct thandle *th);
270
271 static const struct lu_device_type_operations osd_device_type_ops;
272 static       struct lu_device_type            osd_device_type;
273 static const struct lu_object_operations      osd_lu_obj_ops;
274 static       struct obd_ops                   osd_obd_device_ops;
275 static const struct lu_device_operations      osd_lu_ops;
276 static       struct lu_context_key            osd_key;
277 static const struct dt_object_operations      osd_obj_ops;
278 static const struct dt_object_operations      osd_obj_ea_ops;
279 static const struct dt_body_operations        osd_body_ops;
280 static const struct dt_index_operations       osd_index_iam_ops;
281 static const struct dt_index_operations       osd_index_ea_ops;
282
283 struct osd_thandle {
284         struct thandle          ot_super;
285         handle_t               *ot_handle;
286         struct journal_callback ot_jcb;
287         /* Link to the device, for debugging. */
288         struct lu_ref_link     *ot_dev_link;
289
290 };
291
292 #ifdef HAVE_QUOTA_SUPPORT
293 static inline void
294 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
295 {
296         struct md_ucred    *uc = md_ucred(env);
297
298         LASSERT(uc != NULL);
299
300         save->oc_uid = current->fsuid;
301         save->oc_gid = current->fsgid;
302         save->oc_cap = current->cap_effective;
303         current->fsuid         = uc->mu_fsuid;
304         current->fsgid         = uc->mu_fsgid;
305         current->cap_effective = uc->mu_cap;
306 }
307
308 static inline void
309 osd_pop_ctxt(struct osd_ctxt *save)
310 {
311         current->fsuid         = save->oc_uid;
312         current->fsgid         = save->oc_gid;
313         current->cap_effective = save->oc_cap;
314 }
315 #endif
316
317 /*
318  * Invariants, assertions.
319  */
320
321 /*
322  * XXX: do not enable this, until invariant checking code is made thread safe
323  * in the face of pdirops locking.
324  */
325 #define OSD_INVARIANT_CHECKS (0)
326
327 #if OSD_INVARIANT_CHECKS
328 static int osd_invariant(const struct osd_object *obj)
329 {
330         return
331                 obj != NULL &&
332                 ergo(obj->oo_inode != NULL,
333                      obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
334                      atomic_read(&obj->oo_inode->i_count) > 0) &&
335                 ergo(obj->oo_dir != NULL &&
336                      obj->oo_dir->od_conationer.ic_object != NULL,
337                      obj->oo_dir->od_conationer.ic_object == obj->oo_inode);
338 }
339 #else
340 #define osd_invariant(obj) (1)
341 #endif
342
343 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
344 {
345         return lu_context_key_get(&env->le_ctx, &osd_key);
346 }
347
348 /*
349  * Concurrency: doesn't matter
350  */
351 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
352 {
353         return osd_oti_get(env)->oti_r_locks > 0;
354 }
355
356 /*
357  * Concurrency: doesn't matter
358  */
359 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
360 {
361         struct osd_thread_info *oti = osd_oti_get(env);
362         return oti->oti_w_locks > 0 && o->oo_owner == env;
363 }
364
365 /*
366  * Concurrency: doesn't access mutable data
367  */
368 static int osd_root_get(const struct lu_env *env,
369                         struct dt_device *dev, struct lu_fid *f)
370 {
371         struct inode *inode;
372
373         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
374         lu_igif_build(f, inode->i_ino, inode->i_generation);
375         return 0;
376 }
377
378 /*
379  * OSD object methods.
380  */
381
382 /*
383  * Concurrency: no concurrent access is possible that early in object
384  * life-cycle.
385  */
386 static struct lu_object *osd_object_alloc(const struct lu_env *env,
387                                           const struct lu_object_header *hdr,
388                                           struct lu_device *d)
389 {
390         struct osd_object *mo;
391
392         OBD_ALLOC_PTR(mo);
393         if (mo != NULL) {
394                 struct lu_object *l;
395
396                 l = &mo->oo_dt.do_lu;
397                 dt_object_init(&mo->oo_dt, NULL, d);
398                 if (osd_dev(d)->od_iop_mode)
399                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
400                 else
401                         mo->oo_dt.do_ops = &osd_obj_ops;
402
403                 l->lo_ops = &osd_lu_obj_ops;
404                 init_rwsem(&mo->oo_sem);
405                 spin_lock_init(&mo->oo_guard);
406                 return l;
407         } else
408                 return NULL;
409 }
410
411 /*
412  * Concurrency: shouldn't matter.
413  */
414 static void osd_object_init0(struct osd_object *obj)
415 {
416         LASSERT(obj->oo_inode != NULL);
417         obj->oo_dt.do_body_ops = &osd_body_ops;
418         obj->oo_dt.do_lu.lo_header->loh_attr |=
419                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
420 }
421
422 /*
423  * Concurrency: no concurrent access is possible that early in object
424  * life-cycle.
425  */
426 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
427                            const struct lu_object_conf *_)
428 {
429         struct osd_object *obj = osd_obj(l);
430         int result;
431
432         LINVRNT(osd_invariant(obj));
433
434         result = osd_fid_lookup(env, obj, lu_object_fid(l));
435         if (result == 0) {
436                 if (obj->oo_inode != NULL)
437                         osd_object_init0(obj);
438         }
439         LINVRNT(osd_invariant(obj));
440         return result;
441 }
442
443 /*
444  * Concurrency: no concurrent access is possible that late in object
445  * life-cycle.
446  */
447 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
448 {
449         struct osd_object *obj = osd_obj(l);
450
451         LINVRNT(osd_invariant(obj));
452
453         dt_object_fini(&obj->oo_dt);
454         OBD_FREE_PTR(obj);
455 }
456
457 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
458                                              const struct iam_container *bag)
459 {
460         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
461                                            osd_oti_get(env)->oti_it_ipd);
462 }
463
464 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
465                                               const struct iam_container *bag)
466 {
467         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
468                                            osd_oti_get(env)->oti_idx_ipd);
469 }
470
471 static void osd_ipd_put(const struct lu_env *env,
472                         const struct iam_container *bag,
473                         struct iam_path_descr *ipd)
474 {
475         bag->ic_descr->id_ops->id_ipd_free(ipd);
476 }
477
478 /*
479  * Concurrency: no concurrent access is possible that late in object
480  * life-cycle.
481  */
482 static void osd_index_fini(struct osd_object *o)
483 {
484         struct iam_container *bag;
485
486         if (o->oo_dir != NULL) {
487                 bag = &o->oo_dir->od_container;
488                 if (o->oo_inode != NULL) {
489                         if (bag->ic_object == o->oo_inode)
490                                 iam_container_fini(bag);
491                 }
492                 OBD_FREE_PTR(o->oo_dir);
493                 o->oo_dir = NULL;
494         }
495 }
496
497 /*
498  * Concurrency: no concurrent access is possible that late in object
499  * life-cycle (for all existing callers, that is. New callers have to provide
500  * their own locking.)
501  */
502 static int osd_inode_unlinked(const struct inode *inode)
503 {
504         return inode->i_nlink == 0;
505 }
506
507 enum {
508         OSD_TXN_OI_DELETE_CREDITS    = 20,
509         OSD_TXN_INODE_DELETE_CREDITS = 20
510 };
511
512 /*
513  * Concurrency: no concurrent access is possible that late in object
514  * life-cycle.
515  */
516 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
517 {
518         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
519         struct osd_device      *osd = osd_obj2dev(obj);
520         struct osd_thread_info *oti = osd_oti_get(env);
521         struct txn_param       *prm = &oti->oti_txn;
522         struct thandle         *th;
523         int result;
524
525         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
526                             OSD_TXN_INODE_DELETE_CREDITS);
527         th = osd_trans_start(env, &osd->od_dt_dev, prm);
528         if (!IS_ERR(th)) {
529                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
530                 osd_trans_stop(env, th);
531         } else
532                 result = PTR_ERR(th);
533         return result;
534 }
535
536 /*
537  * Called just before object is freed. Releases all resources except for
538  * object itself (that is released by osd_object_free()).
539  *
540  * Concurrency: no concurrent access is possible that late in object
541  * life-cycle.
542  */
543 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
544 {
545         struct osd_object *obj   = osd_obj(l);
546         struct inode      *inode = obj->oo_inode;
547
548         LINVRNT(osd_invariant(obj));
549
550         /*
551          * If object is unlinked remove fid->ino mapping from object index.
552          */
553
554         osd_index_fini(obj);
555         if (inode != NULL) {
556                 int result;
557
558                 if (osd_inode_unlinked(inode)) {
559                         result = osd_inode_remove(env, obj);
560                         if (result != 0)
561                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
562                                                 "Failed to cleanup: %d\n",
563                                                 result);
564                 }
565
566                 iput(inode);
567                 obj->oo_inode = NULL;
568         }
569 }
570
571 /*
572  * Concurrency: ->loo_object_release() is called under site spin-lock.
573  */
574 static void osd_object_release(const struct lu_env *env,
575                                struct lu_object *l)
576 {
577         struct osd_object *o = osd_obj(l);
578
579         LASSERT(!lu_object_is_dying(l->lo_header));
580         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
581                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
582 }
583
584 /*
585  * Concurrency: shouldn't matter.
586  */
587 static int osd_object_print(const struct lu_env *env, void *cookie,
588                             lu_printer_t p, const struct lu_object *l)
589 {
590         struct osd_object *o = osd_obj(l);
591         struct iam_descr  *d;
592
593         if (o->oo_dir != NULL)
594                 d = o->oo_dir->od_container.ic_descr;
595         else
596                 d = NULL;
597         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
598                     o, o->oo_inode,
599                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
600                     o->oo_inode ? o->oo_inode->i_generation : 0,
601                     d ? d->id_ops->id_name : "plain");
602 }
603
604 /*
605  * Concurrency: shouldn't matter.
606  */
607 int osd_statfs(const struct lu_env *env, struct dt_device *d,
608                struct kstatfs *sfs)
609 {
610         struct osd_device *osd = osd_dt_dev(d);
611         struct super_block *sb = osd_sb(osd);
612         int result = 0;
613
614         spin_lock(&osd->od_osfs_lock);
615         /* cache 1 second */
616         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
617                 result = ll_do_statfs(sb, &osd->od_kstatfs);
618                 if (likely(result == 0)) /* N.B. statfs can't really fail */
619                         osd->od_osfs_age = cfs_time_current_64();
620         }
621
622         if (likely(result == 0))
623                 *sfs = osd->od_kstatfs;
624         spin_unlock(&osd->od_osfs_lock);
625
626         return result;
627 }
628
629 /*
630  * Concurrency: doesn't access mutable data.
631  */
632 static void osd_conf_get(const struct lu_env *env,
633                          const struct dt_device *dev,
634                          struct dt_device_param *param)
635 {
636         /*
637          * XXX should be taken from not-yet-existing fs abstraction layer.
638          */
639         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
640         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
641         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
642 }
643
644 /**
645  * Helper function to get and fill the buffer with input values.
646  */
647 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
648 {
649         struct lu_buf *buf;
650
651         buf = &osd_oti_get(env)->oti_buf;
652         buf->lb_buf = area;
653         buf->lb_len = len;
654         return buf;
655 }
656
657 /*
658  * Journal
659  */
660
661 /*
662  * Concurrency: doesn't access mutable data.
663  */
664 static int osd_param_is_sane(const struct osd_device *dev,
665                              const struct txn_param *param)
666 {
667         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
668 }
669
670 /*
671  * Concurrency: shouldn't matter.
672  */
673 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
674 {
675         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
676         struct thandle     *th  = &oh->ot_super;
677         struct dt_device   *dev = th->th_dev;
678         struct lu_device   *lud = &dev->dd_lu_dev;
679
680         LASSERT(dev != NULL);
681         LASSERT(oh->ot_handle == NULL);
682
683         if (error) {
684                 CERROR("transaction @0x%p commit error: %d\n", th, error);
685         } else {
686                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
687                 /*
688                  * This od_env_for_commit is only for commit usage.  see
689                  * "struct dt_device"
690                  */
691                 lu_context_enter(&env->le_ctx);
692                 dt_txn_hook_commit(env, th);
693                 lu_context_exit(&env->le_ctx);
694         }
695
696         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
697         lu_device_put(lud);
698         th->th_dev = NULL;
699
700         lu_context_exit(&th->th_ctx);
701         lu_context_fini(&th->th_ctx);
702         OBD_FREE_PTR(oh);
703 }
704
705 /*
706  * Concurrency: shouldn't matter.
707  */
708 static struct thandle *osd_trans_start(const struct lu_env *env,
709                                        struct dt_device *d,
710                                        struct txn_param *p)
711 {
712         struct osd_device  *dev = osd_dt_dev(d);
713         handle_t           *jh;
714         struct osd_thandle *oh;
715         struct thandle     *th;
716         int hook_res;
717
718         ENTRY;
719
720         hook_res = dt_txn_hook_start(env, d, p);
721         if (hook_res != 0)
722                 RETURN(ERR_PTR(hook_res));
723
724         if (osd_param_is_sane(dev, p)) {
725                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
726                 if (oh != NULL) {
727                         struct osd_thread_info *oti = osd_oti_get(env);
728
729                         /*
730                          * XXX temporary stuff. Some abstraction layer should
731                          * be used.
732                          */
733
734                         jh = journal_start(osd_journal(dev), p->tp_credits);
735                         if (!IS_ERR(jh)) {
736                                 oh->ot_handle = jh;
737                                 th = &oh->ot_super;
738                                 th->th_dev = d;
739                                 th->th_result = 0;
740                                 jh->h_sync = p->tp_sync;
741                                 lu_device_get(&d->dd_lu_dev);
742                                 oh->ot_dev_link = lu_ref_add
743                                         (&d->dd_lu_dev.ld_reference,
744                                          "osd-tx", th);
745                                 /* add commit callback */
746                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
747                                 lu_context_enter(&th->th_ctx);
748                                 journal_callback_set(jh, osd_trans_commit_cb,
749                                                      (struct journal_callback *)&oh->ot_jcb);
750                                         LASSERT(oti->oti_txns == 0);
751                                         LASSERT(oti->oti_r_locks == 0);
752                                         LASSERT(oti->oti_w_locks == 0);
753                                         oti->oti_txns++;
754                         } else {
755                                 OBD_FREE_PTR(oh);
756                                 th = (void *)jh;
757                         }
758                 } else
759                         th = ERR_PTR(-ENOMEM);
760         } else {
761                 CERROR("Invalid transaction parameters\n");
762                 th = ERR_PTR(-EINVAL);
763         }
764
765         RETURN(th);
766 }
767
768 /*
769  * Concurrency: shouldn't matter.
770  */
771 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
772 {
773         int result;
774         struct osd_thandle *oh;
775         struct osd_thread_info *oti = osd_oti_get(env);
776
777         ENTRY;
778
779         oh = container_of0(th, struct osd_thandle, ot_super);
780         if (oh->ot_handle != NULL) {
781                 handle_t *hdl = oh->ot_handle;
782
783                 LASSERT(oti->oti_txns == 1);
784                 oti->oti_txns--;
785                 LASSERT(oti->oti_r_locks == 0);
786                 LASSERT(oti->oti_w_locks == 0);
787                 result = dt_txn_hook_stop(env, th);
788                 if (result != 0)
789                         CERROR("Failure in transaction hook: %d\n", result);
790                 oh->ot_handle = NULL;
791                 result = journal_stop(hdl);
792                 if (result != 0)
793                         CERROR("Failure to stop transaction: %d\n", result);
794         }
795         EXIT;
796 }
797
798 /*
799  * Concurrency: shouldn't matter.
800  */
801 static int osd_sync(const struct lu_env *env, struct dt_device *d)
802 {
803         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
804         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
805 }
806
807 /**
808  * Start commit for OSD device.
809  *
810  * An implementation of dt_commit_async method for OSD device.
811  * Asychronously starts underlayng fs sync and thereby a transaction
812  * commit.
813  *
814  * \param env environment
815  * \param d dt device
816  *
817  * \see dt_device_operations
818  */
819 static int osd_commit_async(const struct lu_env *env,
820                             struct dt_device *d)
821 {
822         struct super_block *s = osd_sb(osd_dt_dev(d));
823         ENTRY;
824
825         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
826         RETURN(s->s_op->sync_fs(s, 0));
827 }
828
829 /*
830  * Concurrency: shouldn't matter.
831  */
832 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
833
834 static void osd_ro(const struct lu_env *env, struct dt_device *d)
835 {
836         ENTRY;
837
838         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
839
840         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
841                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
842         EXIT;
843 }
844
845
846 /*
847  * Concurrency: serialization provided by callers.
848  */
849 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
850                               int mode, unsigned long timeout, __u32 alg,
851                               struct lustre_capa_key *keys)
852 {
853         struct osd_device *dev = osd_dt_dev(d);
854         ENTRY;
855
856         dev->od_fl_capa = mode;
857         dev->od_capa_timeout = timeout;
858         dev->od_capa_alg = alg;
859         dev->od_capa_keys = keys;
860         RETURN(0);
861 }
862
863 /**
864  * Concurrency: serialization provided by callers.
865  */
866 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
867                                struct dt_quota_ctxt *ctxt, void *data)
868 {
869         struct obd_device *obd = (void *)ctxt;
870         struct vfsmount *mnt = (struct vfsmount *)data;
871         ENTRY;
872
873         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
874         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
875         obd->obd_lvfs_ctxt.pwdmnt = mnt;
876         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
877         obd->obd_lvfs_ctxt.fs = get_ds();
878
879         EXIT;
880 }
881
882 /**
883  * Note: we do not count into QUOTA here.
884  * If we mount with --data_journal we may need more.
885  */
886 static const int osd_dto_credits_noquota[DTO_NR] = {
887         /**
888          * Insert/Delete.
889          * INDEX_EXTRA_TRANS_BLOCKS(8) +
890          * SINGLEDATA_TRANS_BLOCKS(8)
891          * XXX Note: maybe iam need more, since iam have more level than
892          *           EXT3 htree.
893          */
894         [DTO_INDEX_INSERT]  = 16,
895         [DTO_INDEX_DELETE]  = 16,
896         /**
897          * Unused now
898          */
899         [DTO_IDNEX_UPDATE]  = 16,
900         /**
901          * Create a object. The same as create object in EXT3.
902          * DATA_TRANS_BLOCKS(14) +
903          * INDEX_EXTRA_BLOCKS(8) +
904          * 3(inode bits, groups, GDT)
905          */
906         [DTO_OBJECT_CREATE] = 25,
907         /**
908          * Unused now
909          */
910         [DTO_OBJECT_DELETE] = 25,
911         /**
912          * Attr set credits.
913          * 3(inode bits, group, GDT)
914          */
915         [DTO_ATTR_SET_BASE] = 3,
916         /**
917          * Xattr set. The same as xattr of EXT3.
918          * DATA_TRANS_BLOCKS(14)
919          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
920          *           also counted in. Do not know why?
921          */
922         [DTO_XATTR_SET]     = 14,
923         [DTO_LOG_REC]       = 14,
924         /**
925          * creadits for inode change during write.
926          */
927         [DTO_WRITE_BASE]    = 3,
928         /**
929          * credits for single block write.
930          */
931         [DTO_WRITE_BLOCK]   = 14,
932         /**
933          * Attr set credits for chown.
934          * 3 (inode bit, group, GDT)
935          */
936         [DTO_ATTR_SET_CHOWN]= 3
937 };
938
939 /**
940  * Note: we count into QUOTA here.
941  * If we mount with --data_journal we may need more.
942  */
943 static const int osd_dto_credits_quota[DTO_NR] = {
944         /**
945          * INDEX_EXTRA_TRANS_BLOCKS(8) +
946          * SINGLEDATA_TRANS_BLOCKS(8) +
947          * 2 * QUOTA_TRANS_BLOCKS(2)
948          */
949         [DTO_INDEX_INSERT]  = 20,
950         /**
951          * INDEX_EXTRA_TRANS_BLOCKS(8) +
952          * SINGLEDATA_TRANS_BLOCKS(8) +
953          * 2 * QUOTA_TRANS_BLOCKS(2)
954          */
955         [DTO_INDEX_DELETE]  = 20,
956         /**
957          * Unused now.
958          */ 
959         [DTO_IDNEX_UPDATE]  = 16,
960         /*
961          * Create a object. Same as create object in EXT3 filesystem.
962          * DATA_TRANS_BLOCKS(16) +
963          * INDEX_EXTRA_BLOCKS(8) +
964          * 3(inode bits, groups, GDT) +
965          * 2 * QUOTA_INIT_BLOCKS(25)
966          */
967         [DTO_OBJECT_CREATE] = 77,
968         /*
969          * Unused now.
970          * DATA_TRANS_BLOCKS(16) +
971          * INDEX_EXTRA_BLOCKS(8) +
972          * 3(inode bits, groups, GDT) +
973          * QUOTA(?)
974          */ 
975         [DTO_OBJECT_DELETE] = 27,
976         /**
977          * Attr set credits.
978          * 3 (inode bit, group, GDT) +
979          */
980         [DTO_ATTR_SET_BASE] = 3,
981         /**
982          * Xattr set. The same as xattr of EXT3.
983          * DATA_TRANS_BLOCKS(16)
984          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
985          *           also counted in. Do not know why?
986          */
987         [DTO_XATTR_SET]     = 16,
988         [DTO_LOG_REC]       = 16,
989         /**
990          * creadits for inode change during write.
991          */
992         [DTO_WRITE_BASE]    = 3,
993         /**
994          * credits for single block write.
995          */
996         [DTO_WRITE_BLOCK]   = 16,
997         /**
998          * Attr set credits for chown.
999          * 3 (inode bit, group, GDT) +
1000          * 2 * QUOTA_INIT_BLOCKS(25) +
1001          * 2 * QUOTA_DEL_BLOCKS(9)
1002          */
1003         [DTO_ATTR_SET_CHOWN]= 71
1004 };
1005
1006 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
1007                           enum dt_txn_op op)
1008 {
1009         LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
1010                 ARRAY_SIZE(osd_dto_credits_quota));
1011         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
1012 #ifdef HAVE_QUOTA_SUPPORT
1013         if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
1014                 return osd_dto_credits_quota[op];
1015         else
1016 #endif
1017                 return osd_dto_credits_noquota[op];
1018 }
1019
1020 static const struct dt_device_operations osd_dt_ops = {
1021         .dt_root_get       = osd_root_get,
1022         .dt_statfs         = osd_statfs,
1023         .dt_trans_start    = osd_trans_start,
1024         .dt_trans_stop     = osd_trans_stop,
1025         .dt_conf_get       = osd_conf_get,
1026         .dt_sync           = osd_sync,
1027         .dt_ro             = osd_ro,
1028         .dt_commit_async   = osd_commit_async,
1029         .dt_credit_get     = osd_credit_get,
1030         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1031         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1032 };
1033
1034 static void osd_object_read_lock(const struct lu_env *env,
1035                                  struct dt_object *dt, unsigned role)
1036 {
1037         struct osd_object *obj = osd_dt_obj(dt);
1038         struct osd_thread_info *oti = osd_oti_get(env);
1039
1040         LINVRNT(osd_invariant(obj));
1041
1042         LASSERT(obj->oo_owner != env);
1043         down_read_nested(&obj->oo_sem, role);
1044
1045         LASSERT(obj->oo_owner == NULL);
1046         oti->oti_r_locks++;
1047 }
1048
1049 static void osd_object_write_lock(const struct lu_env *env,
1050                                   struct dt_object *dt, unsigned role)
1051 {
1052         struct osd_object *obj = osd_dt_obj(dt);
1053         struct osd_thread_info *oti = osd_oti_get(env);
1054
1055         LINVRNT(osd_invariant(obj));
1056
1057         LASSERT(obj->oo_owner != env);
1058         down_write_nested(&obj->oo_sem, role);
1059
1060         LASSERT(obj->oo_owner == NULL);
1061         obj->oo_owner = env;
1062         oti->oti_w_locks++;
1063 }
1064
1065 static void osd_object_read_unlock(const struct lu_env *env,
1066                                    struct dt_object *dt)
1067 {
1068         struct osd_object *obj = osd_dt_obj(dt);
1069         struct osd_thread_info *oti = osd_oti_get(env);
1070
1071         LINVRNT(osd_invariant(obj));
1072
1073         LASSERT(oti->oti_r_locks > 0);
1074         oti->oti_r_locks--;
1075         up_read(&obj->oo_sem);
1076 }
1077
1078 static void osd_object_write_unlock(const struct lu_env *env,
1079                                     struct dt_object *dt)
1080 {
1081         struct osd_object *obj = osd_dt_obj(dt);
1082         struct osd_thread_info *oti = osd_oti_get(env);
1083
1084         LINVRNT(osd_invariant(obj));
1085
1086         LASSERT(obj->oo_owner == env);
1087         LASSERT(oti->oti_w_locks > 0);
1088         oti->oti_w_locks--;
1089         obj->oo_owner = NULL;
1090         up_write(&obj->oo_sem);
1091 }
1092
1093 static int capa_is_sane(const struct lu_env *env,
1094                         struct osd_device *dev,
1095                         struct lustre_capa *capa,
1096                         struct lustre_capa_key *keys)
1097 {
1098         struct osd_thread_info *oti = osd_oti_get(env);
1099         struct lustre_capa *tcapa = &oti->oti_capa;
1100         struct obd_capa *oc;
1101         int i, rc = 0;
1102         ENTRY;
1103
1104         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1105         if (oc) {
1106                 if (capa_is_expired(oc)) {
1107                         DEBUG_CAPA(D_ERROR, capa, "expired");
1108                         rc = -ESTALE;
1109                 }
1110                 capa_put(oc);
1111                 RETURN(rc);
1112         }
1113
1114         if (capa_is_expired_sec(capa)) {
1115                 DEBUG_CAPA(D_ERROR, capa, "expired");
1116                 RETURN(-ESTALE);
1117         }
1118
1119         spin_lock(&capa_lock);
1120         for (i = 0; i < 2; i++) {
1121                 if (keys[i].lk_keyid == capa->lc_keyid) {
1122                         oti->oti_capa_key = keys[i];
1123                         break;
1124                 }
1125         }
1126         spin_unlock(&capa_lock);
1127
1128         if (i == 2) {
1129                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1130                 RETURN(-ESTALE);
1131         }
1132
1133         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1134         if (rc)
1135                 RETURN(rc);
1136
1137         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1138                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1139                 RETURN(-EACCES);
1140         }
1141
1142         oc = capa_add(dev->od_capa_hash, capa);
1143         capa_put(oc);
1144
1145         RETURN(0);
1146 }
1147
1148 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1149                            struct lustre_capa *capa, __u64 opc)
1150 {
1151         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1152         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1153         struct md_capainfo *ci;
1154         int rc;
1155
1156         if (!dev->od_fl_capa)
1157                 return 0;
1158
1159         if (capa == BYPASS_CAPA)
1160                 return 0;
1161
1162         ci = md_capainfo(env);
1163         if (unlikely(!ci))
1164                 return 0;
1165
1166         if (ci->mc_auth == LC_ID_NONE)
1167                 return 0;
1168
1169         if (!capa) {
1170                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1171                 return -EACCES;
1172         }
1173
1174         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1175                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1176                            PFID(fid));
1177                 return -EACCES;
1178         }
1179
1180         if (!capa_opc_supported(capa, opc)) {
1181                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1182                 return -EACCES;
1183         }
1184
1185         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1186                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1187                 return -EACCES;
1188         }
1189
1190         return 0;
1191 }
1192
1193 static int osd_attr_get(const struct lu_env *env,
1194                         struct dt_object *dt,
1195                         struct lu_attr *attr,
1196                         struct lustre_capa *capa)
1197 {
1198         struct osd_object *obj = osd_dt_obj(dt);
1199
1200         LASSERT(dt_object_exists(dt));
1201         LINVRNT(osd_invariant(obj));
1202
1203         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1204                 return -EACCES;
1205
1206         spin_lock(&obj->oo_guard);
1207         osd_inode_getattr(env, obj->oo_inode, attr);
1208         spin_unlock(&obj->oo_guard);
1209         return 0;
1210 }
1211
1212 static int osd_attr_set(const struct lu_env *env,
1213                         struct dt_object *dt,
1214                         const struct lu_attr *attr,
1215                         struct thandle *handle,
1216                         struct lustre_capa *capa)
1217 {
1218         struct osd_object *obj = osd_dt_obj(dt);
1219         int rc;
1220
1221         LASSERT(handle != NULL);
1222         LASSERT(dt_object_exists(dt));
1223         LASSERT(osd_invariant(obj));
1224
1225         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1226                 return -EACCES;
1227
1228         spin_lock(&obj->oo_guard);
1229         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1230         spin_unlock(&obj->oo_guard);
1231
1232         if (!rc)
1233                 mark_inode_dirty(obj->oo_inode);
1234         return rc;
1235 }
1236
1237 static struct timespec *osd_inode_time(const struct lu_env *env,
1238                                        struct inode *inode, __u64 seconds)
1239 {
1240         struct osd_thread_info *oti = osd_oti_get(env);
1241         struct timespec        *t   = &oti->oti_time;
1242
1243         t->tv_sec  = seconds;
1244         t->tv_nsec = 0;
1245         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1246         return t;
1247 }
1248
1249 static int osd_inode_setattr(const struct lu_env *env,
1250                              struct inode *inode, const struct lu_attr *attr)
1251 {
1252         __u64 bits;
1253
1254         bits = attr->la_valid;
1255
1256         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1257
1258 #ifdef HAVE_QUOTA_SUPPORT
1259         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1260             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1261                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1262                 struct iattr iattr;
1263                 int rc;
1264
1265                 iattr.ia_valid = 0;
1266                 if (bits & LA_UID)
1267                         iattr.ia_valid |= ATTR_UID;
1268                 if (bits & LA_GID)
1269                         iattr.ia_valid |= ATTR_GID;
1270                 iattr.ia_uid = attr->la_uid;
1271                 iattr.ia_gid = attr->la_gid;
1272                 osd_push_ctxt(env, save);
1273                 rc = DQUOT_TRANSFER(inode, &iattr) ? -EDQUOT : 0;
1274                 osd_pop_ctxt(save);
1275                 if (rc != 0)
1276                         return rc;
1277         }
1278 #endif
1279
1280         if (bits & LA_ATIME)
1281                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1282         if (bits & LA_CTIME)
1283                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1284         if (bits & LA_MTIME)
1285                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1286         if (bits & LA_SIZE) {
1287                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1288                 i_size_write(inode, attr->la_size);
1289         }
1290 # if 0
1291         /*
1292          * OSD should not change "i_blocks" which is used by quota.
1293          * "i_blocks" should be changed by ldiskfs only.
1294          * Disable this assignment until SOM to fix some EA field. */
1295         if (bits & LA_BLOCKS)
1296                 inode->i_blocks = attr->la_blocks;
1297 #endif
1298         if (bits & LA_MODE)
1299                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1300                         (attr->la_mode & ~S_IFMT);
1301         if (bits & LA_UID)
1302                 inode->i_uid    = attr->la_uid;
1303         if (bits & LA_GID)
1304                 inode->i_gid    = attr->la_gid;
1305         if (bits & LA_NLINK)
1306                 inode->i_nlink  = attr->la_nlink;
1307         if (bits & LA_RDEV)
1308                 inode->i_rdev   = attr->la_rdev;
1309
1310         if (bits & LA_FLAGS) {
1311                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1312
1313                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1314                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1315         }
1316         return 0;
1317 }
1318
1319 /*
1320  * Object creation.
1321  *
1322  * XXX temporary solution.
1323  */
1324
1325 static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
1326                           struct lu_attr *attr, struct thandle *th)
1327 {
1328         return 0;
1329 }
1330
1331 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1332                            struct lu_attr *attr, struct thandle *th)
1333 {
1334         LASSERT(obj->oo_inode != NULL);
1335
1336         osd_object_init0(obj);
1337         return 0;
1338 }
1339
1340 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1341                                           struct inode * dir, int mode);
1342 extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry,
1343                              struct inode *inode);
1344 extern int ldiskfs_delete_entry(handle_t *handle,
1345                                 struct inode * dir,
1346                                 struct ldiskfs_dir_entry_2 * de_del,
1347                                 struct buffer_head * bh);
1348 extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry,
1349                                                struct ldiskfs_dir_entry_2
1350                                                ** res_dir);
1351 extern int ldiskfs_add_dot_dotdot(handle_t *handle, struct inode *dir,
1352                                   struct inode *inode);
1353
1354 extern int ldiskfs_xattr_set_handle(handle_t *handle, struct inode *inode,
1355                                     int name_index, const char *name,
1356                                     const void *value, size_t value_len,
1357                                     int flags);
1358
1359 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1360                                             struct osd_object *obj,
1361                                             const char *name,
1362                                             const int namelen)
1363 {
1364         struct osd_thread_info *info   = osd_oti_get(env);
1365         struct dentry *child_dentry = &info->oti_child_dentry;
1366         struct dentry *obj_dentry = &info->oti_obj_dentry;
1367
1368         obj_dentry->d_inode = obj->oo_inode;
1369         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1370         obj_dentry->d_name.hash = 0;
1371
1372         child_dentry->d_name.hash = 0;
1373         child_dentry->d_parent = obj_dentry;
1374         child_dentry->d_name.name = name;
1375         child_dentry->d_name.len = namelen;
1376         return child_dentry;
1377 }
1378
1379
1380 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1381                       umode_t mode,
1382                       struct dt_allocation_hint *hint,
1383                       struct thandle *th)
1384 {
1385         int result;
1386         struct osd_device  *osd = osd_obj2dev(obj);
1387         struct osd_thandle *oth;
1388         struct dt_object   *parent;
1389         struct inode       *inode;
1390 #ifdef HAVE_QUOTA_SUPPORT
1391         struct osd_ctxt    *save = &info->oti_ctxt;
1392 #endif
1393
1394         LINVRNT(osd_invariant(obj));
1395         LASSERT(obj->oo_inode == NULL);
1396
1397         oth = container_of(th, struct osd_thandle, ot_super);
1398         LASSERT(oth->ot_handle->h_transaction != NULL);
1399
1400         if (hint && hint->dah_parent)
1401                 parent = hint->dah_parent;
1402         else
1403                 parent = osd->od_obj_area;
1404
1405         LASSERT(parent != NULL);
1406         LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1407
1408 #ifdef HAVE_QUOTA_SUPPORT
1409         osd_push_ctxt(info->oti_env, save);
1410 #endif
1411         inode = ldiskfs_create_inode(oth->ot_handle,
1412                                      osd_dt_obj(parent)->oo_inode, mode);
1413 #ifdef HAVE_QUOTA_SUPPORT
1414         osd_pop_ctxt(save);
1415 #endif
1416         if (!IS_ERR(inode)) {
1417                 obj->oo_inode = inode;
1418                 result = 0;
1419         } else
1420                 result = PTR_ERR(inode);
1421         LINVRNT(osd_invariant(obj));
1422         return result;
1423 }
1424
1425
1426 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1427                            int recsize, handle_t *handle);
1428
1429 extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize,
1430                            int recsize, handle_t *handle);
1431
1432
1433 enum {
1434         OSD_NAME_LEN = 255
1435 };
1436
1437 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1438                      struct lu_attr *attr,
1439                      struct dt_allocation_hint *hint,
1440                      struct dt_object_format *dof,
1441                      struct thandle *th)
1442 {
1443         int result;
1444         struct osd_thandle *oth;
1445         struct osd_device *osd = osd_obj2dev(obj);
1446         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1447
1448         LASSERT(S_ISDIR(attr->la_mode));
1449
1450         oth = container_of(th, struct osd_thandle, ot_super);
1451         LASSERT(oth->ot_handle->h_transaction != NULL);
1452         result = osd_mkfile(info, obj, mode, hint, th);
1453         if (result == 0 && osd->od_iop_mode == 0) {
1454                 LASSERT(obj->oo_inode != NULL);
1455                 /*
1456                  * XXX uh-oh... call low-level iam function directly.
1457                  */
1458
1459                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1460                                          sizeof (struct lu_fid_pack),
1461                                          oth->ot_handle);
1462         }
1463         return result;
1464 }
1465
1466 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1467                         struct lu_attr *attr,
1468                         struct dt_allocation_hint *hint,
1469                         struct dt_object_format *dof,
1470                         struct thandle *th)
1471 {
1472         int result;
1473         struct osd_thandle *oth;
1474         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1475
1476         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1477
1478         LASSERT(S_ISREG(attr->la_mode));
1479
1480         oth = container_of(th, struct osd_thandle, ot_super);
1481         LASSERT(oth->ot_handle->h_transaction != NULL);
1482
1483         result = osd_mkfile(info, obj, mode, hint, th);
1484         if (result == 0) {
1485                 LASSERT(obj->oo_inode != NULL);
1486                 if (feat->dif_flags & DT_IND_VARKEY)
1487                         result = iam_lvar_create(obj->oo_inode,
1488                                                  feat->dif_keysize_max,
1489                                                  feat->dif_ptrsize,
1490                                                  feat->dif_recsize_max,
1491                                                  oth->ot_handle);
1492                 else
1493                         result = iam_lfix_create(obj->oo_inode,
1494                                                  feat->dif_keysize_max,
1495                                                  feat->dif_ptrsize,
1496                                                  feat->dif_recsize_max,
1497                                                  oth->ot_handle);
1498
1499         }
1500         return result;
1501 }
1502
1503 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1504                      struct lu_attr *attr,
1505                      struct dt_allocation_hint *hint,
1506                      struct dt_object_format *dof,
1507                      struct thandle *th)
1508 {
1509         LASSERT(S_ISREG(attr->la_mode));
1510         return osd_mkfile(info, obj, (attr->la_mode &
1511                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1512 }
1513
1514 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1515                      struct lu_attr *attr,
1516                      struct dt_allocation_hint *hint,
1517                      struct dt_object_format *dof,
1518                      struct thandle *th)
1519 {
1520         LASSERT(S_ISLNK(attr->la_mode));
1521         return osd_mkfile(info, obj, (attr->la_mode &
1522                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1523 }
1524
1525 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1526                      struct lu_attr *attr,
1527                      struct dt_allocation_hint *hint,
1528                      struct dt_object_format *dof,
1529                      struct thandle *th)
1530 {
1531         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1532         int result;
1533
1534         LINVRNT(osd_invariant(obj));
1535         LASSERT(obj->oo_inode == NULL);
1536         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1537                 S_ISFIFO(mode) || S_ISSOCK(mode));
1538
1539         result = osd_mkfile(info, obj, mode, hint, th);
1540         if (result == 0) {
1541                 LASSERT(obj->oo_inode != NULL);
1542                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1543         }
1544         LINVRNT(osd_invariant(obj));
1545         return result;
1546 }
1547
1548 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1549                               struct lu_attr *,
1550                               struct dt_allocation_hint *hint,
1551                               struct dt_object_format *dof,
1552                               struct thandle *);
1553
1554 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1555 {
1556         osd_obj_type_f result;
1557
1558         switch (type) {
1559         case DFT_DIR:
1560                 result = osd_mkdir;
1561                 break;
1562         case DFT_REGULAR:
1563                 result = osd_mkreg;
1564                 break;
1565         case DFT_SYM:
1566                 result = osd_mksym;
1567                 break;
1568         case DFT_NODE:
1569                 result = osd_mknod;
1570                 break;
1571         case DFT_INDEX:
1572                 result = osd_mk_index;
1573                 break;
1574
1575         default:
1576                 LBUG();
1577                 break;
1578         }
1579         return result;
1580 }
1581
1582
1583 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1584                         struct dt_object *parent, umode_t child_mode)
1585 {
1586         LASSERT(ah);
1587
1588         memset(ah, 0, sizeof(*ah));
1589         ah->dah_parent = parent;
1590         ah->dah_mode = child_mode;
1591 }
1592
1593 /**
1594  * Helper function for osd_object_create()
1595  *
1596  * \retval 0, on success
1597  */
1598 static int __osd_object_create(struct osd_thread_info *info,
1599                                struct osd_object *obj, struct lu_attr *attr,
1600                                struct dt_allocation_hint *hint,
1601                                struct dt_object_format *dof,
1602                                struct thandle *th)
1603 {
1604
1605         int result;
1606
1607         result = osd_create_pre(info, obj, attr, th);
1608         if (result == 0) {
1609                 result = osd_create_type_f(dof->dof_type)(info, obj,
1610                                            attr, hint, dof, th);
1611                 if (result == 0)
1612                         result = osd_create_post(info, obj, attr, th);
1613         }
1614         return result;
1615 }
1616
1617 /**
1618  * Helper function for osd_object_create()
1619  *
1620  * \retval 0, on success
1621  */
1622 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1623                            const struct lu_fid *fid, struct thandle *th)
1624 {
1625         struct osd_thread_info *info = osd_oti_get(env);
1626         struct osd_inode_id    *id   = &info->oti_id;
1627         struct osd_device      *osd  = osd_obj2dev(obj);
1628         struct md_ucred        *uc   = md_ucred(env);
1629
1630         LASSERT(obj->oo_inode != NULL);
1631         LASSERT(uc != NULL);
1632
1633         id->oii_ino = obj->oo_inode->i_ino;
1634         id->oii_gen = obj->oo_inode->i_generation;
1635
1636         return osd_oi_insert(info, &osd->od_oi, fid, id, th,
1637                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1638 }
1639
1640 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1641                              struct lu_attr *attr,
1642                              struct dt_allocation_hint *hint,
1643                              struct dt_object_format *dof,
1644                              struct thandle *th)
1645 {
1646         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1647         struct osd_object      *obj    = osd_dt_obj(dt);
1648         struct osd_thread_info *info   = osd_oti_get(env);
1649         int result;
1650
1651         ENTRY;
1652
1653         LINVRNT(osd_invariant(obj));
1654         LASSERT(!dt_object_exists(dt));
1655         LASSERT(osd_write_locked(env, obj));
1656         LASSERT(th != NULL);
1657
1658         result = __osd_object_create(info, obj, attr, hint, dof, th);
1659         if (result == 0)
1660                 result = __osd_oi_insert(env, obj, fid, th);
1661
1662         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1663         LASSERT(osd_invariant(obj));
1664         RETURN(result);
1665 }
1666
1667 /**
1668  * Helper function for osd_xattr_set()
1669  */
1670 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1671                            const struct lu_buf *buf, const char *name, int fl)
1672 {
1673         struct osd_object      *obj      = osd_dt_obj(dt);
1674         struct inode           *inode    = obj->oo_inode;
1675         struct osd_thread_info *info     = osd_oti_get(env);
1676         struct dentry          *dentry   = &info->oti_child_dentry;
1677         struct timespec        *t        = &info->oti_time;
1678         int                     fs_flags = 0;
1679         int  rc;
1680
1681         LASSERT(dt_object_exists(dt));
1682         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1683         LASSERT(osd_write_locked(env, obj));
1684
1685         if (fl & LU_XATTR_REPLACE)
1686                 fs_flags |= XATTR_REPLACE;
1687
1688         if (fl & LU_XATTR_CREATE)
1689                 fs_flags |= XATTR_CREATE;
1690
1691         dentry->d_inode = inode;
1692         *t = inode->i_ctime;
1693         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
1694                                    buf->lb_len, fs_flags);
1695         if (likely(rc == 0)) {
1696                 spin_lock(&obj->oo_guard);
1697                 inode->i_ctime = *t;
1698                 spin_unlock(&obj->oo_guard);
1699                 mark_inode_dirty(inode);
1700         }
1701         return rc;
1702 }
1703
1704 /**
1705  * Put the fid into lustre_mdt_attrs, and then place the structure
1706  * inode's ea. This fid should not be altered during the life time
1707  * of the inode.
1708  *
1709  * \retval +ve, on success
1710  * \retval -ve, on error
1711  *
1712  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
1713  */
1714 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
1715                           const struct lu_fid *fid)
1716 {
1717         struct osd_thread_info  *info      = osd_oti_get(env);
1718         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1719
1720         fid_cpu_to_be(&mdt_attrs->lma_self_fid, fid);
1721
1722         return __osd_xattr_set(env, dt,
1723                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
1724                                XATTR_NAME_LMA, LU_XATTR_CREATE);
1725
1726 }
1727
1728 /**
1729  * Helper function to form igif
1730  */
1731 static inline void osd_igif_get(const struct lu_env *env, struct dentry *dentry,
1732                                 struct lu_fid *fid)
1733 {
1734         struct inode  *inode = dentry->d_inode;
1735         lu_igif_build(fid, inode->i_ino, inode->i_generation);
1736 }
1737
1738 /**
1739  * Helper function to pack the fid
1740  */
1741 static inline void osd_fid_pack(const struct lu_env *env, const struct lu_fid *fid,
1742                                 struct lu_fid_pack *pack)
1743 {
1744         fid_pack(pack, fid, &osd_oti_get(env)->oti_fid);
1745 }
1746
1747 /**
1748  * Try to read the fid from inode ea into dt_rec, if return value
1749  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
1750  *
1751  * \param rec, the data-structure into which fid/igif is read
1752  *
1753  * \retval 0, on success
1754  */
1755 static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry,
1756                           struct dt_rec *rec)
1757 {
1758         struct inode            *inode     = dentry->d_inode;
1759         struct osd_thread_info  *info      = osd_oti_get(env);
1760         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1761         struct lu_fid           *fid       = &info->oti_fid;
1762         int rc;
1763
1764         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1765
1766         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
1767                                    sizeof *mdt_attrs);
1768
1769         if (rc > 0) {
1770                 fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid);
1771                 rc = 0;
1772         } else if (rc == -ENODATA) {
1773                 osd_igif_get(env, dentry, fid);
1774                 rc = 0;
1775         }
1776
1777         if (rc == 0)
1778                 osd_fid_pack(env, fid, (struct lu_fid_pack*)rec);
1779
1780         return rc;
1781 }
1782
1783 /**
1784  * OSD layer object create function for interoperability mode (b11826).
1785  * This is mostly similar to osd_object_create(). Only difference being, fid is
1786  * inserted into inode ea here.
1787  *
1788  * \retval   0, on success
1789  * \retval -ve, on error
1790  */
1791 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
1792                              struct lu_attr *attr,
1793                              struct dt_allocation_hint *hint,
1794                              struct dt_object_format *dof,
1795                              struct thandle *th)
1796 {
1797         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1798         struct osd_object      *obj    = osd_dt_obj(dt);
1799         struct osd_thread_info *info   = osd_oti_get(env);
1800         int result;
1801         int is_root = 0;
1802
1803         ENTRY;
1804
1805         LASSERT(osd_invariant(obj));
1806         LASSERT(!dt_object_exists(dt));
1807         LASSERT(osd_write_locked(env, obj));
1808         LASSERT(th != NULL);
1809
1810         result = __osd_object_create(info, obj, attr, hint, dof, th);
1811
1812         if (hint && hint->dah_parent)
1813                 is_root = osd_object_is_root(osd_dt_obj(hint->dah_parent));
1814
1815         /* objects under osd root shld have igif fid, so dont add fid EA */
1816         if (result == 0 && is_root == 0)
1817                 result = osd_ea_fid_set(env, dt, fid);
1818
1819         if (result == 0)
1820                 result = __osd_oi_insert(env, obj, fid, th);
1821
1822         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1823         LINVRNT(osd_invariant(obj));
1824         RETURN(result);
1825 }
1826
1827 /*
1828  * Concurrency: @dt is write locked.
1829  */
1830 static void osd_object_ref_add(const struct lu_env *env,
1831                                struct dt_object *dt,
1832                                struct thandle *th)
1833 {
1834         struct osd_object *obj = osd_dt_obj(dt);
1835         struct inode *inode = obj->oo_inode;
1836
1837         LINVRNT(osd_invariant(obj));
1838         LASSERT(dt_object_exists(dt));
1839         LASSERT(osd_write_locked(env, obj));
1840         LASSERT(th != NULL);
1841
1842         spin_lock(&obj->oo_guard);
1843         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1844         inode->i_nlink++;
1845         spin_unlock(&obj->oo_guard);
1846         mark_inode_dirty(inode);
1847         LINVRNT(osd_invariant(obj));
1848 }
1849
1850 /*
1851  * Concurrency: @dt is write locked.
1852  */
1853 static void osd_object_ref_del(const struct lu_env *env,
1854                                struct dt_object *dt,
1855                                struct thandle *th)
1856 {
1857         struct osd_object *obj = osd_dt_obj(dt);
1858         struct inode *inode = obj->oo_inode;
1859
1860         LINVRNT(osd_invariant(obj));
1861         LASSERT(dt_object_exists(dt));
1862         LASSERT(osd_write_locked(env, obj));
1863         LASSERT(th != NULL);
1864
1865         spin_lock(&obj->oo_guard);
1866         LASSERT(inode->i_nlink > 0);
1867         inode->i_nlink--;
1868         spin_unlock(&obj->oo_guard);
1869         mark_inode_dirty(inode);
1870         LINVRNT(osd_invariant(obj));
1871 }
1872
1873 /*
1874  * Concurrency: @dt is read locked.
1875  */
1876 static int osd_xattr_get(const struct lu_env *env,
1877                          struct dt_object *dt,
1878                          struct lu_buf *buf,
1879                          const char *name,
1880                          struct lustre_capa *capa)
1881 {
1882         struct osd_object      *obj    = osd_dt_obj(dt);
1883         struct inode           *inode  = obj->oo_inode;
1884         struct osd_thread_info *info   = osd_oti_get(env);
1885         struct dentry          *dentry = &info->oti_obj_dentry;
1886
1887         LASSERT(dt_object_exists(dt));
1888         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1889         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1890
1891         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1892                 return -EACCES;
1893
1894         dentry->d_inode = inode;
1895         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1896 }
1897
1898
1899 /*
1900  * Concurrency: @dt is write locked.
1901  */
1902 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1903                          const struct lu_buf *buf, const char *name, int fl,
1904                          struct thandle *handle, struct lustre_capa *capa)
1905 {
1906         LASSERT(handle != NULL);
1907
1908         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1909                 return -EACCES;
1910
1911         return __osd_xattr_set(env, dt, buf, name, fl);
1912 }
1913
1914 /*
1915  * Concurrency: @dt is read locked.
1916  */
1917 static int osd_xattr_list(const struct lu_env *env,
1918                           struct dt_object *dt,
1919                           struct lu_buf *buf,
1920                           struct lustre_capa *capa)
1921 {
1922         struct osd_object      *obj    = osd_dt_obj(dt);
1923         struct inode           *inode  = obj->oo_inode;
1924         struct osd_thread_info *info   = osd_oti_get(env);
1925         struct dentry          *dentry = &info->oti_obj_dentry;
1926
1927         LASSERT(dt_object_exists(dt));
1928         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1929         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1930
1931         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1932                 return -EACCES;
1933
1934         dentry->d_inode = inode;
1935         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1936 }
1937
1938 /*
1939  * Concurrency: @dt is write locked.
1940  */
1941 static int osd_xattr_del(const struct lu_env *env,
1942                          struct dt_object *dt,
1943                          const char *name,
1944                          struct thandle *handle,
1945                          struct lustre_capa *capa)
1946 {
1947         struct osd_object      *obj    = osd_dt_obj(dt);
1948         struct inode           *inode  = obj->oo_inode;
1949         struct osd_thread_info *info   = osd_oti_get(env);
1950         struct dentry          *dentry = &info->oti_obj_dentry;
1951         struct timespec        *t      = &info->oti_time;
1952         int                     rc;
1953
1954         LASSERT(dt_object_exists(dt));
1955         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1956         LASSERT(osd_write_locked(env, obj));
1957         LASSERT(handle != NULL);
1958
1959         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1960                 return -EACCES;
1961
1962         dentry->d_inode = inode;
1963         *t = inode->i_ctime;
1964         rc = inode->i_op->removexattr(dentry, name);
1965         if (likely(rc == 0)) {
1966                 /* ctime should not be updated with server-side time. */
1967                 spin_lock(&obj->oo_guard);
1968                 inode->i_ctime = *t;
1969                 spin_unlock(&obj->oo_guard);
1970                 mark_inode_dirty(inode);
1971         }
1972         return rc;
1973 }
1974
1975 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1976                                      struct dt_object *dt,
1977                                      struct lustre_capa *old,
1978                                      __u64 opc)
1979 {
1980         struct osd_thread_info *info = osd_oti_get(env);
1981         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1982         struct osd_object *obj = osd_dt_obj(dt);
1983         struct osd_device *dev = osd_obj2dev(obj);
1984         struct lustre_capa_key *key = &info->oti_capa_key;
1985         struct lustre_capa *capa = &info->oti_capa;
1986         struct obd_capa *oc;
1987         struct md_capainfo *ci;
1988         int rc;
1989         ENTRY;
1990
1991         if (!dev->od_fl_capa)
1992                 RETURN(ERR_PTR(-ENOENT));
1993
1994         LASSERT(dt_object_exists(dt));
1995         LINVRNT(osd_invariant(obj));
1996
1997         /* renewal sanity check */
1998         if (old && osd_object_auth(env, dt, old, opc))
1999                 RETURN(ERR_PTR(-EACCES));
2000
2001         ci = md_capainfo(env);
2002         if (unlikely(!ci))
2003                 RETURN(ERR_PTR(-ENOENT));
2004
2005         switch (ci->mc_auth) {
2006         case LC_ID_NONE:
2007                 RETURN(NULL);
2008         case LC_ID_PLAIN:
2009                 capa->lc_uid = obj->oo_inode->i_uid;
2010                 capa->lc_gid = obj->oo_inode->i_gid;
2011                 capa->lc_flags = LC_ID_PLAIN;
2012                 break;
2013         case LC_ID_CONVERT: {
2014                 __u32 d[4], s[4];
2015
2016                 s[0] = obj->oo_inode->i_uid;
2017                 get_random_bytes(&(s[1]), sizeof(__u32));
2018                 s[2] = obj->oo_inode->i_gid;
2019                 get_random_bytes(&(s[3]), sizeof(__u32));
2020                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2021                 if (unlikely(rc))
2022                         RETURN(ERR_PTR(rc));
2023
2024                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2025                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2026                 capa->lc_flags = LC_ID_CONVERT;
2027                 break;
2028         }
2029         default:
2030                 RETURN(ERR_PTR(-EINVAL));
2031         }
2032
2033         capa->lc_fid = *fid;
2034         capa->lc_opc = opc;
2035         capa->lc_flags |= dev->od_capa_alg << 24;
2036         capa->lc_timeout = dev->od_capa_timeout;
2037         capa->lc_expiry = 0;
2038
2039         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2040         if (oc) {
2041                 LASSERT(!capa_is_expired(oc));
2042                 RETURN(oc);
2043         }
2044
2045         spin_lock(&capa_lock);
2046         *key = dev->od_capa_keys[1];
2047         spin_unlock(&capa_lock);
2048
2049         capa->lc_keyid = key->lk_keyid;
2050         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2051
2052         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2053         if (rc) {
2054                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2055                 RETURN(ERR_PTR(rc));
2056         }
2057
2058         oc = capa_add(dev->od_capa_hash, capa);
2059         RETURN(oc);
2060 }
2061
2062 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2063 {
2064         int rc;
2065         struct osd_object      *obj    = osd_dt_obj(dt);
2066         struct inode           *inode  = obj->oo_inode;
2067         struct osd_thread_info *info   = osd_oti_get(env);
2068         struct dentry          *dentry = &info->oti_obj_dentry;
2069         struct file            *file   = &info->oti_file;
2070         ENTRY;
2071
2072         dentry->d_inode = inode;
2073         file->f_dentry = dentry;
2074         file->f_mapping = inode->i_mapping;
2075         file->f_op = inode->i_fop;
2076         LOCK_INODE_MUTEX(inode);
2077         rc = file->f_op->fsync(file, dentry, 0);
2078         UNLOCK_INODE_MUTEX(inode);
2079         RETURN(rc);
2080 }
2081
2082 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2083                         void **data)
2084 {
2085         struct osd_object *obj = osd_dt_obj(dt);
2086         ENTRY;
2087
2088         *data = (void *)obj->oo_inode;
2089         RETURN(0);
2090 }
2091
2092 static const struct dt_object_operations osd_obj_ops = {
2093         .do_read_lock    = osd_object_read_lock,
2094         .do_write_lock   = osd_object_write_lock,
2095         .do_read_unlock  = osd_object_read_unlock,
2096         .do_write_unlock = osd_object_write_unlock,
2097         .do_attr_get     = osd_attr_get,
2098         .do_attr_set     = osd_attr_set,
2099         .do_ah_init      = osd_ah_init,
2100         .do_create       = osd_object_create,
2101         .do_index_try    = osd_index_try,
2102         .do_ref_add      = osd_object_ref_add,
2103         .do_ref_del      = osd_object_ref_del,
2104         .do_xattr_get    = osd_xattr_get,
2105         .do_xattr_set    = osd_xattr_set,
2106         .do_xattr_del    = osd_xattr_del,
2107         .do_xattr_list   = osd_xattr_list,
2108         .do_capa_get     = osd_capa_get,
2109         .do_object_sync  = osd_object_sync,
2110         .do_data_get     = osd_data_get,
2111 };
2112
2113 /**
2114  * dt_object_operations for interoperability mode
2115  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2116  */
2117 static const struct dt_object_operations osd_obj_ea_ops = {
2118         .do_read_lock    = osd_object_read_lock,
2119         .do_write_lock   = osd_object_write_lock,
2120         .do_read_unlock  = osd_object_read_unlock,
2121         .do_write_unlock = osd_object_write_unlock,
2122         .do_attr_get     = osd_attr_get,
2123         .do_attr_set     = osd_attr_set,
2124         .do_ah_init      = osd_ah_init,
2125         .do_create       = osd_object_ea_create,
2126         .do_index_try    = osd_index_try,
2127         .do_ref_add      = osd_object_ref_add,
2128         .do_ref_del      = osd_object_ref_del,
2129         .do_xattr_get    = osd_xattr_get,
2130         .do_xattr_set    = osd_xattr_set,
2131         .do_xattr_del    = osd_xattr_del,
2132         .do_xattr_list   = osd_xattr_list,
2133         .do_capa_get     = osd_capa_get,
2134         .do_object_sync  = osd_object_sync,
2135         .do_data_get     = osd_data_get,
2136 };
2137
2138 /*
2139  * Body operations.
2140  */
2141
2142 /*
2143  * XXX: Another layering violation for now.
2144  *
2145  * We don't want to use ->f_op->read methods, because generic file write
2146  *
2147  *         - serializes on ->i_sem, and
2148  *
2149  *         - does a lot of extra work like balance_dirty_pages(),
2150  *
2151  * which doesn't work for globally shared files like /last-received.
2152  */
2153 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
2154 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
2155                                 loff_t *offs, handle_t *handle);
2156
2157 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
2158                         struct lu_buf *buf, loff_t *pos,
2159                         struct lustre_capa *capa)
2160 {
2161         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2162
2163         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2164                 RETURN(-EACCES);
2165
2166         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
2167 }
2168
2169 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
2170                          const struct lu_buf *buf, loff_t *pos,
2171                          struct thandle *handle, struct lustre_capa *capa,
2172                          int ignore_quota)
2173 {
2174         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
2175         struct osd_thandle *oh;
2176         ssize_t             result;
2177 #ifdef HAVE_QUOTA_SUPPORT
2178         cfs_cap_t           save = current->cap_effective;
2179 #endif
2180
2181         LASSERT(handle != NULL);
2182
2183         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
2184                 RETURN(-EACCES);
2185
2186         oh = container_of(handle, struct osd_thandle, ot_super);
2187         LASSERT(oh->ot_handle->h_transaction != NULL);
2188 #ifdef HAVE_QUOTA_SUPPORT
2189         if (ignore_quota)
2190                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2191         else
2192                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2193 #endif
2194         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
2195                                              pos, oh->ot_handle);
2196 #ifdef HAVE_QUOTA_SUPPORT
2197         current->cap_effective = save;
2198 #endif
2199         if (result == 0)
2200                 result = buf->lb_len;
2201         return result;
2202 }
2203
2204 static const struct dt_body_operations osd_body_ops = {
2205         .dbo_read  = osd_read,
2206         .dbo_write = osd_write
2207 };
2208
2209 /*
2210  * Index operations.
2211  */
2212
2213 static int osd_object_is_root(const struct osd_object *obj)
2214 {
2215         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
2216 }
2217
2218 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2219                            const struct dt_index_features *feat)
2220 {
2221         struct iam_descr *descr;
2222         struct dt_object *dt = &o->oo_dt;
2223
2224         if (osd_object_is_root(o))
2225                 return feat == &dt_directory_features;
2226
2227         LASSERT(o->oo_dir != NULL);
2228
2229         descr = o->oo_dir->od_container.ic_descr;
2230         if (feat == &dt_directory_features) {
2231                 if (descr->id_rec_size == sizeof(struct lu_fid_pack))
2232                         return 1;
2233
2234                 if (descr == &iam_htree_compat_param) {
2235                         /* if it is a HTREE dir then there is good chance that,
2236                          * we dealing with ext3 directory here with no FIDs. */
2237
2238                         if (descr->id_rec_size ==
2239                             sizeof ((struct ldiskfs_dir_entry_2 *)NULL)->inode) {
2240
2241                                 dt->do_index_ops = &osd_index_ea_ops;
2242                                 return 1;
2243                         }
2244                 }
2245                 return 0;
2246         } else {
2247                 return
2248                         feat->dif_keysize_min <= descr->id_key_size &&
2249                         descr->id_key_size <= feat->dif_keysize_max &&
2250                         feat->dif_recsize_min <= descr->id_rec_size &&
2251                         descr->id_rec_size <= feat->dif_recsize_max &&
2252                         !(feat->dif_flags & (DT_IND_VARKEY |
2253                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2254                         ergo(feat->dif_flags & DT_IND_UPDATE,
2255                              1 /* XXX check that object (and file system) is
2256                                 * writable */);
2257         }
2258 }
2259
2260 static int osd_iam_container_init(const struct lu_env *env,
2261                                   struct osd_object *obj,
2262                                   struct osd_directory *dir)
2263 {
2264         int result;
2265         struct iam_container *bag;
2266
2267         bag    = &dir->od_container;
2268         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2269         if (result == 0) {
2270                 result = iam_container_setup(bag);
2271                 if (result == 0)
2272                         obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2273                 else
2274                         iam_container_fini(bag);
2275         }
2276         return result;
2277 }
2278
2279 /*
2280  * Concurrency: no external locking is necessary.
2281  */
2282 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2283                          const struct dt_index_features *feat)
2284 {
2285         int result;
2286         int ea_dir = 0;
2287         struct osd_object *obj = osd_dt_obj(dt);
2288         struct osd_device *osd = osd_obj2dev(obj);
2289
2290         LINVRNT(osd_invariant(obj));
2291         LASSERT(dt_object_exists(dt));
2292
2293         if (osd_object_is_root(obj)) {
2294                 dt->do_index_ops = &osd_index_ea_ops;
2295                 result = 0;
2296         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2297                 dt->do_index_ops = &osd_index_ea_ops;
2298                 if (S_ISDIR(obj->oo_inode->i_mode))
2299                         result = 0;
2300                 else
2301                         result = -ENOTDIR;
2302                 ea_dir = 1;
2303         } else if (!osd_has_index(obj)) {
2304                 struct osd_directory *dir;
2305
2306                 OBD_ALLOC_PTR(dir);
2307                 if (dir != NULL) {
2308                         sema_init(&dir->od_sem, 1);
2309
2310                         spin_lock(&obj->oo_guard);
2311                         if (obj->oo_dir == NULL)
2312                                 obj->oo_dir = dir;
2313                         else
2314                                 /*
2315                                  * Concurrent thread allocated container data.
2316                                  */
2317                                 OBD_FREE_PTR(dir);
2318                         spin_unlock(&obj->oo_guard);
2319                         /*
2320                          * Now, that we have container data, serialize its
2321                          * initialization.
2322                          */
2323                         down(&obj->oo_dir->od_sem);
2324                         /*
2325                          * recheck under lock.
2326                          */
2327                         if (!osd_has_index(obj))
2328                                 result = osd_iam_container_init(env, obj, dir);
2329                         else
2330                                 result = 0;
2331                         up(&obj->oo_dir->od_sem);
2332                 } else
2333                         result = -ENOMEM;
2334         } else
2335                 result = 0;
2336
2337         if (result == 0 && ea_dir == 0) {
2338                 if (!osd_iam_index_probe(env, obj, feat))
2339                         result = -ENOTDIR;
2340         }
2341         LINVRNT(osd_invariant(obj));
2342
2343         return result;
2344 }
2345
2346 /**
2347  *      delete a (key, value) pair from index \a dt specified by \a key
2348  *
2349  *      \param  dt_object      osd index object
2350  *      \param  key     key for index
2351  *      \param  rec     record reference
2352  *      \param  handle  transaction handler
2353  *
2354  *      \retval  0  success
2355  *      \retval -ve   failure
2356  */
2357
2358 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2359                                 const struct dt_key *key, struct thandle *handle,
2360                                 struct lustre_capa *capa)
2361 {
2362         struct osd_object     *obj = osd_dt_obj(dt);
2363         struct osd_thandle    *oh;
2364         struct iam_path_descr *ipd;
2365         struct iam_container  *bag = &obj->oo_dir->od_container;
2366         int rc;
2367
2368         ENTRY;
2369
2370         LINVRNT(osd_invariant(obj));
2371         LASSERT(dt_object_exists(dt));
2372         LASSERT(bag->ic_object == obj->oo_inode);
2373         LASSERT(handle != NULL);
2374
2375         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2376                 RETURN(-EACCES);
2377
2378         ipd = osd_idx_ipd_get(env, bag);
2379         if (unlikely(ipd == NULL))
2380                 RETURN(-ENOMEM);
2381
2382         oh = container_of0(handle, struct osd_thandle, ot_super);
2383         LASSERT(oh->ot_handle != NULL);
2384         LASSERT(oh->ot_handle->h_transaction != NULL);
2385
2386         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2387         osd_ipd_put(env, bag, ipd);
2388         LINVRNT(osd_invariant(obj));
2389         RETURN(rc);
2390 }
2391
2392 /**
2393  * Index delete function for interoperability mode (b11826).
2394  * It will remove the directory entry added by osd_index_ea_insert().
2395  * This entry is needed to maintain name->fid mapping.
2396  *
2397  * \param key,  key i.e. file entry to be deleted
2398  *
2399  * \retval   0, on success
2400  * \retval -ve, on error
2401  */
2402 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
2403                                const struct dt_key *key, struct thandle *handle,
2404                                struct lustre_capa *capa)
2405 {
2406         struct osd_object          *obj    = osd_dt_obj(dt);
2407         struct inode               *dir    = obj->oo_inode;
2408         struct dentry              *dentry;
2409         struct osd_thandle         *oh;
2410         struct ldiskfs_dir_entry_2 *de;
2411         struct buffer_head         *bh;
2412
2413         int rc;
2414
2415         ENTRY;
2416
2417         LINVRNT(osd_invariant(obj));
2418         LASSERT(dt_object_exists(dt));
2419         LASSERT(handle != NULL);
2420
2421         oh = container_of(handle, struct osd_thandle, ot_super);
2422         LASSERT(oh->ot_handle != NULL);
2423         LASSERT(oh->ot_handle->h_transaction != NULL);
2424
2425         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2426                 RETURN(-EACCES);
2427
2428         dentry = osd_child_dentry_get(env, obj,
2429                                       (char *)key, strlen((char *)key));
2430         bh = ldiskfs_find_entry(dentry, &de);
2431         if (bh) {
2432                 rc = ldiskfs_delete_entry(oh->ot_handle,
2433                                 dir, de, bh);
2434                 if (!rc)
2435                         mark_inode_dirty(dir);
2436                 brelse(bh);
2437         } else
2438                 rc = -ENOENT;
2439
2440         LASSERT(osd_invariant(obj));
2441         RETURN(rc);
2442 }
2443
2444 /**
2445  *      Lookup index for \a key and copy record to \a rec.
2446  *
2447  *      \param  dt_object      osd index object
2448  *      \param  key     key for index
2449  *      \param  rec     record reference
2450  *
2451  *      \retval  +ve  success : exact mach
2452  *      \retval  0    return record with key not greater than \a key
2453  *      \retval -ve   failure
2454  */
2455 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
2456                                 struct dt_rec *rec, const struct dt_key *key,
2457                                 struct lustre_capa *capa)
2458 {
2459         struct osd_object     *obj = osd_dt_obj(dt);
2460         struct iam_path_descr *ipd;
2461         struct iam_container  *bag = &obj->oo_dir->od_container;
2462         struct osd_thread_info *oti = osd_oti_get(env);
2463         struct iam_iterator    *it = &oti->oti_idx_it;
2464         int rc;
2465         ENTRY;
2466
2467         LASSERT(osd_invariant(obj));
2468         LASSERT(dt_object_exists(dt));
2469         LASSERT(bag->ic_object == obj->oo_inode);
2470
2471         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2472                 RETURN(-EACCES);
2473
2474         ipd = osd_idx_ipd_get(env, bag);
2475         if (IS_ERR(ipd))
2476                 RETURN(-ENOMEM);
2477
2478         /* got ipd now we can start iterator. */
2479         iam_it_init(it, bag, 0, ipd);
2480
2481         rc = iam_it_get(it, (struct iam_key *)key);
2482         if (rc >= 0)
2483                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec);
2484
2485         iam_it_put(it);
2486         iam_it_fini(it);
2487         osd_ipd_put(env, bag, ipd);
2488
2489         LINVRNT(osd_invariant(obj));
2490
2491         RETURN(rc);
2492 }
2493
2494 /**
2495  *      Inserts (key, value) pair in \a dt index object.
2496  *
2497  *      \param  dt      osd index object
2498  *      \param  key     key for index
2499  *      \param  rec     record reference
2500  *      \param  th      transaction handler
2501  *
2502  *      \retval  0  success
2503  *      \retval -ve failure
2504  */
2505 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
2506                                 const struct dt_rec *rec, const struct dt_key *key,
2507                                 struct thandle *th, struct lustre_capa *capa,
2508                                 int ignore_quota)
2509 {
2510         struct osd_object     *obj = osd_dt_obj(dt);
2511         struct iam_path_descr *ipd;
2512         struct osd_thandle    *oh;
2513         struct iam_container  *bag = &obj->oo_dir->od_container;
2514 #ifdef HAVE_QUOTA_SUPPORT
2515         cfs_cap_t              save = current->cap_effective;
2516 #endif
2517         int rc;
2518
2519         ENTRY;
2520
2521         LINVRNT(osd_invariant(obj));
2522         LASSERT(dt_object_exists(dt));
2523         LASSERT(bag->ic_object == obj->oo_inode);
2524         LASSERT(th != NULL);
2525
2526         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2527                 return -EACCES;
2528
2529         ipd = osd_idx_ipd_get(env, bag);
2530         if (unlikely(ipd == NULL))
2531                 RETURN(-ENOMEM);
2532
2533         oh = container_of0(th, struct osd_thandle, ot_super);
2534         LASSERT(oh->ot_handle != NULL);
2535         LASSERT(oh->ot_handle->h_transaction != NULL);
2536 #ifdef HAVE_QUOTA_SUPPORT
2537         if (ignore_quota)
2538                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2539         else
2540                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2541 #endif
2542         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2543                         (struct iam_rec *)rec, ipd);
2544 #ifdef HAVE_QUOTA_SUPPORT
2545         current->cap_effective = save;
2546 #endif
2547         osd_ipd_put(env, bag, ipd);
2548         LINVRNT(osd_invariant(obj));
2549         RETURN(rc);
2550 }
2551
2552 /**
2553  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
2554  * into the directory.Also sets flags into osd object to
2555  * indicate dot and dotdot are created. This is required for
2556  * interoperability mode (b11826)
2557  *
2558  * \param dir   directory for dot and dotdot fixup.
2559  * \param obj   child object for linking
2560  *
2561  * \retval   0, on success
2562  * \retval -ve, on error
2563  */
2564 static int osd_add_dot_dotdot(struct osd_thread_info *info,
2565                               struct osd_object *dir,
2566                               struct osd_object *obj, const char *name,
2567                               struct thandle *th)
2568 {
2569         struct inode            *parent_dir   = obj->oo_inode;
2570         struct inode            *inode  = dir->oo_inode;
2571         struct osd_thandle      *oth;
2572         int result = 0;
2573
2574         oth = container_of(th, struct osd_thandle, ot_super);
2575         LASSERT(oth->ot_handle->h_transaction != NULL);
2576         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
2577
2578         if (strcmp(name, dot) == 0) {
2579                 if (dir->oo_compat_dot_created) {
2580                         result = -EEXIST;
2581                 } else {
2582                         LASSERT(obj == dir);
2583                         dir->oo_compat_dot_created = 1;
2584                         result = 0;
2585                 }
2586         } else if(strcmp(name, dotdot) == 0) {
2587                 if (!dir->oo_compat_dot_created)
2588                         return -EINVAL;
2589                 if (dir->oo_compat_dotdot_created)
2590                         return __osd_ea_add_rec(info, dir, obj, name, th);
2591
2592                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode);
2593                 if (result == 0)
2594                        dir->oo_compat_dotdot_created = 1;
2595         }
2596
2597         return result;
2598 }
2599
2600 /**
2601  * Calls ldiskfs_add_entry() to add directory entry
2602  * into the directory. This is required for
2603  * interoperability mode (b11826)
2604  *
2605  * \retval   0, on success
2606  * \retval -ve, on error
2607  */
2608 static int __osd_ea_add_rec(struct osd_thread_info *info,
2609                             struct osd_object *pobj,
2610                             struct osd_object *cobj,
2611                             const char *name,
2612                             struct thandle *th)
2613 {
2614         struct dentry      *child;
2615         struct osd_thandle *oth;
2616         struct inode       *cinode  = cobj->oo_inode;
2617         int rc;
2618
2619         oth = container_of(th, struct osd_thandle, ot_super);
2620         LASSERT(oth->ot_handle != NULL);
2621         LASSERT(oth->ot_handle->h_transaction != NULL);
2622
2623         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
2624         rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
2625
2626         RETURN(rc);
2627 }
2628
2629 /**
2630  * It will call the appropriate osd_add* function and return the
2631  * value, return by respective functions.
2632  */
2633 static int osd_ea_add_rec(const struct lu_env *env,
2634                           struct osd_object *pobj,
2635                           struct osd_object *cobj,
2636                           const char *name,
2637                           struct thandle *th)
2638 {
2639         struct osd_thread_info    *info   = osd_oti_get(env);
2640         int rc;
2641
2642         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
2643                                                    name[2] =='\0')))
2644                 rc = osd_add_dot_dotdot(info, pobj, cobj, name, th);
2645         else
2646                 rc = __osd_ea_add_rec(info, pobj, cobj, name, th);
2647
2648         return rc;
2649 }
2650
2651 /**
2652  * Calls ->lookup() to find dentry. From dentry get inode and
2653  * read inode's ea to get fid. This is required for  interoperability
2654  * mode (b11826)
2655  *
2656  * \retval   0, on success
2657  * \retval -ve, on error
2658  */
2659 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
2660                              struct dt_rec *rec, const struct dt_key *key)
2661 {
2662         struct inode            *dir    = obj->oo_inode;
2663         struct osd_thread_info  *info   = osd_oti_get(env);
2664         struct dentry           *dentry;
2665         struct osd_device      *dev = osd_dev(obj->oo_dt.do_lu.lo_dev);
2666         struct osd_inode_id    *id     = &info->oti_id;
2667         struct ldiskfs_dir_entry_2 *de;
2668         struct buffer_head         *bh;
2669         struct inode *inode;
2670         int ino;
2671         int rc;
2672
2673         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2674
2675         dentry = osd_child_dentry_get(env, obj,
2676                                       (char *)key, strlen((char *)key));
2677         bh = ldiskfs_find_entry(dentry, &de);
2678         if (bh) {
2679                 ino = le32_to_cpu(de->inode);
2680                 brelse(bh);
2681                 id->oii_ino = ino;
2682                 id->oii_gen = OSD_OII_NOGEN;
2683
2684                 inode = osd_iget(info, dev, id);
2685                 if (!IS_ERR(inode)) {
2686                         dentry->d_inode = inode;
2687
2688                         rc = osd_ea_fid_get(env, dentry, rec);
2689                         iput(inode);
2690                 } else
2691                         rc = -ENOENT;
2692         } else
2693                 rc = -ENOENT;
2694
2695         RETURN (rc);
2696 }
2697
2698 /**
2699  * Find the osd object for given fid.
2700  *
2701  * \param fid, need to find the osd object having this fid
2702  *
2703  * \retval osd_object, on success
2704  * \retval        -ve, on error
2705  */
2706 struct osd_object *osd_object_find(const struct lu_env *env,
2707                                    struct dt_object *dt,
2708                                    const struct lu_fid *fid)
2709 {
2710         struct lu_device         *ludev = dt->do_lu.lo_dev;
2711         struct osd_object        *child = NULL;
2712         struct lu_object         *luch;
2713         struct lu_object         *lo;
2714
2715         luch = lu_object_find(env, ludev, fid, NULL);
2716         if (!IS_ERR(luch)) {
2717                 if (lu_object_exists(luch)) {
2718                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
2719                         if (lo != NULL)
2720                                 child = osd_obj(lo);
2721                         else
2722                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
2723                                                 "lu_object can't be located"
2724                                                 ""DFID"\n", PFID(fid));
2725
2726                         if (child == NULL) {
2727                                 lu_object_put(env, luch);
2728                                 CERROR("Unable to get osd_object\n");
2729                                 child = ERR_PTR(-ENOENT);
2730                         }
2731                 } else {
2732                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
2733                                         "lu_object does not exists "DFID"\n",
2734                                         PFID(fid));
2735                         child = ERR_PTR(-ENOENT);
2736                 }
2737         } else
2738                 child = (void *)luch;
2739
2740         return child;
2741 }
2742
2743 /**
2744  * Put the osd object once done with it.
2745  *
2746  * \param obj, osd object that needs to be put
2747  */
2748 static inline void osd_object_put(const struct lu_env *env,
2749                                   struct osd_object *obj)
2750 {
2751         lu_object_put(env, &obj->oo_dt.do_lu);
2752 }
2753
2754 /**
2755  * Index add function for interoperability mode (b11826).
2756  * It will add the directory entry.This entry is needed to
2757  * maintain name->fid mapping.
2758  *
2759  * \param key, it is key i.e. file entry to be inserted
2760  * \param rec, it is value of given key i.e. fid
2761  *
2762  * \retval   0, on success
2763  * \retval -ve, on error
2764  */
2765 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
2766                                const struct dt_rec *rec,
2767                                const struct dt_key *key, struct thandle *th,
2768                                struct lustre_capa *capa, int ignore_quota)
2769 {
2770         struct osd_object        *obj   = osd_dt_obj(dt);
2771         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2772         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2773         const char               *name  = (const char *)key;
2774         struct osd_object        *child;
2775 #ifdef HAVE_QUOTA_SUPPORT
2776         cfs_cap_t              save = current->cap_effective;
2777 #endif
2778         int rc;
2779
2780         ENTRY;
2781
2782         LASSERT(osd_invariant(obj));
2783         LASSERT(dt_object_exists(dt));
2784         LASSERT(th != NULL);
2785
2786         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2787                 RETURN(-EACCES);
2788
2789         rc = fid_unpack(pack, fid);
2790         if (rc != 0)
2791                 RETURN(rc);
2792         child = osd_object_find(env, dt, fid);
2793         if (!IS_ERR(child)) {
2794 #ifdef HAVE_QUOTA_SUPPORT
2795                 if (ignore_quota)
2796                         current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2797                 else
2798                         current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2799 #endif
2800                 rc = osd_ea_add_rec(env, obj, child, name, th);
2801
2802 #ifdef HAVE_QUOTA_SUPPORT
2803                 current->cap_effective = save;
2804 #endif
2805                 osd_object_put(env, child);
2806         } else {
2807                 rc = PTR_ERR(child);
2808         }
2809
2810         LASSERT(osd_invariant(obj));
2811         RETURN(rc);
2812 }
2813
2814 /**
2815  *  Initialize osd Iterator for given osd index object.
2816  *
2817  *  \param  dt      osd index object
2818  */
2819
2820 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
2821                                  struct dt_object *dt,
2822                                  struct lustre_capa *capa)
2823 {
2824         struct osd_it_iam         *it;
2825         struct osd_thread_info *oti = osd_oti_get(env);
2826         struct osd_object     *obj = osd_dt_obj(dt);
2827         struct lu_object      *lo  = &dt->do_lu;
2828         struct iam_path_descr *ipd;
2829         struct iam_container  *bag = &obj->oo_dir->od_container;
2830
2831         LASSERT(lu_object_exists(lo));
2832
2833         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2834                 return ERR_PTR(-EACCES);
2835
2836         it = &oti->oti_it;
2837         ipd = osd_it_ipd_get(env, bag);
2838         if (likely(ipd != NULL)) {
2839                 it->oi_obj = obj;
2840                 it->oi_ipd = ipd;
2841                 lu_object_get(lo);
2842                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
2843                 return (struct dt_it *)it;
2844         }
2845         return ERR_PTR(-ENOMEM);
2846 }
2847
2848 /**
2849  * free given Iterator.
2850  */
2851
2852 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
2853 {
2854         struct osd_it_iam     *it = (struct osd_it_iam *)di;
2855         struct osd_object *obj = it->oi_obj;
2856
2857         iam_it_fini(&it->oi_it);
2858         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
2859         lu_object_put(env, &obj->oo_dt.do_lu);
2860 }
2861
2862 /**
2863  *  Move Iterator to record specified by \a key
2864  *
2865  *  \param  di      osd iterator
2866  *  \param  key     key for index
2867  *
2868  *  \retval +ve  di points to record with least key not larger than key
2869  *  \retval  0   di points to exact matched key
2870  *  \retval -ve  failure
2871  */
2872
2873 static int osd_it_iam_get(const struct lu_env *env,
2874                       struct dt_it *di, const struct dt_key *key)
2875 {
2876         struct osd_it_iam *it = (struct osd_it_iam *)di;
2877
2878         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
2879 }
2880
2881 /**
2882  *  Release Iterator
2883  *
2884  *  \param  di      osd iterator
2885  */
2886
2887 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
2888 {
2889         struct osd_it_iam *it = (struct osd_it_iam *)di;
2890
2891         iam_it_put(&it->oi_it);
2892 }
2893
2894 /**
2895  *  Move iterator by one record
2896  *
2897  *  \param  di      osd iterator
2898  *
2899  *  \retval +1   end of container reached
2900  *  \retval  0   success
2901  *  \retval -ve  failure
2902  */
2903
2904 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
2905 {
2906         struct osd_it_iam *it = (struct osd_it_iam *)di;
2907
2908         return iam_it_next(&it->oi_it);
2909 }
2910
2911 /**
2912  * Return pointer to the key under iterator.
2913  */
2914
2915 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
2916                                  const struct dt_it *di)
2917 {
2918         struct osd_it_iam *it = (struct osd_it_iam *)di;
2919
2920         return (struct dt_key *)iam_it_key_get(&it->oi_it);
2921 }
2922
2923 /**
2924  * Return size of key under iterator (in bytes)
2925  */
2926
2927 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
2928 {
2929         struct osd_it_iam *it = (struct osd_it_iam *)di;
2930
2931         return iam_it_key_size(&it->oi_it);
2932 }
2933
2934 /**
2935  * Return pointer to the record under iterator.
2936  */
2937 static struct dt_rec *osd_it_iam_rec(const struct lu_env *env,
2938                                  const struct dt_it *di)
2939 {
2940         struct osd_it_iam *it = (struct osd_it_iam *)di;
2941
2942         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
2943 }
2944
2945 /**
2946  * Returns cookie for current Iterator position.
2947  */
2948 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
2949 {
2950         struct osd_it_iam *it = (struct osd_it_iam *)di;
2951
2952         return iam_it_store(&it->oi_it);
2953 }
2954
2955 /**
2956  * Restore iterator from cookie.
2957  *
2958  * \param  di      osd iterator
2959  * \param  hash    Iterator location cookie
2960  *
2961  * \retval +ve  di points to record with least key not larger than key.
2962  * \retval  0   di points to exact matched key
2963  * \retval -ve  failure
2964  */
2965
2966 static int osd_it_iam_load(const struct lu_env *env,
2967                        const struct dt_it *di, __u64 hash)
2968 {
2969         struct osd_it_iam *it = (struct osd_it_iam *)di;
2970
2971         return iam_it_load(&it->oi_it, hash);
2972 }
2973
2974 static const struct dt_index_operations osd_index_iam_ops = {
2975         .dio_lookup = osd_index_iam_lookup,
2976         .dio_insert = osd_index_iam_insert,
2977         .dio_delete = osd_index_iam_delete,
2978         .dio_it     = {
2979                 .init     = osd_it_iam_init,
2980                 .fini     = osd_it_iam_fini,
2981                 .get      = osd_it_iam_get,
2982                 .put      = osd_it_iam_put,
2983                 .next     = osd_it_iam_next,
2984                 .key      = osd_it_iam_key,
2985                 .key_size = osd_it_iam_key_size,
2986                 .rec      = osd_it_iam_rec,
2987                 .store    = osd_it_iam_store,
2988                 .load     = osd_it_iam_load
2989         }
2990 };
2991
2992 /**
2993  * Creates or initializes iterator context.
2994  *
2995  * \retval struct osd_it_ea, iterator structure on success
2996  *
2997  */
2998 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
2999                                     struct dt_object *dt,
3000                                     struct lustre_capa *capa)
3001 {
3002         struct osd_object       *obj  = osd_dt_obj(dt);
3003         struct osd_thread_info  *info = osd_oti_get(env);
3004         struct osd_it_ea        *it   = &info->oti_it_ea;
3005         struct lu_object        *lo   = &dt->do_lu;
3006         struct dentry           *obj_dentry = &info->oti_it_dentry;
3007         ENTRY;
3008         LASSERT(lu_object_exists(lo));
3009
3010         obj_dentry->d_inode = obj->oo_inode;
3011         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3012         obj_dentry->d_name.hash = 0;
3013
3014         it->oie_namelen         = 0;
3015         it->oie_curr_pos        = 0;
3016         it->oie_next_pos        = 0;
3017         it->oie_obj             = obj;
3018         it->oie_file.f_dentry   = obj_dentry;
3019         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
3020         it->oie_file.f_op         = obj->oo_inode->i_fop;
3021         it->oie_file.private_data = NULL;
3022         lu_object_get(lo);
3023
3024         RETURN((struct dt_it*) it);
3025 }
3026
3027 /**
3028  * Destroy or finishes iterator context.
3029  *
3030  * \param di, struct osd_it_ea, iterator structure to be destroyed
3031  */
3032 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
3033 {
3034         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3035         struct osd_object    *obj  = it->oie_obj;
3036
3037
3038         ENTRY;
3039         lu_object_put(env, &obj->oo_dt.do_lu);
3040         EXIT;
3041 }
3042
3043 /**
3044  * It position the iterator at given key, so that next lookup continues from
3045  * that key Or it is similar to dio_it->load() but based on a key,
3046  * rather than file position.
3047  *
3048  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
3049  * to the beginning.
3050  *
3051  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
3052  */
3053 static int osd_it_ea_get(const struct lu_env *env,
3054                          struct dt_it *di, const struct dt_key *key)
3055 {
3056         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3057
3058         ENTRY;
3059         LASSERT(((const char *)key)[0] == '\0');
3060         it->oie_namelen         = 0;
3061         it->oie_curr_pos        = 0;
3062         it->oie_next_pos        = 0;
3063
3064         RETURN(+1);
3065 }
3066
3067 /**
3068  * Does nothing
3069  */
3070 static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
3071 {
3072 }
3073
3074 /**
3075  * It is called internally by ->readdir(). It fills the
3076  * iterator's in-memory data structure with required
3077  * information i.e. name, namelen, rec_size etc.
3078  *
3079  * \param buf, in which information to be filled in.
3080  * \param name, name of the file in given dir
3081  *
3082  * \retval 0, on success
3083  * \retval 1, on buffer full
3084  */
3085 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
3086                                loff_t offset, ino_t ino,
3087                                unsigned int d_type)
3088 {
3089         struct osd_it_ea   *it     = (struct osd_it_ea *)buf;
3090         struct dirent64    *dirent = &it->oie_dirent64;
3091         int                 reclen = LDISKFS_DIR_REC_LEN(namelen);
3092
3093
3094         ENTRY;
3095         if (it->oie_namelen)
3096                 RETURN(-ENOENT);
3097
3098         if (namelen == 0 || namelen > LDISKFS_NAME_LEN)
3099                 RETURN(-EIO);
3100
3101         strncpy(dirent->d_name, name, LDISKFS_NAME_LEN);
3102         dirent->d_name[namelen] = 0;
3103         dirent->d_ino           = ino;
3104         dirent->d_off           = offset;
3105         dirent->d_reclen        = reclen;
3106         it->oie_namelen         = namelen;
3107         it->oie_curr_pos        = offset;
3108
3109         RETURN(0);
3110 }
3111
3112 /**
3113  * Calls ->readdir() to load a directory entry at a time
3114  * and stored it in iterator's in-memory data structure.
3115  *
3116  * \param di, struct osd_it_ea, iterator's in memory structure
3117  *
3118  * \retval   0, on success
3119  * \retval -ve, on error
3120  */
3121 int osd_ldiskfs_it_fill(const struct dt_it *di)
3122 {
3123         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
3124         struct osd_object  *obj   = it->oie_obj;
3125         struct inode       *inode = obj->oo_inode;
3126         int                result = 0;
3127
3128         ENTRY;
3129         it->oie_namelen    = 0;
3130         it->oie_file.f_pos = it->oie_curr_pos;
3131
3132         result = inode->i_fop->readdir(&it->oie_file, it,
3133                                        (filldir_t) osd_ldiskfs_filldir);
3134
3135         it->oie_next_pos = it->oie_file.f_pos;
3136
3137         if(!result && it->oie_namelen == 0)
3138                 result = -EIO;
3139
3140         RETURN(result);
3141 }
3142
3143 /**
3144  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3145  * to load a directory entry at a time and stored it in
3146  * iterator's in-memory data structure.
3147  *
3148  * \param di, struct osd_it_ea, iterator's in memory structure
3149  *
3150  * \retval +ve, iterator reached to end
3151  * \retval   0, iterator not reached to end
3152  * \retval -ve, on error
3153  */
3154 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
3155 {
3156         struct osd_it_ea *it = (struct osd_it_ea *)di;
3157         int rc;
3158
3159         ENTRY;
3160         it->oie_curr_pos = it->oie_next_pos;
3161
3162         if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
3163                 rc = +1;
3164         else
3165                 rc = osd_ldiskfs_it_fill(di);
3166
3167         RETURN(rc);
3168 }
3169
3170 /**
3171  * Returns the key at current position from iterator's in memory structure.
3172  *
3173  * \param di, struct osd_it_ea, iterator's in memory structure
3174  *
3175  * \retval key i.e. struct dt_key on success
3176  */
3177 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
3178                                     const struct dt_it *di)
3179 {
3180         struct osd_it_ea *it = (struct osd_it_ea *)di;
3181         ENTRY;
3182         RETURN((struct dt_key *)it->oie_dirent64.d_name);
3183 }
3184
3185 /**
3186  * Returns the key's size at current position from iterator's in memory structure.
3187  *
3188  * \param di, struct osd_it_ea, iterator's in memory structure
3189  *
3190  * \retval key_size i.e. struct dt_key on success
3191  */
3192 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
3193 {
3194         struct osd_it_ea *it = (struct osd_it_ea *)di;
3195         ENTRY;
3196         RETURN(it->oie_namelen);
3197 }
3198
3199 /**
3200  * Returns the value (i.e. fid/igif) at current position from iterator's
3201  * in memory structure.
3202  *
3203  * \param di, struct osd_it_ea, iterator's in memory structure
3204  *
3205  * \retval value i.e. struct dt_rec on success
3206  */
3207 static struct dt_rec *osd_it_ea_rec(const struct lu_env *env,
3208                                     const struct dt_it *di)
3209 {
3210         struct osd_it_ea       *it     = (struct osd_it_ea *)di;
3211         struct osd_object      *obj    = it->oie_obj;
3212         struct osd_thread_info *info   = osd_oti_get(env);
3213         struct osd_inode_id    *id     = &info->oti_id;
3214         struct lu_fid_pack     *rec    = &info->oti_pack;
3215         struct lu_device       *ldev   = obj->oo_dt.do_lu.lo_dev;
3216         struct dentry          *dentry = &info->oti_child_dentry;
3217         struct osd_device      *dev;
3218         struct inode           *inode;
3219         int                    rc;
3220
3221         ENTRY;
3222         dev  = osd_dev(ldev);
3223         id->oii_ino = it->oie_dirent64.d_ino;
3224         id->oii_gen = OSD_OII_NOGEN;
3225         inode = osd_iget(info, dev, id);
3226         if (!IS_ERR(inode)) {
3227                 dentry->d_inode = inode;
3228                 LASSERT(dentry->d_inode->i_sb == osd_sb(dev));
3229         } else {
3230                 CERROR("Error getting inode for ino =%d", id->oii_ino);
3231                 RETURN((struct dt_rec *) PTR_ERR(inode));
3232         }
3233
3234         rc = osd_ea_fid_get(env, dentry, (struct dt_rec*) rec);
3235
3236         iput(inode);
3237         RETURN((struct dt_rec *)rec);
3238
3239 }
3240
3241 /**
3242  * Returns a cookie for current position of the iterator head, so that
3243  * user can use this cookie to load/start the iterator next time.
3244  *
3245  * \param di, struct osd_it_ea, iterator's in memory structure
3246  *
3247  * \retval cookie for current position, on success
3248  */
3249 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
3250 {
3251         struct osd_it_ea *it = (struct osd_it_ea *)di;
3252         ENTRY;
3253         RETURN(it->oie_curr_pos);
3254 }
3255
3256 /**
3257  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3258  * to load a directory entry at a time and stored it i inn,
3259  * in iterator's in-memory data structure.
3260  *
3261  * \param di, struct osd_it_ea, iterator's in memory structure
3262  *
3263  * \retval +ve, on success
3264  * \retval -ve, on error
3265  */
3266 static int osd_it_ea_load(const struct lu_env *env,
3267                           const struct dt_it *di, __u64 hash)
3268 {
3269         struct osd_it_ea *it = (struct osd_it_ea *)di;
3270         int rc;
3271
3272         ENTRY;
3273         it->oie_curr_pos = it->oie_next_pos = hash;
3274
3275         rc =  osd_ldiskfs_it_fill(di);
3276         if (rc == 0)
3277                 rc = +1;
3278
3279         RETURN(rc);
3280 }
3281 /**
3282  * Index and Iterator operations for interoperability
3283  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
3284  */
3285 static const struct dt_index_operations osd_index_ea_ops = {
3286         .dio_lookup = osd_index_ea_lookup,
3287         .dio_insert = osd_index_ea_insert,
3288         .dio_delete = osd_index_ea_delete,
3289         .dio_it     = {
3290                 .init     = osd_it_ea_init,
3291                 .fini     = osd_it_ea_fini,
3292                 .get      = osd_it_ea_get,
3293                 .put      = osd_it_ea_put,
3294                 .next     = osd_it_ea_next,
3295                 .key      = osd_it_ea_key,
3296                 .key_size = osd_it_ea_key_size,
3297                 .rec      = osd_it_ea_rec,
3298                 .store    = osd_it_ea_store,
3299                 .load     = osd_it_ea_load
3300         }
3301 };
3302
3303 /**
3304  * Index lookup function for interoperability mode (b11826).
3305  *
3306  * \param key,  key i.e. file name to be searched
3307  *
3308  * \retval +ve, on success
3309  * \retval -ve, on error
3310  */
3311 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
3312                                struct dt_rec *rec, const struct dt_key *key,
3313                                struct lustre_capa *capa)
3314 {
3315         struct osd_object *obj = osd_dt_obj(dt);
3316         int rc = 0;
3317
3318         ENTRY;
3319
3320         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
3321         LINVRNT(osd_invariant(obj));
3322
3323         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3324                 return -EACCES;
3325
3326         rc = osd_ea_lookup_rec(env, obj, rec, key);
3327
3328         if (rc == 0)
3329                 rc = +1;
3330         RETURN(rc);
3331 }
3332
3333 /* type constructor/destructor: osd_type_init, osd_type_fini */
3334 LU_TYPE_INIT_FINI(osd, &osd_key);
3335
3336 static struct lu_context_key osd_key = {
3337         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
3338         .lct_init = osd_key_init,
3339         .lct_fini = osd_key_fini,
3340         .lct_exit = osd_key_exit
3341 };
3342
3343 static void *osd_key_init(const struct lu_context *ctx,
3344                           struct lu_context_key *key)
3345 {
3346         struct osd_thread_info *info;
3347
3348         OBD_ALLOC_PTR(info);
3349         if (info != NULL)
3350                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
3351         else
3352                 info = ERR_PTR(-ENOMEM);
3353         return info;
3354 }
3355
3356 /* context key destructor: osd_key_fini */
3357 LU_KEY_FINI(osd, struct osd_thread_info);
3358
3359 static void osd_key_exit(const struct lu_context *ctx,
3360                          struct lu_context_key *key, void *data)
3361 {
3362         struct osd_thread_info *info = data;
3363
3364         LASSERT(info->oti_r_locks == 0);
3365         LASSERT(info->oti_w_locks == 0);
3366         LASSERT(info->oti_txns    == 0);
3367 }
3368
3369 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
3370                            const char *name, struct lu_device *next)
3371 {
3372         int rc;
3373         struct lu_context *ctx;
3374