Whamcloud - gitweb
small fixes: remove '_dev' from dt_sync and dt_ro methods, make compiler
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/osd/osd_handler.c
5  *  Top-level entry points into osd module
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Nikita Danilov <nikita@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35
36 /* LUSTRE_VERSION_CODE */
37 #include <lustre_ver.h>
38 /* prerequisite for linux/xattr.h */
39 #include <linux/types.h>
40 /* prerequisite for linux/xattr.h */
41 #include <linux/fs.h>
42 /* XATTR_{REPLACE,CREATE} */
43 #include <linux/xattr.h>
44 /*
45  * XXX temporary stuff: direct access to ldiskfs/jbd. Interface between osd
46  * and file system is not yet specified.
47  */
48 /* handle_t, journal_start(), journal_stop() */
49 #include <linux/jbd.h>
50 /* LDISKFS_SB() */
51 #include <linux/ldiskfs_fs.h>
52 #include <linux/ldiskfs_jbd.h>
53 /* simple_mkdir() */
54 #include <lvfs.h>
55
56 /*
57  * struct OBD_{ALLOC,FREE}*()
58  * OBD_FAIL_CHECK
59  */
60 #include <obd_support.h>
61 /* struct ptlrpc_thread */
62 #include <lustre_net.h>
63 /* LUSTRE_OSD0_NAME */
64 #include <obd.h>
65 /* class_register_type(), class_unregister_type(), class_get_type() */
66 #include <obd_class.h>
67 #include <lustre_disk.h>
68
69 /* fid_is_local() */
70 #include <lustre_fid.h>
71 #include <linux/lustre_iam.h>
72
73 #include "osd_internal.h"
74 #include "osd_igif.h"
75
/*
 * osd object: a dt_object together with the inode and index (iam) state that
 * back it.
 */
struct osd_object {
        /* embedded dt_object; osd_object_alloc() hands out &oo_dt.do_lu */
        struct dt_object       oo_dt;
        /*
         * Inode for file system object represented by this osd_object. This
         * inode is pinned for the whole duration of lu_object life.
         */
        struct inode          *oo_inode;
        /* taken by osd_object_{read,write}_lock() */
        struct rw_semaphore    oo_sem;
        /* iam container, finalized in osd_index_fini() */
        struct iam_container   oo_container;
        struct iam_descr       oo_descr;
        /* released through ->id_ipd_free() in osd_index_fini() */
        struct iam_path_descr *oo_ipd;
        /*
         * Context that currently holds oo_sem for write (set in
         * osd_object_write_lock(), reset to NULL in
         * osd_object_write_unlock()).
         */
        const struct lu_context *oo_owner;
};
89
90 /*
91  * osd device.
92  */
struct osd_device {
        /* super-class */
        struct dt_device          od_dt_dev;
        /* information about underlying file system */
        struct lustre_mount_info *od_mount;
        /* object index */
        struct osd_oi             od_oi;
        /*
         * XXX temporary stuff for object index: directory where every object
         * is named by its fid. osd_mkfile() creates new objects in it.
         */
        struct dentry            *od_obj_area;

        /*
         * Thread context for transaction commit callback.
         * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
         * is serialized, that is, there is no more than one transaction
         * commit at a time (JBD journal_commit_transaction() is serialized).
         * This means that it's enough to have _one_ lu_context.
         * osd_trans_commit_cb() passes it to dt_txn_hook_commit().
         */
        struct lu_context         od_ctx_for_commit;
};
114
115 static int   osd_root_get      (const struct lu_context *ctxt,
116                                 struct dt_device *dev, struct lu_fid *f);
117 static int   osd_statfs        (const struct lu_context *ctxt,
118                                 struct dt_device *dev, struct kstatfs *sfs);
119
120 static int   lu_device_is_osd  (const struct lu_device *d);
121 static void  osd_mod_exit      (void) __exit;
122 static int   osd_mod_init      (void) __init;
123 static int   osd_type_init     (struct lu_device_type *t);
124 static void  osd_type_fini     (struct lu_device_type *t);
125 static int   osd_object_init   (const struct lu_context *ctxt,
126                                 struct lu_object *l);
127 static void  osd_object_release(const struct lu_context *ctxt,
128                                 struct lu_object *l);
129 static int   osd_object_print  (const struct lu_context *ctx, void *cookie,
130                                 lu_printer_t p, const struct lu_object *o);
131 static void  osd_device_free   (const struct lu_context *ctx,
132                                 struct lu_device *m);
133 static void *osd_key_init      (const struct lu_context *ctx,
134                                 struct lu_context_key *key);
135 static void  osd_key_fini      (const struct lu_context *ctx,
136                                 struct lu_context_key *key, void *data);
137 static void  osd_key_exit      (const struct lu_context *ctx,
138                                 struct lu_context_key *key, void *data);
139 static int   osd_has_index     (const struct osd_object *obj);
140 static void  osd_object_init0  (struct osd_object *obj);
141 static int   osd_device_init   (const struct lu_context *ctx,
142                                 struct lu_device *d, struct lu_device *);
143 static int   osd_fid_lookup    (const struct lu_context *ctx,
144                                 struct osd_object *obj,
145                                 const struct lu_fid *fid);
146 static int   osd_inode_getattr (const struct lu_context *ctx,
147                                 struct inode *inode, struct lu_attr *attr);
148 static int   osd_inode_setattr (const struct lu_context *ctx,
149                                 struct inode *inode, const struct lu_attr *attr);
150 static int   osd_param_is_sane (const struct osd_device *dev,
151                                 const struct txn_param *param);
152 static int   osd_index_lookup  (const struct lu_context *ctxt,
153                                 struct dt_object *dt,
154                                 struct dt_rec *rec, const struct dt_key *key);
155 static int   osd_index_insert  (const struct lu_context *ctxt,
156                                 struct dt_object *dt,
157                                 const struct dt_rec *rec,
158                                 const struct dt_key *key,
159                                 struct thandle *handle);
160 static int   osd_index_delete  (const struct lu_context *ctxt,
161                                 struct dt_object *dt, const struct dt_key *key,
162                                 struct thandle *handle);
163 static int   osd_index_probe   (const struct lu_context *ctxt,
164                                 struct osd_object *o,
165                                 const struct dt_index_features *feat);
166 static int   osd_index_try     (const struct lu_context *ctx,
167                                 struct dt_object *dt,
168                                 const struct dt_index_features *feat);
169 static void  osd_index_fini    (struct osd_object *o);
170
171 static void  osd_it_fini       (const struct lu_context *ctx, struct dt_it *di);
172 static int   osd_it_get        (const struct lu_context *ctx,
173                                 struct dt_it *di, const struct dt_key *key);
174 static void  osd_it_put        (const struct lu_context *ctx, struct dt_it *di);
175 static int   osd_it_next       (const struct lu_context *ctx, struct dt_it *di);
176 static int   osd_it_key_size   (const struct lu_context *ctx,
177                                 const struct dt_it *di);
178 static void  osd_conf_get      (const struct lu_context *ctx,
179                                 const struct dt_device *dev,
180                                 struct dt_device_param *param);
181 static int   osd_read_locked   (const struct lu_context *ctx,
182                                 struct osd_object *o);
183 static int   osd_write_locked  (const struct lu_context *ctx,
184                                 struct osd_object *o);
185
186 static struct osd_object  *osd_obj          (const struct lu_object *o);
187 static struct osd_device  *osd_dev          (const struct lu_device *d);
188 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
189 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
190 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
191 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
192 static struct lu_device   *osd_device_fini  (const struct lu_context *ctx,
193                                              struct lu_device *d);
194 static struct lu_device   *osd_device_alloc (const struct lu_context *ctx,
195                                              struct lu_device_type *t,
196                                              struct lustre_cfg *cfg);
197 static struct lu_object   *osd_object_alloc (const struct lu_context *ctx,
198                                              const struct lu_object_header *hdr,
199                                              struct lu_device *d);
200 static struct inode       *osd_iget         (struct osd_thread_info *info,
201                                              struct osd_device *dev,
202                                              const struct osd_inode_id *id);
203 static struct super_block *osd_sb           (const struct osd_device *dev);
204 static struct dt_it       *osd_it_init      (const struct lu_context *ctx,
205                                              struct dt_object *dt);
206 static struct dt_key      *osd_it_key       (const struct lu_context *ctx,
207                                              const struct dt_it *di);
208 static struct dt_rec      *osd_it_rec       (const struct lu_context *ctx,
209                                              const struct dt_it *di);
210 static struct timespec    *osd_inode_time   (const struct lu_context *ctx,
211                                              struct inode *inode,
212                                              __u64 seconds);
213 static journal_t          *osd_journal      (const struct osd_device *dev);
214
215 static struct lu_device_type_operations osd_device_type_ops;
216 static struct lu_device_type            osd_device_type;
217 static struct lu_object_operations      osd_lu_obj_ops;
218 static struct obd_ops                   osd_obd_device_ops;
219 static struct lprocfs_vars              lprocfs_osd_module_vars[];
220 static struct lprocfs_vars              lprocfs_osd_obd_vars[];
221 static struct lu_device_operations      osd_lu_ops;
222 static struct lu_context_key            osd_key;
223 static struct dt_object_operations      osd_obj_ops;
224 static struct dt_body_operations        osd_body_ops;
225 static struct dt_index_operations       osd_index_ops;
226 static struct dt_index_operations       osd_index_compat_ops;
227
/*
 * osd transaction handle: generic thandle plus the journal state behind it.
 * Allocated in osd_trans_start() and freed in osd_trans_commit_cb().
 */
struct osd_thandle {
        /* generic handle returned to callers of osd_trans_start() */
        struct thandle          ot_super;
        /* journal handle from journal_start(); NULL after osd_trans_stop() */
        handle_t               *ot_handle;
        /* callback record registered through journal_callback_set() */
        struct journal_callback ot_jcb;
};
233
234 /*
235  * Invariants, assertions.
236  */
237
/*
 * Compile-time switch: when non-zero the real osd_invariant() below is
 * built; with the value 0 osd_invariant() is a macro that always evaluates
 * to 1, so LASSERT(osd_invariant(obj)) checks nothing.
 */
#define OSD_INVARIANT_CHECKS (0)

#if OSD_INVARIANT_CHECKS
/*
 * Consistency check for an osd_object. Returns non-zero when:
 *
 *   - @obj is not NULL;
 *   - an attached inode belongs to the super block of the object's device
 *     and has a positive reference count;
 *   - an object recorded in the iam container is the object's own inode.
 *
 * NOTE(review): ergo(a, b) is defined outside this file; presumably it reads
 * "a implies b" -- confirm.
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_container.ic_object != NULL,
                     obj->oo_container.ic_object == obj->oo_inode);
}
#else
#define osd_invariant(obj) (1)
#endif
254
255 static int osd_read_locked(const struct lu_context *ctx, struct osd_object *o)
256 {
257         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
258
259         return oti->oti_r_locks > 0;
260 }
261
262 static int osd_write_locked(const struct lu_context *ctx, struct osd_object *o)
263 {
264         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
265
266         return oti->oti_w_locks > 0 && o->oo_owner == ctx;
267 }
268
269 /* helper to push us into KERNEL_DS context */
270 static struct file *osd_rw_init(const struct lu_context *ctxt,
271                                 struct inode *inode, mm_segment_t *seg)
272 {
273         struct osd_thread_info *info   = lu_context_key_get(ctxt, &osd_key);
274         struct dentry          *dentry = &info->oti_dentry;
275         struct file            *file   = &info->oti_file;
276
277         file->f_dentry = dentry;
278         file->f_mapping = inode->i_mapping;
279         file->f_op      = inode->i_fop;
280         file->f_mode    = FMODE_WRITE|FMODE_READ;
281         dentry->d_inode = inode;
282
283         *seg = get_fs();
284         set_fs(KERNEL_DS);
285         return file;
286 }
287
288 /* helper to pop us from KERNEL_DS context */
289 static void osd_rw_fini(mm_segment_t *seg)
290 {
291         set_fs(*seg);
292 }
293
294 static int osd_root_get(const struct lu_context *ctx,
295                         struct dt_device *dev, struct lu_fid *f)
296 {
297         struct inode *inode;
298
299         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
300         lu_igif_build(f, inode->i_ino, inode->i_generation);
301         return 0;
302 }
303
304 /*
305  * OSD object methods.
306  */
307
308 static struct lu_object *osd_object_alloc(const struct lu_context *ctx,
309                                           const struct lu_object_header *hdr,
310                                           struct lu_device *d)
311 {
312         struct osd_object *mo;
313
314         OBD_ALLOC_PTR(mo);
315         if (mo != NULL) {
316                 struct lu_object *l;
317
318                 l = &mo->oo_dt.do_lu;
319                 dt_object_init(&mo->oo_dt, NULL, d);
320                 mo->oo_dt.do_ops = &osd_obj_ops;
321                 l->lo_ops = &osd_lu_obj_ops;
322                 init_rwsem(&mo->oo_sem);
323                 return l;
324         } else
325                 return NULL;
326 }
327
328 static void osd_object_init0(struct osd_object *obj)
329 {
330         LASSERT(obj->oo_inode != NULL);
331         obj->oo_dt.do_body_ops = &osd_body_ops;
332         obj->oo_dt.do_lu.lo_header->loh_attr |=
333                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
334 }
335
336 static int osd_object_init(const struct lu_context *ctxt, struct lu_object *l)
337 {
338         struct osd_object *obj = osd_obj(l);
339         int result;
340
341         LASSERT(osd_invariant(obj));
342
343         result = osd_fid_lookup(ctxt, obj, lu_object_fid(l));
344         if (result == 0) {
345                 if (obj->oo_inode != NULL)
346                         osd_object_init0(obj);
347         }
348         LASSERT(osd_invariant(obj));
349         return result;
350 }
351
352 static void osd_object_free(const struct lu_context *ctx, struct lu_object *l)
353 {
354         struct osd_object *obj = osd_obj(l);
355
356         LASSERT(osd_invariant(obj));
357
358         dt_object_fini(&obj->oo_dt);
359         OBD_FREE_PTR(obj);
360 }
361
362 static void osd_index_fini(struct osd_object *o)
363 {
364         struct iam_container *bag;
365
366         bag = &o->oo_container;
367         if (o->oo_ipd != NULL) {
368                 LASSERT(bag->ic_descr->id_ops->id_ipd_free != NULL);
369                 bag->ic_descr->id_ops->id_ipd_free(&o->oo_container, o->oo_ipd);
370         }
371         if (o->oo_inode != NULL) {
372                 if (o->oo_container.ic_object == o->oo_inode)
373                         iam_container_fini(&o->oo_container);
374         }
375 }
376
377 static void osd_object_delete(const struct lu_context *ctx, struct lu_object *l)
378 {
379         struct osd_object *obj = osd_obj(l);
380
381         LASSERT(osd_invariant(obj));
382
383         osd_index_fini(obj);
384         if (obj->oo_inode != NULL) {
385                 iput(obj->oo_inode);
386                 obj->oo_inode = NULL;
387         }
388 }
389
390 static int osd_inode_unlinked(const struct inode *inode)
391 {
392         return inode->i_nlink == !!S_ISDIR(inode->i_mode);
393 }
394
395 static void osd_object_release(const struct lu_context *ctxt,
396                                struct lu_object *l)
397 {
398         struct osd_object *o = osd_obj(l);
399
400         LASSERT(!lu_object_is_dying(l->lo_header));
401         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
402                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
403 }
404
405 static int osd_object_print(const struct lu_context *ctx, void *cookie,
406                             lu_printer_t p, const struct lu_object *l)
407 {
408         struct osd_object *o = osd_obj(l);
409         struct iam_descr  *d;
410
411         d = o->oo_container.ic_descr;
412         return (*p)(ctx, cookie, LUSTRE_OSD0_NAME"-object@%p(i:%p:%lu/%u)[%s]",
413                     o, o->oo_inode,
414                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
415                     o->oo_inode ? o->oo_inode->i_generation : 0,
416                     d ? d->id_ops->id_name : "plain");
417 }
418
419 static int osd_statfs(const struct lu_context *ctx,
420                       struct dt_device *d, struct kstatfs *sfs)
421 {
422         struct osd_device *osd = osd_dt_dev(d);
423         struct super_block *sb = osd_sb(osd);
424         int result;
425
426         ENTRY;
427
428         memset(sfs, 0, sizeof *sfs);
429         result = sb->s_op->statfs(sb, sfs);
430
431         RETURN (result);
432 }
433
434 static void osd_conf_get(const struct lu_context *ctx,
435                          const struct dt_device *dev,
436                          struct dt_device_param *param)
437 {
438         /*
439          * XXX should be taken from not-yet-existing fs abstraction layer.
440          */
441         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
442         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
443         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
444 }
445
446 /*
447  * Journal
448  */
449
450 static int osd_param_is_sane(const struct osd_device *dev,
451                              const struct txn_param *param)
452 {
453         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
454 }
455
/*
 * Journal commit callback, registered by osd_trans_start() through
 * journal_callback_set(). @jcb is the ot_jcb member of the osd_thandle of
 * the committed transaction; @error is non-zero when commit failed.
 *
 * On successful commit the dt commit hooks are run. In either case the
 * device reference taken in osd_trans_start() is dropped, the handle's
 * context is exited and finalized, and the handle itself is freed.
 */
static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
{
        struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
        struct thandle     *th = &oh->ot_super;
        struct dt_device   *dev = th->th_dev;

        LASSERT(dev != NULL);

        if (error) {
                CERROR("transaction @0x%p commit error: %d\n", th, error);
        } else {
                /*
                 * od_ctx_for_commit is only for commit usage,
                 * see "struct osd_device".
                 */
                dt_txn_hook_commit(&osd_dt_dev(dev)->od_ctx_for_commit, th);
        }

        /* pairs with lu_device_get() in osd_trans_start() */
        lu_device_put(&dev->dd_lu_dev);
        th->th_dev = NULL;

        /* pairs with lu_context_init()/lu_context_enter() in osd_trans_start() */
        lu_context_exit(&th->th_ctx);
        lu_context_fini(&th->th_ctx);
        /* th is embedded in oh, so nothing may touch it past this point */
        OBD_FREE_PTR(oh);
}
480
481 static struct thandle *osd_trans_start(const struct lu_context *ctx,
482                                        struct dt_device *d,
483                                        struct txn_param *p)
484 {
485         struct osd_device      *dev = osd_dt_dev(d);
486         handle_t               *jh;
487         struct osd_thandle     *oh;
488         struct thandle         *th;
489         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
490         int hook_res;
491
492         ENTRY;
493
494         hook_res = dt_txn_hook_start(ctx, d, p);
495         if (hook_res != 0)
496                 RETURN(ERR_PTR(hook_res));
497
498         if (osd_param_is_sane(dev, p)) {
499                 OBD_ALLOC_GFP(oh, sizeof *oh, GFP_NOFS);
500                 if (oh != NULL) {
501                         /*
502                          * XXX temporary stuff. Some abstraction layer should
503                          * be used.
504                          */
505
506                         jh = journal_start(osd_journal(dev), p->tp_credits);
507                         if (!IS_ERR(jh)) {
508                                 oh->ot_handle = jh;
509                                 th = &oh->ot_super;
510                                 th->th_dev = d;
511                                 lu_device_get(&d->dd_lu_dev);
512                                 /* add commit callback */
513                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
514                                 lu_context_enter(&th->th_ctx);
515                                 journal_callback_set(jh, osd_trans_commit_cb,
516                                                      (struct journal_callback *)&oh->ot_jcb);
517                                 LASSERT(oti->oti_txns == 0);
518                                 LASSERT(oti->oti_r_locks == 0);
519                                 LASSERT(oti->oti_w_locks == 0);
520                                 oti->oti_txns++;
521                         } else {
522                                 OBD_FREE_PTR(oh);
523                                 th = (void *)jh;
524                         }
525                 } else
526                         th = ERR_PTR(-ENOMEM);
527         } else {
528                 CERROR("Invalid transaction parameters\n");
529                 th = ERR_PTR(-EINVAL);
530         }
531
532         RETURN(th);
533 }
534
535 static void osd_trans_stop(const struct lu_context *ctx, struct thandle *th)
536 {
537         int result;
538         struct osd_thandle     *oh;
539         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
540
541         ENTRY;
542
543         oh = container_of0(th, struct osd_thandle, ot_super);
544         if (oh->ot_handle != NULL) {
545                 handle_t *hdl = oh->ot_handle;
546                 /*
547                  * XXX temporary stuff. Some abstraction layer should be used.
548                  */
549                 result = dt_txn_hook_stop(ctx, th);
550                 if (result != 0)
551                         CERROR("Failure in transaction hook: %d\n", result);
552
553                 /**/
554                 oh->ot_handle = NULL;
555                 result = journal_stop(hdl);
556                 if (result != 0)
557                         CERROR("Failure to stop transaction: %d\n", result);
558
559                 LASSERT(oti->oti_txns == 1);
560                 LASSERT(oti->oti_r_locks == 0);
561                 LASSERT(oti->oti_w_locks == 0);
562                 oti->oti_txns--;
563         }
564         EXIT;
565 }
566
567 static void osd_sync(const struct lu_context *ctx,
568                         struct dt_device *d)
569 {
570         struct osd_device *osd = osd_dt_dev(d);
571         ENTRY;
572
573         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD0_NAME);
574         ldiskfs_force_commit(osd_sb(osd));
575         EXIT;
576 }
577
578 static void osd_ro(const struct lu_context *ctx,
579                       struct dt_device *d, int sync)
580 {
581         struct thandle *th;
582         struct txn_param param = {
583                 .tp_credits = 3
584         };
585         ENTRY;
586
587         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD0_NAME);
588
589         th = osd_trans_start(ctx, d, &param);
590         if (!IS_ERR(th))
591                 osd_trans_stop(ctx, th);
592         
593         if (sync)
594                 osd_sync(ctx, d);
595         
596         lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))));
597         EXIT;        
598 }
599
600
601 static struct dt_device_operations osd_dt_ops = {
602         .dt_root_get    = osd_root_get,
603         .dt_statfs      = osd_statfs,
604         .dt_trans_start = osd_trans_start,
605         .dt_trans_stop  = osd_trans_stop,
606         .dt_conf_get    = osd_conf_get,
607         .dt_sync        = osd_sync,
608         .dt_ro          = osd_ro
609 };
610
611 static void osd_object_read_lock(const struct lu_context *ctx,
612                                  struct dt_object *dt)
613 {
614         struct osd_object      *obj = osd_dt_obj(dt);
615         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
616
617         LASSERT(osd_invariant(obj));
618
619         LASSERT(obj->oo_owner != ctx);
620         down_read(&obj->oo_sem);
621         LASSERT(obj->oo_owner == NULL);
622         oti->oti_r_locks++;
623 }
624
625 static void osd_object_write_lock(const struct lu_context *ctx,
626                                   struct dt_object *dt)
627 {
628         struct osd_object      *obj = osd_dt_obj(dt);
629         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
630
631         LASSERT(osd_invariant(obj));
632
633         LASSERT(obj->oo_owner != ctx);
634         down_write(&obj->oo_sem);
635         LASSERT(obj->oo_owner == NULL);
636         /*
637          * Write lock assumes transaction.
638          */
639         LASSERT(oti->oti_txns > 0);
640         obj->oo_owner = ctx;
641         oti->oti_w_locks++;
642 }
643
644 static void osd_object_read_unlock(const struct lu_context *ctx,
645                                    struct dt_object *dt)
646 {
647         struct osd_object      *obj = osd_dt_obj(dt);
648         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
649
650         LASSERT(osd_invariant(obj));
651         LASSERT(oti->oti_r_locks > 0);
652         oti->oti_r_locks--;
653         up_read(&obj->oo_sem);
654 }
655
656 static void osd_object_write_unlock(const struct lu_context *ctx,
657                                     struct dt_object *dt)
658 {
659         struct osd_object      *obj = osd_dt_obj(dt);
660         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
661
662         LASSERT(osd_invariant(obj));
663         LASSERT(obj->oo_owner == ctx);
664         LASSERT(oti->oti_w_locks > 0);
665         oti->oti_w_locks--;
666         obj->oo_owner = NULL;
667         up_write(&obj->oo_sem);
668 }
669
670 static int osd_attr_get(const struct lu_context *ctxt, struct dt_object *dt,
671                         struct lu_attr *attr)
672 {
673         struct osd_object *obj = osd_dt_obj(dt);
674         LASSERT(dt_object_exists(dt));
675         LASSERT(osd_invariant(obj));
676         LASSERT(osd_read_locked(ctxt, obj) || osd_write_locked(ctxt, obj));
677
678         return osd_inode_getattr(ctxt, obj->oo_inode, attr);
679 }
680
681 static int osd_attr_set(const struct lu_context *ctxt,
682                         struct dt_object *dt,
683                         const struct lu_attr *attr,
684                         struct thandle *handle)
685 {
686         struct osd_object *obj = osd_dt_obj(dt);
687         LASSERT(dt_object_exists(dt));
688         LASSERT(osd_invariant(obj));
689         LASSERT(osd_write_locked(ctxt, obj));
690
691         return osd_inode_setattr(ctxt, obj->oo_inode, attr);
692 }
693
694 static struct timespec *osd_inode_time(const struct lu_context *ctx,
695                                        struct inode *inode, __u64 seconds)
696 {
697         struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
698         struct timespec        *t   = &oti->oti_time;
699
700         t->tv_sec  = seconds;
701         t->tv_nsec = 0;
702         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
703         return t;
704 }
705
706 static int osd_inode_setattr(const struct lu_context *ctx,
707                              struct inode *inode, const struct lu_attr *attr)
708 {
709         __u64 bits;
710         int rc = 0;
711
712         bits = attr->la_valid;
713
714         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
715
716         if (bits & LA_ATIME)
717                 inode->i_atime  = *osd_inode_time(ctx, inode, attr->la_atime);
718         if (bits & LA_CTIME)
719                 inode->i_ctime  = *osd_inode_time(ctx, inode, attr->la_ctime);
720         if (bits & LA_MTIME)
721                 inode->i_mtime  = *osd_inode_time(ctx, inode, attr->la_mtime);
722         if (bits & LA_SIZE)
723                 inode->i_size   = attr->la_size;
724         if (bits & LA_BLOCKS)
725                 inode->i_blocks = attr->la_blocks;
726         if (bits & LA_MODE)
727                 inode->i_mode   = (inode->i_mode & S_IFMT) |
728                         (attr->la_mode & ~S_IFMT);
729         if (bits & LA_UID)
730                 inode->i_uid    = attr->la_uid;
731         if (bits & LA_GID)
732                 inode->i_gid    = attr->la_gid;
733         if (bits & LA_NLINK)
734                 inode->i_nlink  = attr->la_nlink;
735         if (bits & LA_RDEV)
736                 inode->i_rdev   = attr->la_rdev;
737         if (bits & LA_BLKSIZE)
738                 inode->i_blksize = attr->la_blksize;
739
740         if (bits & LA_FLAGS) {
741                 /*
742                  * Horrible ext3 legacy. Flags are better to be handled in
743                  * mdd.
744                  */
745                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
746
747                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
748                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
749         }
750         mark_inode_dirty(inode);
751         return rc;
752 }
753
754 /*
755  * Object creation.
756  *
757  * XXX temporary solution.
758  */
759
/* pre-creation step: currently nothing to do, always succeeds */
static int osd_create_pre(struct osd_thread_info *info,
                          struct osd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *th)
{
        return 0;
}
765
766 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
767                            struct lu_attr *attr, struct thandle *th)
768 {
769         LASSERT(obj->oo_inode != NULL);
770
771         osd_object_init0(obj);
772         return 0;
773 }
774
775 static void osd_fid_build_name(const struct lu_fid *fid, char *name)
776 {
777         static const char *qfmt = LPX64":%lx:%lx";
778
779         sprintf(name, qfmt, fid_seq(fid), fid_oid(fid), fid_ver(fid));
780 }
781
782 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
783                       umode_t mode, struct thandle *th)
784 {
785         int result;
786         struct osd_device *osd = osd_obj2dev(obj);
787         struct inode      *dir;
788
789         /*
790          * XXX temporary solution.
791          */
792         struct dentry     *dentry;
793
794         LASSERT(osd_invariant(obj));
795         LASSERT(obj->oo_inode == NULL);
796         LASSERT(osd->od_obj_area != NULL);
797
798         dir = osd->od_obj_area->d_inode;
799         LASSERT(dir->i_op != NULL && dir->i_op->create != NULL);
800
801         osd_fid_build_name(lu_object_fid(&obj->oo_dt.do_lu), info->oti_name);
802         info->oti_str.name = info->oti_name;
803         info->oti_str.len  = strlen(info->oti_name);
804
805         dentry = d_alloc(osd->od_obj_area, &info->oti_str);
806         if (dentry != NULL) {
807                result = dir->i_op->create(dir, dentry, mode, NULL);
808                if (result == 0) {
809                         LASSERT(dentry->d_inode != NULL);
810                         obj->oo_inode = dentry->d_inode;
811                         igrab(obj->oo_inode);
812                 }
813                 dput(dentry);
814         } else
815                 result = -ENOMEM;
816         LASSERT(osd_invariant(obj));
817         return result;
818 }
819
820
/*
 * Low-level iam entry point, declared here because osd_mkdir() calls it
 * directly.
 */
extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
                           int recsize, handle_t *handle);

enum {
        /* key size passed to iam_lvar_create() for new directories */
        OSD_NAME_LEN = 255
};
827
828 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
829                      struct lu_attr *attr, struct thandle *th)
830 {
831         int result;
832         struct osd_thandle *oth;
833
834         oth = container_of0(th, struct osd_thandle, ot_super);
835         LASSERT(S_ISDIR(attr->la_mode));
836         result = osd_mkfile(info, obj, (attr->la_mode &
837                             (S_IFMT | S_IRWXUGO | S_ISVTX)), th);
838         if (result == 0) {
839                 LASSERT(obj->oo_inode != NULL);
840                 /*
841                  * XXX uh-oh... call low-level iam function directly.
842                  */
843                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
844                                          sizeof (struct lu_fid),
845                                          oth->ot_handle);
846         }
847         return result;
848 }
849
850 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
851                      struct lu_attr *attr, struct thandle *th)
852 {
853         LASSERT(S_ISREG(attr->la_mode));
854         return osd_mkfile(info, obj, (attr->la_mode &
855                                (S_IFMT | S_IRWXUGO | S_ISVTX)), th);
856 }
857
858 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
859                      struct lu_attr *attr, struct thandle *th)
860 {
861         LASSERT(S_ISLNK(attr->la_mode));
862         return osd_mkfile(info, obj, (attr->la_mode &
863                               (S_IFMT | S_IRWXUGO | S_ISVTX)), th);
864 }
865
866 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
867                      struct lu_attr *attr, struct thandle *th)
868 {
869         int result;
870         struct osd_device *osd = osd_obj2dev(obj);
871         struct inode      *dir;
872         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
873
874         /*
875          * XXX temporary solution.
876          */
877         struct dentry     *dentry;
878
879         LASSERT(osd_invariant(obj));
880         LASSERT(obj->oo_inode == NULL);
881         LASSERT(osd->od_obj_area != NULL);
882
883         dir = osd->od_obj_area->d_inode;
884         LASSERT(dir->i_op != NULL && dir->i_op->create != NULL);
885
886         osd_fid_build_name(lu_object_fid(&obj->oo_dt.do_lu), info->oti_name);
887         info->oti_str.name = info->oti_name;
888         info->oti_str.len  = strlen(info->oti_name);
889
890         dentry = d_alloc(osd->od_obj_area, &info->oti_str);
891         if (dentry != NULL) {
892                 result = dir->i_op->mknod(dir, dentry, mode, attr->la_rdev);
893                 if (result == 0) {
894                         LASSERT(dentry->d_inode != NULL);
895                         obj->oo_inode = dentry->d_inode;
896                         igrab(obj->oo_inode);
897                 }
898                 dput(dentry);
899         } else
900                 result = -ENOMEM;
901         LASSERT(osd_invariant(obj));
902         return result;
903 }
904
/*
 * Signature shared by the type-specific object creation functions
 * (osd_mkdir(), osd_mkreg(), osd_mksym(), osd_mknod()).
 */
typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
                              struct lu_attr *, struct thandle *);
907
908 static osd_obj_type_f osd_create_type_f(__u32 mode)
909 {
910         osd_obj_type_f result;
911
912         switch (mode) {
913         case S_IFDIR:
914                 result = osd_mkdir;
915                 break;
916         case S_IFREG:
917                 result = osd_mkreg;
918                 break;
919         case S_IFLNK:
920                 result = osd_mksym;
921                 break;
922         case S_IFCHR:
923         case S_IFBLK:
924         case S_IFIFO:
925         case S_IFSOCK:
926                 result = osd_mknod;
927                 break;
928         default:
929                 LBUG();
930                 break;
931         }
932         return result;
933 }
934
935 static int osd_object_create(const struct lu_context *ctx, struct dt_object *dt,
936                              struct lu_attr *attr, struct thandle *th)
937 {
938         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
939         struct osd_object      *obj  = osd_dt_obj(dt);
940         struct osd_device      *osd  = osd_obj2dev(obj);
941         struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
942         int result;
943
944         ENTRY;
945
946         LASSERT(osd_invariant(obj));
947         LASSERT(!dt_object_exists(dt));
948         LASSERT(osd_write_locked(ctx, obj));
949
950         /*
951          * XXX missing: permission checks.
952          */
953
954         /*
955          * XXX missing: sanity checks (valid ->la_mode, etc.)
956          */
957
958         /*
959          * XXX missing: Quote handling.
960          */
961
962         result = osd_create_pre(info, obj, attr, th);
963         if (result == 0) {
964                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
965                                                                    attr, th);
966                 if (result == 0)
967                         result = osd_create_post(info, obj, attr, th);
968         }
969         if (result == 0) {
970                 struct osd_inode_id *id = &info->oti_id;
971
972                 LASSERT(obj->oo_inode != NULL);
973
974                 id->oii_ino = obj->oo_inode->i_ino;
975                 id->oii_gen = obj->oo_inode->i_generation;
976
977                 osd_oi_write_lock(&osd->od_oi);
978                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
979                 osd_oi_write_unlock(&osd->od_oi);
980         }
981
982         LASSERT(ergo(result == 0, dt_object_exists(dt)));
983         LASSERT(osd_invariant(obj));
984         return result;
985 }
986
987 static void osd_object_ref_add(const struct lu_context *ctxt,
988                                struct dt_object *dt, struct thandle *th)
989 {
990         struct osd_object *obj = osd_dt_obj(dt);
991         struct inode *inode = obj->oo_inode;
992
993         LASSERT(osd_invariant(obj));
994         LASSERT(dt_object_exists(dt));
995         LASSERT(osd_write_locked(ctxt, obj));
996
997         if (inode->i_nlink < LDISKFS_LINK_MAX) {
998                 inode->i_nlink ++;
999                 mark_inode_dirty(inode);
1000         } else
1001                 LU_OBJECT_DEBUG(D_ERROR, ctxt, &dt->do_lu,
1002                                 "Overflowed nlink\n");
1003         LASSERT(osd_invariant(obj));
1004 }
1005
1006 static void osd_object_ref_del(const struct lu_context *ctxt,
1007                                struct dt_object *dt, struct thandle *th)
1008 {
1009         struct osd_object *obj = osd_dt_obj(dt);
1010         struct inode *inode = obj->oo_inode;
1011
1012         LASSERT(osd_invariant(obj));
1013         LASSERT(dt_object_exists(dt));
1014         LASSERT(osd_write_locked(ctxt, obj));
1015
1016         if (inode->i_nlink > 0) {
1017                 inode->i_nlink --;
1018                 mark_inode_dirty(inode);
1019         } else
1020                 LU_OBJECT_DEBUG(D_ERROR, ctxt, &dt->do_lu,
1021                                 "Underflowed nlink\n");
1022         LASSERT(osd_invariant(obj));
1023 }
1024
1025 static int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt,
1026                          void *buf, int size, const char *name)
1027 {
1028         struct osd_object      *obj    = osd_dt_obj(dt);
1029         struct inode           *inode  = obj->oo_inode;
1030         struct osd_thread_info *info   = lu_context_key_get(ctxt, &osd_key);
1031         struct dentry          *dentry = &info->oti_dentry;
1032
1033         LASSERT(dt_object_exists(dt));
1034         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1035         LASSERT(osd_read_locked(ctxt, obj) || osd_write_locked(ctxt, obj));
1036
1037         dentry->d_inode = inode;
1038         return inode->i_op->getxattr(dentry, name, buf, size);
1039 }
1040
1041 static int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt,
1042                          const void *buf, int size, const char *name, int fl,
1043                          struct thandle *handle)
1044 {
1045         int fs_flags;
1046
1047         struct osd_object      *obj    = osd_dt_obj(dt);
1048         struct inode           *inode  = obj->oo_inode;
1049         struct osd_thread_info *info   = lu_context_key_get(ctxt, &osd_key);
1050         struct dentry          *dentry = &info->oti_dentry;
1051
1052         LASSERT(dt_object_exists(dt));
1053         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1054         LASSERT(osd_write_locked(ctxt, obj));
1055
1056         dentry->d_inode = inode;
1057
1058         fs_flags = 0;
1059         if (fl & LU_XATTR_REPLACE)
1060                 fs_flags |= XATTR_REPLACE;
1061
1062         if (fl & LU_XATTR_CREATE)
1063                 fs_flags |= XATTR_CREATE;
1064
1065         return inode->i_op->setxattr(dentry, name, buf, size, fs_flags);
1066 }
1067
1068 static int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt,
1069                           void *buf, int size)
1070 {
1071         struct osd_object      *obj    = osd_dt_obj(dt);
1072         struct inode           *inode  = obj->oo_inode;
1073         struct osd_thread_info *info   = lu_context_key_get(ctxt, &osd_key);
1074         struct dentry          *dentry = &info->oti_dentry;
1075
1076         LASSERT(dt_object_exists(dt));
1077         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1078         LASSERT(osd_read_locked(ctxt, obj) || osd_write_locked(ctxt, obj));
1079
1080         dentry->d_inode = inode;
1081         return inode->i_op->listxattr(dentry, buf, size);
1082 }
1083
1084 static int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt,
1085                          const char *name, struct thandle *handle)
1086 {
1087         struct osd_object      *obj    = osd_dt_obj(dt);
1088         struct inode           *inode  = obj->oo_inode;
1089         struct osd_thread_info *info   = lu_context_key_get(ctxt, &osd_key);
1090         struct dentry          *dentry = &info->oti_dentry;
1091
1092         LASSERT(dt_object_exists(dt));
1093         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1094         LASSERT(osd_write_locked(ctxt, obj));
1095
1096         dentry->d_inode = inode;
1097         return inode->i_op->removexattr(dentry, name);
1098 }
1099
/*
 * Pack directory entries taken from iterator @it into the @nob bytes at
 * @area as a sequence of struct lu_dirent records.
 *
 * @first     non-zero for the first page of a reply: room for a
 *            struct lu_dirpage header is skipped at the start of @area
 * @start     set to the hash of the first entry stored, only when @first is
 *            set and that entry is the first record of the area (or to
 *            @hash_end when the very first entry is already past the limit)
 * @end       updated to the hash of each entry processed, or to @hash_end
 *            when the limit is reached
 * @hash_end  entries with a hash greater than this are not stored
 * @last      updated to point at the most recently stored record
 *
 * Returns 0 when packing stopped because the area is full or the hash limit
 * was reached; otherwise the non-zero value returned by iops->next().
 */
static int osd_dir_page_build(const struct lu_context *ctx, int first,
                              void *area, int nob,
                              struct dt_it_ops  *iops, struct dt_it *it,
                              __u32 *start, __u32 *end, __u32 hash_end,
                              struct lu_dirent **last)
{
        int result;
        struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
        struct lu_fid          *fid  = &info->oti_fid;
        struct lu_dirent       *ent;

        if (first) {
                /* leave room for the page header */
                area += sizeof (struct lu_dirpage);
                nob  -= sizeof (struct lu_dirpage);

        }

        LASSERT(nob > sizeof *ent);

        ent  = area;
        result = 0;
        do {
                char  *name;
                int    len;
                int    recsize;
                __u32  hash;

                name = (char *)iops->key(ctx, it);
                len  = iops->key_size(ctx, it);

                *fid  = *(struct lu_fid *)iops->rec(ctx, it);
                fid_cpu_to_le(fid);

                /* record size: header plus name, rounded up to 4 bytes */
                recsize = (sizeof *ent + len + 3) & ~3;
                hash = iops->store(ctx, it);
                if (hash > hash_end) {
                        /* current entry is beyond the requested range */
                        *end = hash_end;
                        if (first && ent == area)
                                *start = hash_end;
                        break;
                }
                *end = hash;
                CDEBUG(D_INODE, "%p %p %d "DFID": %#8.8x (%d)\"%*.*s\"\n",
                       area, ent, nob, PFID(fid), hash, len, len, len, name);
                if (nob >= recsize) {
                        ent->lde_fid = *fid;
                        ent->lde_hash = hash;
                        ent->lde_namelen = cpu_to_le16(len);
                        ent->lde_reclen  = cpu_to_le16(recsize);
                        memcpy(ent->lde_name, name, len);
                        if (first && ent == area)
                                *start = hash;
                        *last = ent;
                        /* advance to the slot for the next record */
                        ent = (void *)ent + recsize;
                        nob -= recsize;
                        result = iops->next(ctx, it);
                } else {
                        /*
                         * record doesn't fit into page, enlarge previous one.
                         */
                        LASSERT(*last != NULL);
                        (*last)->lde_reclen =
                                cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
                                            nob);
                        break;
                }
        } while (result == 0);
        return result;
}
1169
1170 static int osd_readpage(const struct lu_context *ctxt,
1171                         struct dt_object *dt, const struct lu_rdpg *rdpg)
1172 {
1173         struct dt_it      *it;
1174         struct osd_object *obj = osd_dt_obj(dt);
1175         struct dt_it_ops  *iops;
1176         int i;
1177         int rc;
1178         int nob;
1179
1180         LASSERT(dt_object_exists(dt));
1181         LASSERT(osd_invariant(obj));
1182         LASSERT(osd_has_index(obj));
1183         LASSERT(osd_read_locked(ctxt, obj) || osd_write_locked(ctxt, obj));
1184
1185         LASSERT(rdpg->rp_pages != NULL);
1186
1187         if (rdpg->rp_count <= 0)
1188                 return -EFAULT;
1189
1190         if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) {
1191                 CERROR("size %u is not multiple of blocksize %lu\n",
1192                        rdpg->rp_count, obj->oo_inode->i_blksize);
1193                 return -EFAULT;
1194         }
1195
1196         /*
1197          * iterating through directory and fill pages from @rdpg
1198          */
1199         iops = &dt->do_index_ops->dio_it;
1200         it = iops->init(ctxt, dt);
1201         if (it == NULL)
1202                 return -ENOMEM;
1203         /*
1204          * XXX position iterator at rdpg->rp_hash
1205          */
1206         rc = iops->load(ctxt, it, rdpg->rp_hash);
1207         if (rc > 0) {
1208                 struct page      *pg; /* no, Richard, it _is_ initialized */
1209                 struct lu_dirent *last;
1210                 __u32             hash_start;
1211                 __u32             hash_end;
1212
1213                 for (i = 0, rc = 0, nob = rdpg->rp_count;
1214                      rc == 0 && nob > 0; i++, nob -= CFS_PAGE_SIZE) {
1215                         LASSERT(i < rdpg->rp_npages);
1216                         pg = rdpg->rp_pages[i];
1217                         rc = osd_dir_page_build(ctxt, !i, kmap(pg),
1218                                                 min_t(int, nob, CFS_PAGE_SIZE),
1219                                                 iops, it,
1220                                                 &hash_start, &hash_end, 
1221                                                 rdpg->rp_hash_end, &last);
1222                         if (rc != 0 || i == rdpg->rp_npages - 1)
1223                                 last->lde_reclen = 0;
1224                         kunmap(pg);
1225                 }
1226                 iops->put(ctxt, it);
1227                 if (rc > 0) {
1228                         /*
1229                          * end of directory.
1230                          */
1231                         hash_end = ~0ul;
1232                         rc = 0;
1233                 }
1234                 if (rc == 0) {
1235                         struct lu_dirpage *dp;
1236
1237                         dp = kmap(rdpg->rp_pages[0]);
1238                         dp->ldp_hash_start = hash_start;
1239                         dp->ldp_hash_end   = hash_end;
1240                         kunmap(rdpg->rp_pages[0]);
1241                 }
1242         } else if (rc == 0)
1243                 rc = -EIO;
1244         iops->put(ctxt, it);
1245         iops->fini(ctxt, it);
1246
1247         return rc;
1248 }
1249
1250 static struct dt_object_operations osd_obj_ops = {
1251         .do_read_lock    = osd_object_read_lock,
1252         .do_write_lock   = osd_object_write_lock,
1253         .do_read_unlock  = osd_object_read_unlock,
1254         .do_write_unlock = osd_object_write_unlock,
1255         .do_attr_get     = osd_attr_get,
1256         .do_attr_set     = osd_attr_set,
1257         .do_create       = osd_object_create,
1258         .do_index_try    = osd_index_try,
1259         .do_ref_add      = osd_object_ref_add,
1260         .do_ref_del      = osd_object_ref_del,
1261         .do_xattr_get    = osd_xattr_get,
1262         .do_xattr_set    = osd_xattr_set,
1263         .do_xattr_del    = osd_xattr_del,
1264         .do_xattr_list   = osd_xattr_list,
1265         .do_readpage     = osd_readpage,
1266 };
1267
1268 /*
1269  * Body operations.
1270  */
1271
1272 static ssize_t osd_read(const struct lu_context *ctxt, struct dt_object *dt,
1273                         void *buf, size_t count, loff_t *pos)
1274 {
1275         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1276         struct file  *file;
1277         mm_segment_t  seg;
1278         ssize_t       result;
1279
1280         file = osd_rw_init(ctxt, inode, &seg);
1281         /*
1282          * We'd like to use vfs_read() here, but it messes with
1283          * dnotify_parent() and locks.
1284          */
1285         result = file->f_op->read(file, buf, count, pos);
1286         osd_rw_fini(&seg);
1287         return result;
1288 }
1289
1290 static ssize_t osd_write(const struct lu_context *ctxt, struct dt_object *dt,
1291                          const void *buf, size_t count, loff_t *pos,
1292                          struct thandle *handle)
1293 {
1294         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1295         struct file  *file;
1296         mm_segment_t  seg;
1297         ssize_t       result;
1298
1299         file = osd_rw_init(ctxt, inode, &seg);
1300         result = file->f_op->write(file, buf, count, pos);
1301         osd_rw_fini(&seg);
1302         return result;
1303 }
1304
/*
 * Body (file data) method table of osd.
 */
static struct dt_body_operations osd_body_ops = {
        .dbo_read  = osd_read,
        .dbo_write = osd_write
};
1309
1310 /*
1311  * Index operations.
1312  */
1313
1314 static int osd_index_probe(const struct lu_context *ctxt, struct osd_object *o,
1315                            const struct dt_index_features *feat)
1316 {
1317         struct iam_descr *descr;
1318
1319         descr = o->oo_container.ic_descr;
1320         if (feat == &dt_directory_features)
1321                 return osd_sb(osd_obj2dev(o))->s_root->d_inode == o->oo_inode ||
1322                         descr == &iam_htree_compat_param ||
1323                         (descr->id_rec_size == sizeof(struct lu_fid) &&
1324                          1 /*
1325                             * XXX check that index looks like directory.
1326                             */
1327                                 );
1328
1329         else
1330                 return
1331                         feat->dif_keysize_min <= descr->id_key_size &&
1332                         descr->id_key_size <= feat->dif_keysize_max &&
1333                         feat->dif_recsize_min <= descr->id_rec_size &&
1334                         descr->id_rec_size <= feat->dif_recsize_max &&
1335                         !(feat->dif_flags & (DT_IND_VARKEY |
1336                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1337                         ergo(feat->dif_flags & DT_IND_UPDATE,
1338                              1 /* XXX check that object (and file system) is
1339                                 * writable */);
1340 }
1341
1342 static int osd_index_try(const struct lu_context *ctx, struct dt_object *dt,
1343                          const struct dt_index_features *feat)
1344 {
1345         int result;
1346         struct osd_object *obj = osd_dt_obj(dt);
1347
1348         LASSERT(osd_invariant(obj));
1349         LASSERT(dt_object_exists(dt));
1350
1351         if (osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode) {
1352                 dt->do_index_ops = &osd_index_compat_ops;
1353                 result = 0;
1354         } else if (!osd_has_index(obj)) {
1355                 struct iam_container *bag;
1356
1357                 bag = &obj->oo_container;
1358                 result = iam_container_init(bag, &obj->oo_descr, obj->oo_inode);
1359                 if (result == 0) {
1360                         result = iam_container_setup(bag);
1361                         if (result == 0) {
1362                                 struct iam_path_descr *ipd;
1363
1364                                 LASSERT(obj->oo_ipd == NULL);
1365                                 ipd = bag->ic_descr->id_ops->id_ipd_alloc(bag);
1366                                 if (ipd != NULL) {
1367                                         obj->oo_ipd = ipd;
1368                                         dt->do_index_ops = &osd_index_ops;
1369                                 } else
1370                                         result = -ENOMEM;
1371                         }
1372                 }
1373         } else
1374                 result = 0;
1375
1376         if (result == 0) {
1377                 if (osd_index_probe(ctx, obj, feat))
1378                         result = 0;
1379                 else
1380                         result = -ENOTDIR;
1381         }
1382         LASSERT(osd_invariant(obj));
1383
1384         return result;
1385 }
1386
1387 static int osd_index_delete(const struct lu_context *ctxt, struct dt_object *dt,
1388                             const struct dt_key *key, struct thandle *handle)
1389 {
1390         struct osd_object     *obj = osd_dt_obj(dt);
1391         struct osd_thandle    *oh;
1392         int rc;
1393
1394         ENTRY;
1395
1396         LASSERT(osd_invariant(obj));
1397         LASSERT(dt_object_exists(dt));
1398         LASSERT(obj->oo_container.ic_object == obj->oo_inode);
1399         LASSERT(obj->oo_ipd != NULL);
1400
1401         oh = container_of0(handle, struct osd_thandle, ot_super);
1402         LASSERT(oh->ot_handle != NULL);
1403
1404         rc = iam_delete(oh->ot_handle, &obj->oo_container,
1405                         (const struct iam_key *)key, obj->oo_ipd);
1406
1407         LASSERT(osd_invariant(obj));
1408         RETURN(rc);
1409 }
1410
1411 static int osd_index_lookup(const struct lu_context *ctxt, struct dt_object *dt,
1412                             struct dt_rec *rec, const struct dt_key *key)
1413 {
1414         struct osd_object *obj = osd_dt_obj(dt);
1415         int rc;
1416
1417         ENTRY;
1418
1419         LASSERT(osd_invariant(obj));
1420         LASSERT(dt_object_exists(dt));
1421         LASSERT(obj->oo_container.ic_object == obj->oo_inode);
1422         LASSERT(obj->oo_ipd != NULL);
1423
1424         rc = iam_lookup(&obj->oo_container, (const struct iam_key *)key,
1425                         (struct iam_rec *)rec, obj->oo_ipd);
1426
1427         LASSERT(osd_invariant(obj));
1428
1429         RETURN(rc);
1430 }
1431
1432
1433 static int osd_index_insert(const struct lu_context *ctx, struct dt_object *dt,
1434                             const struct dt_rec *rec, const struct dt_key *key,
1435                             struct thandle *th)
1436 {
1437         struct osd_object     *obj = osd_dt_obj(dt);
1438
1439         struct osd_thandle    *oh;
1440         int rc;
1441
1442         ENTRY;
1443
1444         LASSERT(osd_invariant(obj));
1445         LASSERT(dt_object_exists(dt));
1446         LASSERT(obj->oo_container.ic_object == obj->oo_inode);
1447         LASSERT(obj->oo_ipd != NULL);
1448
1449         oh = container_of0(th, struct osd_thandle, ot_super);
1450         LASSERT(oh->ot_handle != NULL);
1451         rc = iam_insert(oh->ot_handle, &obj->oo_container,
1452                         (const struct iam_key *)key,
1453                         (struct iam_rec *)rec, obj->oo_ipd);
1454
1455         LASSERT(osd_invariant(obj));
1456         RETURN(rc);
1457 }
1458
1459 /*
1460  * Iterator operations.
1461  */
/*
 * osd index iterator; handed to callers as an opaque struct dt_it pointer.
 */
struct osd_it {
        /* object being iterated; referenced from osd_it_init() to
         * osd_it_fini() */
        struct osd_object  *oi_obj;
        /* underlying iam iterator */
        struct iam_iterator oi_it;
};
1466
1467 static struct dt_it *osd_it_init(const struct lu_context *ctx,
1468                                  struct dt_object *dt)
1469 {
1470         struct osd_it     *it;
1471         struct osd_object *obj = osd_dt_obj(dt);
1472         struct lu_object  *lo  = &dt->do_lu;
1473
1474         LASSERT(lu_object_exists(lo));
1475         LASSERT(obj->oo_ipd != NULL);
1476
1477         OBD_ALLOC_PTR(it);
1478         if (it != NULL) {
1479                 it->oi_obj = obj;
1480                 lu_object_get(lo);
1481                 iam_it_init(&it->oi_it,
1482                             &obj->oo_container, IAM_IT_MOVE, obj->oo_ipd);
1483         }
1484         return (struct dt_it *)it;
1485 }
1486
1487 static void osd_it_fini(const struct lu_context *ctx, struct dt_it *di)
1488 {
1489         struct osd_it *it = (struct osd_it *)di;
1490
1491         iam_it_fini(&it->oi_it);
1492         lu_object_put(ctx, &it->oi_obj->oo_dt.do_lu);
1493         OBD_FREE_PTR(it);
1494 }
1495
1496 static int osd_it_get(const struct lu_context *ctx,
1497                       struct dt_it *di, const struct dt_key *key)
1498 {
1499         struct osd_it *it = (struct osd_it *)di;
1500
1501         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1502 }
1503
1504 static void osd_it_put(const struct lu_context *ctx, struct dt_it *di)
1505 {
1506         struct osd_it *it = (struct osd_it *)di;
1507         iam_it_put(&it->oi_it);
1508 }
1509
1510 static int osd_it_next(const struct lu_context *ctx, struct dt_it *di)
1511 {
1512         struct osd_it *it = (struct osd_it *)di;
1513         return iam_it_next(&it->oi_it);
1514 }
1515
1516 static struct dt_key *osd_it_key(const struct lu_context *ctx,
1517                                  const struct dt_it *di)
1518 {
1519         struct osd_it *it = (struct osd_it *)di;
1520         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1521 }
1522
1523 static int osd_it_key_size(const struct lu_context *ctx, const struct dt_it *di)
1524 {
1525         struct osd_it *it = (struct osd_it *)di;
1526         return iam_it_key_size(&it->oi_it);
1527 }
1528
1529 static struct dt_rec *osd_it_rec(const struct lu_context *ctx,
1530                                  const struct dt_it *di)
1531 {
1532         struct osd_it *it = (struct osd_it *)di;
1533         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1534 }
1535
1536 static __u32 osd_it_store(const struct lu_context *ctxt, const struct dt_it *di)
1537 {
1538         struct osd_it *it = (struct osd_it *)di;
1539         return iam_it_store(&it->oi_it);
1540 }
1541
1542 static int osd_it_load(const struct lu_context *ctxt,
1543                        const struct dt_it *di, __u32 hash)
1544 {
1545         struct osd_it *it = (struct osd_it *)di;
1546         return iam_it_load(&it->oi_it, hash);
1547 }
1548
1549 static struct dt_index_operations osd_index_ops = {
1550         .dio_lookup = osd_index_lookup,
1551         .dio_insert = osd_index_insert,
1552         .dio_delete = osd_index_delete,
1553         .dio_it     = {
1554                 .init     = osd_it_init,
1555                 .fini     = osd_it_fini,
1556                 .get      = osd_it_get,
1557                 .put      = osd_it_put,
1558                 .next     = osd_it_next,
1559                 .key      = osd_it_key,
1560                 .key_size = osd_it_key_size,
1561                 .rec      = osd_it_rec,
1562                 .store    = osd_it_store,
1563                 .load     = osd_it_load
1564         }
1565 };
1566
/*
 * Deletion through the compatibility index is not implemented: after
 * checking that the object is a directory this always returns -EOPNOTSUPP.
 */
static int osd_index_compat_delete(const struct lu_context *ctxt,
                                   struct dt_object *dt,
                                   const struct dt_key *key,
                                   struct thandle *handle)
{
        struct osd_object *obj = osd_dt_obj(dt);

        LASSERT(S_ISDIR(obj->oo_inode->i_mode));
        ENTRY;
        RETURN(-EOPNOTSUPP);
}
1578
1579 /*
1580  * Compatibility index operations.
1581  */
1582
1583
1584 static int osd_build_fid(struct osd_device *osd,
1585                          struct dentry *dentry, struct lu_fid *fid)
1586 {
1587         struct inode *inode = dentry->d_inode;
1588
1589         lu_igif_build(fid, inode->i_ino, inode->i_generation);
1590         return 0;
1591 }
1592
1593 static int osd_index_compat_lookup(const struct lu_context *ctxt,
1594                                    struct dt_object *dt,
1595                                    struct dt_rec *rec, const struct dt_key *key)
1596 {
1597         struct osd_object *obj = osd_dt_obj(dt);
1598
1599         struct osd_device      *osd  = osd_obj2dev(obj);
1600         struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
1601         struct inode           *dir;
1602
1603         int result;
1604
1605         /*
1606          * XXX temporary solution.
1607          */
1608         struct dentry *dentry;
1609         struct dentry *parent;
1610
1611         LASSERT(osd_invariant(obj));
1612         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1613         LASSERT(osd_has_index(obj));
1614
1615         info->oti_str.name = (const char *)key;
1616         info->oti_str.len  = strlen((const char *)key);
1617
1618         dir = obj->oo_inode;
1619         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
1620
1621         parent = d_alloc_root(dir);
1622         if (parent == NULL)
1623                 return -ENOMEM;
1624         igrab(dir);
1625         dentry = d_alloc(parent, &info->oti_str);
1626         if (dentry != NULL) {
1627                 struct dentry *d;
1628
1629                 /*
1630                  * XXX passing NULL for nameidata should work for
1631                  * ext3/ldiskfs.
1632                  */
1633                 d = dir->i_op->lookup(dir, dentry, NULL);
1634                 if (d == NULL) {
1635                         /*
1636                          * normal case, result is in @dentry.
1637                          */
1638                         if (dentry->d_inode != NULL)
1639                                 result = osd_build_fid(osd, dentry,
1640                                                        (struct lu_fid *)rec);
1641                         else
1642                                 result = -ENOENT;
1643                 } else {
1644                         /* What? Disconnected alias? Ppheeeww... */
1645                         CERROR("Aliasing where not expected\n");
1646                         result = -EIO;
1647                         dput(d);
1648                 }
1649                 dput(dentry);
1650         } else
1651                 result = -ENOMEM;
1652         dput(parent);
1653         LASSERT(osd_invariant(obj));
1654         return result;
1655 }
1656
1657 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
1658                        struct inode *dir, struct inode *inode, const char *name)
1659 {
1660         struct dentry *old;
1661         struct dentry *new;
1662         struct dentry *parent;
1663
1664         int result;
1665
1666         info->oti_str.name = name;
1667         info->oti_str.len  = strlen(name);
1668
1669         LASSERT(atomic_read(&dir->i_count) > 0);
1670         result = -ENOMEM;
1671         old = d_alloc(dev->od_obj_area, &info->oti_str);
1672         if (old != NULL) {
1673                 d_instantiate(old, inode);
1674                 igrab(inode);
1675                 LASSERT(atomic_read(&dir->i_count) > 0);
1676                 parent = d_alloc_root(dir);
1677                 if (parent != NULL) {
1678                         igrab(dir);
1679                         LASSERT(atomic_read(&dir->i_count) > 1);
1680                         new = d_alloc(parent, &info->oti_str);
1681                         LASSERT(atomic_read(&dir->i_count) > 1);
1682                         if (new != NULL) {
1683                                 LASSERT(atomic_read(&dir->i_count) > 1);
1684                                 result = dir->i_op->link(old, dir, new);
1685                                 LASSERT(atomic_read(&dir->i_count) > 1);
1686                                 dput(new);
1687                                 LASSERT(atomic_read(&dir->i_count) > 1);
1688                         }
1689                         LASSERT(atomic_read(&dir->i_count) > 1);
1690                         dput(parent);
1691                         LASSERT(atomic_read(&dir->i_count) > 0);
1692                 }
1693                 dput(old);
1694         }
1695         LASSERT(atomic_read(&dir->i_count) > 0);
1696         return result;
1697 }
1698
1699
1700 /*
1701  * XXX Temporary stuff.
1702  */
1703 static int osd_index_compat_insert(const struct lu_context *ctx,
1704                                    struct dt_object *dt,
1705                                    const struct dt_rec *rec,
1706                                    const struct dt_key *key, struct thandle *th)
1707 {
1708         struct osd_object     *obj = osd_dt_obj(dt);
1709
1710         const struct lu_fid *fid  = (const struct lu_fid *)rec;
1711         const char          *name = (const char *)key;
1712
1713         struct lu_device    *ludev = dt->do_lu.lo_dev;
1714         struct lu_object    *luch;
1715
1716         struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
1717
1718         int result;
1719
1720         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1721         LASSERT(osd_invariant(obj));
1722
1723         luch = lu_object_find(ctx, ludev->ld_site, fid);
1724         if (!IS_ERR(luch)) {
1725                 if (lu_object_exists(luch)) {
1726                         struct osd_object *child;
1727
1728                         child = osd_obj(lu_object_locate(luch->lo_header,
1729                                                          ludev->ld_type));
1730                         if (child != NULL)
1731                                 result = osd_add_rec(info, osd_obj2dev(obj),
1732                                                      obj->oo_inode,
1733                                                      child->oo_inode, name);
1734                         else {
1735                                 CERROR("No osd slice.\n");
1736                                 result = -ENOENT;
1737                         }
1738                         LASSERT(osd_invariant(obj));
1739                         LASSERT(osd_invariant(child));
1740                 } else {
1741                         CERROR("Sorry.\n");
1742                         result = -ENOENT;
1743                 }
1744                 lu_object_put(ctx, luch);
1745         } else
1746                 result = PTR_ERR(luch);
1747         LASSERT(osd_invariant(obj));
1748         return result;
1749 }
1750
1751 static struct dt_index_operations osd_index_compat_ops = {
1752         .dio_lookup = osd_index_compat_lookup,
1753         .dio_insert = osd_index_compat_insert,
1754         .dio_delete = osd_index_compat_delete
1755 };
1756
1757 /*
1758  * OSD device type methods
1759  */
1760 static int osd_type_init(struct lu_device_type *t)
1761 {
1762         return lu_context_key_register(&osd_key);
1763 }
1764
1765 static void osd_type_fini(struct lu_device_type *t)
1766 {
1767         lu_context_key_degister(&osd_key);
1768 }
1769
1770 static struct lu_context_key osd_key = {
1771         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD,
1772         .lct_init = osd_key_init,
1773         .lct_fini = osd_key_fini,
1774         .lct_exit = osd_key_exit
1775 };
1776
1777 static void *osd_key_init(const struct lu_context *ctx,
1778                           struct lu_context_key *key)
1779 {
1780         struct osd_thread_info *info;
1781
1782         OBD_ALLOC_PTR(info);
1783         if (info != NULL)
1784                 info->oti_ctx = ctx;
1785         else
1786                 info = ERR_PTR(-ENOMEM);
1787         return info;
1788 }
1789
/* Key destructor: free the osd_thread_info created by osd_key_init(). */
static void osd_key_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        struct osd_thread_info *oti = data;

        OBD_FREE_PTR(oti);
}
1796
1797 static void osd_key_exit(const struct lu_context *ctx,
1798                          struct lu_context_key *key, void *data)
1799 {
1800         struct osd_thread_info *info = data;
1801
1802         LASSERT(info->oti_r_locks == 0);
1803         LASSERT(info->oti_w_locks == 0);
1804         LASSERT(info->oti_txns    == 0);
1805 }
1806
1807 static int osd_device_init(const struct lu_context *ctx,
1808                            struct lu_device *d, struct lu_device *next)
1809 {
1810         int rc;
1811         rc = lu_context_init(&osd_dev(d)->od_ctx_for_commit, LCT_MD_THREAD);
1812         if (rc == 0)
1813                 lu_context_enter(&osd_dev(d)->od_ctx_for_commit);
1814         return rc;
1815 }
1816
1817 static int osd_shutdown(const struct lu_context *ctx, struct osd_device *o)
1818 {
1819         struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
1820         ENTRY;
1821         if (o->od_obj_area != NULL) {
1822                 dput(o->od_obj_area);
1823                 o->od_obj_area = NULL;
1824         }
1825         osd_oi_fini(info, &o->od_oi);
1826
1827         RETURN(0);
1828 }
1829
1830 static int osd_mount(const struct lu_context *ctx,
1831                      struct osd_device *o, struct lustre_cfg *cfg)
1832 {
1833         struct lustre_mount_info *lmi;
1834         const char               *dev = lustre_cfg_string(cfg, 0);
1835         struct osd_thread_info   *info = lu_context_key_get(ctx, &osd_key);
1836         int result;
1837
1838         ENTRY;
1839
1840         if (o->od_mount != NULL) {
1841                 CERROR("Already mounted (%s)\n", dev);
1842                 RETURN(-EEXIST);
1843         }
1844
1845         /* get mount */
1846         lmi = server_get_mount(dev);
1847         if (lmi == NULL) {
1848                 CERROR("Cannot get mount info for %s!\n", dev);
1849                 RETURN(-EFAULT);
1850         }
1851
1852         LASSERT(lmi != NULL);
1853         /* save lustre_mount_info in dt_device */
1854         o->od_mount = lmi;
1855
1856         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
1857         if (result == 0) {
1858                 struct dentry *d;
1859
1860                 d = simple_mkdir(osd_sb(o)->s_root, "*OBJ-TEMP*", 0777, 1);
1861                 if (!IS_ERR(d)) {
1862                         o->od_obj_area = d;
1863                 } else
1864                         result = PTR_ERR(d);
1865         }
1866         if (result != 0)
1867                 osd_shutdown(ctx, o);
1868         RETURN(result);
1869 }
1870
1871 static struct lu_device *osd_device_fini(const struct lu_context *ctx,
1872                                          struct lu_device *d)
1873 {
1874         ENTRY;
1875
1876         shrink_dcache_sb(osd_sb(osd_dev(d)));
1877
1878         if (osd_dev(d)->od_mount)
1879                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
1880                                  osd_dev(d)->od_mount->lmi_mnt);
1881         osd_dev(d)->od_mount = NULL;
1882
1883         lu_context_exit(&osd_dev(d)->od_ctx_for_commit);
1884         lu_context_fini(&osd_dev(d)->od_ctx_for_commit);
1885         RETURN(NULL);
1886 }
1887
1888 static struct lu_device *osd_device_alloc(const struct lu_context *ctx,
1889                                           struct lu_device_type *t,
1890                                           struct lustre_cfg *cfg)
1891 {
1892         struct lu_device  *l;
1893         struct osd_device *o;
1894
1895         OBD_ALLOC_PTR(o);
1896         if (o != NULL) {
1897                 int result;
1898
1899                 result = dt_device_init(&o->od_dt_dev, t);
1900                 if (result == 0) {
1901                         l = osd2lu_dev(o);
1902                         l->ld_ops = &osd_lu_ops;
1903                         o->od_dt_dev.dd_ops = &osd_dt_ops;
1904                 } else
1905                         l = ERR_PTR(result);
1906         } else
1907                 l = ERR_PTR(-ENOMEM);
1908         return l;
1909 }
1910
1911 static void osd_device_free(const struct lu_context *ctx, struct lu_device *d)
1912 {
1913         struct osd_device *o = osd_dev(d);
1914
1915         dt_device_fini(&o->od_dt_dev);
1916         OBD_FREE_PTR(o);
1917 }
1918
1919 static int osd_process_config(const struct lu_context *ctx,
1920                               struct lu_device *d, struct lustre_cfg *cfg)
1921 {
1922         struct osd_device *o = osd_dev(d);
1923         int err;
1924
1925         switch(cfg->lcfg_command) {
1926         case LCFG_SETUP:
1927                 err = osd_mount(ctx, o, cfg);
1928                 break;
1929         case LCFG_CLEANUP:
1930                 err = osd_shutdown(ctx, o);
1931                 break;
1932         default:
1933                 err = -ENOTTY;
1934         }
1935
1936         RETURN(err);
1937 }
1938
/*
 * Recovery-complete hook. Currently a placeholder that does nothing and
 * returns 0.
 */
static int osd_recovery_complete(const struct lu_context *ctxt,
                                 struct lu_device *d)
{
        ENTRY;
        /* TODO: orphans handling */
        RETURN(0);
}
1946
1947 /*
1948  * fid<->inode<->object functions.
1949  */
1950
1951 static struct inode *osd_open(struct dentry *parent,
1952                               const char *name, mode_t mode)
1953 {
1954         struct dentry *dentry;
1955         struct inode *result;
1956
1957         dentry = osd_lookup(parent, name);
1958         if (IS_ERR(dentry)) {
1959                 CERROR("Error opening %s: %ld\n", name, PTR_ERR(dentry));
1960                 result = NULL; /* dput(NULL) below is OK */
1961         } else if (dentry->d_inode == NULL) {
1962                 CERROR("Not found: %s\n", name);
1963                 result = ERR_PTR(-ENOENT);
1964         } else if ((dentry->d_inode->i_mode & S_IFMT) != mode) {
1965                 CERROR("Wrong mode: %s: %o != %o\n", name,
1966                        dentry->d_inode->i_mode, mode);
1967                 result = ERR_PTR(mode == S_IFDIR ? -ENOTDIR : -EISDIR);
1968         } else {
1969                 result = dentry->d_inode;
1970                 igrab(result);
1971         }
1972         dput(dentry);
1973         return result;
1974 }
1975
1976 struct dentry *osd_lookup(struct dentry *parent, const char *name)
1977 {
1978         struct dentry *dentry;
1979
1980         CDEBUG(D_INODE, "looking up object %s\n", name);
1981         down(&parent->d_inode->i_sem);
1982         dentry = lookup_one_len(name, parent, strlen(name));
1983         up(&parent->d_inode->i_sem);
1984
1985         if (IS_ERR(dentry)) {
1986                 CERROR("error getting %s: %ld\n", name, PTR_ERR(dentry));
1987         } else if (dentry->d_inode != NULL && is_bad_inode(dentry->d_inode)) {
1988                 CERROR("got bad object %s inode %lu\n",
1989                        name, dentry->d_inode->i_ino);
1990                 dput(dentry);
1991                 dentry = ERR_PTR(-ENOENT);
1992         }
1993         return dentry;
1994 }
1995
1996 int osd_lookup_id(struct dt_device *dev, const char *name, mode_t mode,
1997                   struct osd_inode_id *id)
1998 {
1999         struct inode *inode;
2000         struct osd_device *osd = osd_dt_dev(dev);
2001         int result;
2002
2003         inode = osd_open(osd_sb(osd)->s_root, name, mode);
2004         if (!IS_ERR(inode)) {
2005                 LASSERT(inode != NULL);
2006                 id->oii_ino = inode->i_ino;
2007                 id->oii_gen = inode->i_generation;
2008                 result = 0;
2009         } else
2010                 result = PTR_ERR(inode);
2011         return result;
2012 }
2013
2014 static struct inode *osd_iget(struct osd_thread_info *info,
2015                               struct osd_device *dev,
2016                               const struct osd_inode_id *id)
2017 {
2018         struct inode *inode;
2019
2020         inode = iget(osd_sb(dev), id->oii_ino);
2021         if (inode == NULL) {
2022                 CERROR("no inode\n");
2023                 inode = ERR_PTR(-EACCES);
2024         } else if (is_bad_inode(inode)) {
2025                 CERROR("bad inode\n");
2026                 iput(inode);
2027                 inode = ERR_PTR(-ENOENT);
2028         } else if (inode->i_generation != id->oii_gen) {
2029                 CERROR("stale inode\n");
2030                 iput(inode);
2031                 inode = ERR_PTR(-ESTALE);
2032         }
2033         
2034         return inode;
2035
2036 }
2037
2038 static int osd_fid_lookup(const struct lu_context *ctx,
2039                           struct osd_object *obj, const struct lu_fid *fid)
2040 {
2041         struct osd_thread_info *info;
2042         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2043         struct osd_device      *dev;
2044         struct osd_inode_id    *id;
2045         struct osd_oi          *oi;
2046         struct inode           *inode;
2047         int                     result;
2048
2049         LASSERT(osd_invariant(obj));
2050         LASSERT(obj->oo_inode == NULL);
2051         LASSERT(fid_is_sane(fid));
2052         LASSERT(fid_is_local(ldev->ld_site, fid));
2053
2054         ENTRY;
2055
2056         info = lu_context_key_get(ctx, &osd_key);
2057         dev  = osd_dev(ldev);
2058         id   = &info->oti_id;
2059         oi   = &dev->od_oi;
2060
2061         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2062                 RETURN(-ENOENT);
2063
2064         osd_oi_read_lock(oi);
2065         result = osd_oi_lookup(info, oi, fid, id);
2066         if (result == 0) {
2067                 inode = osd_iget(info, dev, id);
2068                 if (!IS_ERR(inode)) {
2069                         obj->oo_inode = inode;
2070                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2071                         result = 0;
2072                 } else
2073                         /*
2074                          * If fid wasn't found in oi, inode-less object is
2075                          * created, for which lu_object_exists() returns
2076                          * false. This is used in a (frequent) case when
2077                          * objects are created as locking anchors or
2078                          * place holders for objects yet to be created.
2079                          */
2080                         result = PTR_ERR(inode);
2081         } else if (result == -ENOENT)
2082                 result = 0;
2083         osd_oi_read_unlock(oi);
2084         LASSERT(osd_invariant(obj));
2085         RETURN(result);
2086 }
2087
2088 static int osd_inode_getattr(const struct lu_context *ctx,
2089                              struct inode *inode, struct lu_attr *attr)
2090 {
2091         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2092                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2093                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2094
2095         attr->la_atime      = LTIME_S(inode->i_atime);
2096         attr->la_mtime      = LTIME_S(inode->i_mtime);
2097         attr->la_ctime      = LTIME_S(inode->i_ctime);
2098         attr->la_mode       = inode->i_mode;
2099         attr->la_size       = inode->i_size;
2100         attr->la_blocks     = inode->i_blocks;
2101         attr->la_uid        = inode->i_uid;
2102         attr->la_gid        = inode->i_gid;
2103         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2104         attr->la_nlink      = inode->i_nlink;
2105         attr->la_rdev       = inode->i_rdev;
2106         attr->la_blksize    = inode->i_blksize;
2107         return 0;
2108 }
2109
2110 /*
2111  * Helpers.
2112  */
2113
2114 static int lu_device_is_osd(const struct lu_device *d)
2115 {
2116         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2117 }
2118
2119 static struct osd_object *osd_obj(const struct lu_object *o)
2120 {
2121         LASSERT(lu_device_is_osd(o->lo_dev));
2122         return container_of0(o, struct osd_object, oo_dt.do_lu);
2123 }
2124
2125 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2126 {
2127         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2128         return container_of0(d, struct osd_device, od_dt_dev);
2129 }
2130
2131 static struct osd_device *osd_dev(const struct lu_device *d)
2132 {
2133         LASSERT(lu_device_is_osd(d));
2134         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2135 }
2136
2137 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2138 {
2139         return osd_obj(&d->do_lu);
2140 }
2141
2142 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2143 {
2144         return osd_dev(o->oo_dt.do_lu.lo_dev);
2145 }
2146
2147 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2148 {
2149         return &osd->od_dt_dev.dd_lu_dev;
2150 }
2151
2152 static struct super_block *osd_sb(const struct osd_device *dev)
2153 {
2154         return dev->od_mount->lmi_mnt->mnt_sb;
2155 }
2156
2157 static journal_t *osd_journal(const struct osd_device *dev)
2158 {
2159         return LDISKFS_SB(osd_sb(dev))->s_journal;
2160 }
2161
2162 static int osd_has_index(const struct osd_object *obj)
2163 {
2164         return obj->oo_dt.do_index_ops != NULL;
2165 }
2166
/* ->loo_object_invariant() method: check the invariant of the osd object. */
static int osd_object_invariant(const struct lu_object *l)
{
        struct osd_object *obj = osd_obj(l);

        return osd_invariant(obj);
}
2171
2172 static struct lu_object_operations osd_lu_obj_ops = {
2173         .loo_object_init      = osd_object_init,
2174         .loo_object_delete    = osd_object_delete,
2175         .loo_object_release   = osd_object_release,
2176         .loo_object_free      = osd_object_free,
2177         .loo_object_print     = osd_object_print,
2178         .loo_object_invariant = osd_object_invariant
2179 };
2180
2181 static struct lu_device_operations osd_lu_ops = {
2182         .ldo_object_alloc      = osd_object_alloc,
2183         .ldo_process_config    = osd_process_config,
2184         .ldo_recovery_complete = osd_recovery_complete
2185 };
2186
2187 static struct lu_device_type_operations osd_device_type_ops = {
2188         .ldto_init = osd_type_init,
2189         .ldto_fini = osd_type_fini,
2190
2191         .ldto_device_alloc = osd_device_alloc,
2192         .ldto_device_free  = osd_device_free,
2193
2194         .ldto_device_init    = osd_device_init,
2195         .ldto_device_fini    = osd_device_fini
2196 };
2197
2198 static struct lu_device_type osd_device_type = {
2199         .ldt_tags     = LU_DEVICE_DT,
2200         .ldt_name     = LUSTRE_OSD0_NAME,
2201         .ldt_ops      = &osd_device_type_ops,
2202         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2203 };
2204
2205 /*
2206  * lprocfs legacy support.
2207  */
/* Per-device lprocfs variables: only a zeroed entry, no variables defined. */
static struct lprocfs_vars lprocfs_osd_obd_vars[] = {
        { 0 }
};
2211
/* Module-wide lprocfs variables: only a zeroed entry, no variables defined. */
static struct lprocfs_vars lprocfs_osd_module_vars[] = {
        { 0 }
};
2215
/* Legacy obd method table: only the owning module is set. */
static struct obd_ops osd_obd_device_ops = {
        .o_owner = THIS_MODULE
};
2219
/*
 * NOTE(review): presumably defines the helper that lprocfs_init_vars(osd, ...)
 * in osd_mod_init() uses to hand out the two tables above -- confirm against
 * the lprocfs headers.
 */
LPROCFS_INIT_VARS(osd, lprocfs_osd_module_vars, lprocfs_osd_obd_vars);
2221
2222 static int __init osd_mod_init(void)
2223 {
2224         struct lprocfs_static_vars lvars;
2225
2226         lprocfs_init_vars(osd, &lvars);
2227         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2228                                    LUSTRE_OSD0_NAME, &osd_device_type);
2229 }
2230
2231 static void __exit osd_mod_exit(void)
2232 {
2233         class_unregister_type(LUSTRE_OSD0_NAME);
2234 }
2235
/* Module metadata and registration of the init/exit entry points. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD0_NAME")");
MODULE_LICENSE("GPL");

cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);