Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
58 /*
59  * XXX temporary stuff: direct access to ldiskfs/jbd. Interface between osd
60  * and file system is not yet specified.
61  */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
85 struct osd_directory {
86         struct iam_container od_container;
87         struct iam_descr     od_descr;
88         struct semaphore     od_sem;
89 };
90
91 struct osd_object {
92         struct dt_object       oo_dt;
93         /**
94          * Inode for file system object represented by this osd_object. This
95          * inode is pinned for the whole duration of lu_object life.
96          *
97          * Not modified concurrently (either setup early during object
98          * creation, or assigned by osd_object_create() under write lock).
99          */
100         struct inode          *oo_inode;
101         struct rw_semaphore    oo_sem;
102         struct osd_directory  *oo_dir;
103         /** protects inode attributes. */
104         spinlock_t             oo_guard;
105         const struct lu_env   *oo_owner;
106 #ifdef CONFIG_LOCKDEP
107         struct lockdep_map     oo_dep_map;
108 #endif
109 };
110
111 static int   osd_root_get      (const struct lu_env *env,
112                                 struct dt_device *dev, struct lu_fid *f);
113
114 static int   lu_device_is_osd  (const struct lu_device *d);
115 static void  osd_mod_exit      (void) __exit;
116 static int   osd_mod_init      (void) __init;
117 static int   osd_type_init     (struct lu_device_type *t);
118 static void  osd_type_fini     (struct lu_device_type *t);
119 static int   osd_object_init   (const struct lu_env *env,
120                                 struct lu_object *l,
121                                 const struct lu_object_conf *_);
122 static void  osd_object_release(const struct lu_env *env,
123                                 struct lu_object *l);
124 static int   osd_object_print  (const struct lu_env *env, void *cookie,
125                                 lu_printer_t p, const struct lu_object *o);
126 static struct lu_device *osd_device_free   (const struct lu_env *env,
127                                 struct lu_device *m);
128 static void *osd_key_init      (const struct lu_context *ctx,
129                                 struct lu_context_key *key);
130 static void  osd_key_fini      (const struct lu_context *ctx,
131                                 struct lu_context_key *key, void *data);
132 static void  osd_key_exit      (const struct lu_context *ctx,
133                                 struct lu_context_key *key, void *data);
134 static int   osd_has_index     (const struct osd_object *obj);
135 static void  osd_object_init0  (struct osd_object *obj);
136 static int   osd_device_init   (const struct lu_env *env,
137                                 struct lu_device *d, const char *,
138                                 struct lu_device *);
139 static int   osd_fid_lookup    (const struct lu_env *env,
140                                 struct osd_object *obj,
141                                 const struct lu_fid *fid);
142 static void  osd_inode_getattr (const struct lu_env *env,
143                                 struct inode *inode, struct lu_attr *attr);
144 static void  osd_inode_setattr (const struct lu_env *env,
145                                 struct inode *inode, const struct lu_attr *attr);
146 static int   osd_param_is_sane (const struct osd_device *dev,
147                                 const struct txn_param *param);
148 static int   osd_index_lookup  (const struct lu_env *env,
149                                 struct dt_object *dt,
150                                 struct dt_rec *rec, const struct dt_key *key,
151                                 struct lustre_capa *capa);
152 static int   osd_index_insert  (const struct lu_env *env,
153                                 struct dt_object *dt,
154                                 const struct dt_rec *rec,
155                                 const struct dt_key *key,
156                                 struct thandle *handle,
157                                 struct lustre_capa *capa);
158 static int   osd_index_delete  (const struct lu_env *env,
159                                 struct dt_object *dt, const struct dt_key *key,
160                                 struct thandle *handle,
161                                 struct lustre_capa *capa);
162 static int   osd_index_probe   (const struct lu_env *env,
163                                 struct osd_object *o,
164                                 const struct dt_index_features *feat);
165 static int   osd_index_try     (const struct lu_env *env,
166                                 struct dt_object *dt,
167                                 const struct dt_index_features *feat);
168 static void  osd_index_fini    (struct osd_object *o);
169
170 static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
171 static int   osd_it_get        (const struct lu_env *env,
172                                 struct dt_it *di, const struct dt_key *key);
173 static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
174 static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
175 static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
176                                 struct thandle *th);
177 static int   osd_it_key_size   (const struct lu_env *env,
178                                 const struct dt_it *di);
179 static void  osd_conf_get      (const struct lu_env *env,
180                                 const struct dt_device *dev,
181                                 struct dt_device_param *param);
182 static void  osd_trans_stop    (const struct lu_env *env,
183                                 struct thandle *th);
184 static int   osd_object_is_root(const struct osd_object *obj);
185
186 static struct osd_object  *osd_obj          (const struct lu_object *o);
187 static struct osd_device  *osd_dev          (const struct lu_device *d);
188 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
189 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
190 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
191 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
192 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
193                                              struct lu_device *d);
194 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
195                                              struct lu_device_type *t,
196                                              struct lustre_cfg *cfg);
197 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
198                                              const struct lu_object_header *hdr,
199                                              struct lu_device *d);
200 static struct inode       *osd_iget         (struct osd_thread_info *info,
201                                              struct osd_device *dev,
202                                              const struct osd_inode_id *id);
203 static struct super_block *osd_sb           (const struct osd_device *dev);
204 static struct dt_it       *osd_it_init      (const struct lu_env *env,
205                                              struct dt_object *dt, int wable,
206                                              struct lustre_capa *capa);
207 static struct dt_key      *osd_it_key       (const struct lu_env *env,
208                                              const struct dt_it *di);
209 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
210                                              const struct dt_it *di);
211 static struct timespec    *osd_inode_time   (const struct lu_env *env,
212                                              struct inode *inode,
213                                              __u64 seconds);
214 static struct thandle     *osd_trans_start  (const struct lu_env *env,
215                                              struct dt_device *d,
216                                              struct txn_param *p);
217 static journal_t          *osd_journal      (const struct osd_device *dev);
218
219 static const struct lu_device_type_operations osd_device_type_ops;
220 static struct lu_device_type            osd_device_type;
221 static const struct lu_object_operations      osd_lu_obj_ops;
222 static struct obd_ops                   osd_obd_device_ops;
223 static const struct lu_device_operations      osd_lu_ops;
224 static struct lu_context_key            osd_key;
225 static const struct dt_object_operations      osd_obj_ops;
226 static const struct dt_body_operations        osd_body_ops;
227 static const struct dt_index_operations       osd_index_ops;
228 static const struct dt_index_operations       osd_index_compat_ops;
229
230 struct osd_thandle {
231         struct thandle          ot_super;
232         handle_t               *ot_handle;
233         struct journal_callback ot_jcb;
234         /* Link to the device, for debugging. */
235         struct lu_ref_link     *ot_dev_link;
236
237 };
238
239 /*
240  * Invariants, assertions.
241  */
242
/*
 * XXX: do not enable this, until invariant checking code is made thread safe
 * in the face of pdirops locking.
 */
#define OSD_INVARIANT_CHECKS (0)

#if OSD_INVARIANT_CHECKS
/*
 * Consistency predicate for an osd_object; evaluates to non-zero when all of
 * the following hold:
 *
 *   - obj is not NULL;
 *   - if an inode is attached, it belongs to the super block of the object's
 *     device and has a positive reference count;
 *   - if index state is attached and its container is bound to an inode,
 *     that inode is the object's own inode.
 *
 * Only compiled when OSD_INVARIANT_CHECKS is non-zero; otherwise
 * osd_invariant() is a macro that is always true.
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_dir != NULL &&
                     obj->oo_dir->od_container.ic_object != NULL,
                     obj->oo_dir->od_container.ic_object == obj->oo_inode);
}
#else
#define osd_invariant(obj) (1)
#endif
264
265 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
266 {
267         return lu_context_key_get(&env->le_ctx, &osd_key);
268 }
269
270 /*
271  * Concurrency: doesn't matter
272  */
273 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
274 {
275         return osd_oti_get(env)->oti_r_locks > 0;
276 }
277
278 /*
279  * Concurrency: doesn't matter
280  */
281 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
282 {
283         struct osd_thread_info *oti = osd_oti_get(env);
284         return oti->oti_w_locks > 0 && o->oo_owner == env;
285 }
286
287 /*
288  * Concurrency: doesn't access mutable data
289  */
290 static int osd_root_get(const struct lu_env *env,
291                         struct dt_device *dev, struct lu_fid *f)
292 {
293         struct inode *inode;
294
295         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
296         lu_igif_build(f, inode->i_ino, inode->i_generation);
297         return 0;
298 }
299
300 /*
301  * OSD object methods.
302  */
303
304 /*
305  * Concurrency: no concurrent access is possible that early in object
306  * life-cycle.
307  */
308 static struct lu_object *osd_object_alloc(const struct lu_env *env,
309                                           const struct lu_object_header *hdr,
310                                           struct lu_device *d)
311 {
312         struct osd_object *mo;
313
314         OBD_ALLOC_PTR(mo);
315         if (mo != NULL) {
316                 struct lu_object *l;
317
318                 l = &mo->oo_dt.do_lu;
319                 dt_object_init(&mo->oo_dt, NULL, d);
320                 mo->oo_dt.do_ops = &osd_obj_ops;
321                 l->lo_ops = &osd_lu_obj_ops;
322                 init_rwsem(&mo->oo_sem);
323                 spin_lock_init(&mo->oo_guard);
324                 return l;
325         } else
326                 return NULL;
327 }
328
329 /*
330  * Concurrency: shouldn't matter.
331  */
332 static void osd_object_init0(struct osd_object *obj)
333 {
334         LASSERT(obj->oo_inode != NULL);
335         obj->oo_dt.do_body_ops = &osd_body_ops;
336         obj->oo_dt.do_lu.lo_header->loh_attr |=
337                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
338 }
339
340 /*
341  * Concurrency: no concurrent access is possible that early in object
342  * life-cycle.
343  */
344 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
345                            const struct lu_object_conf *_)
346 {
347         struct osd_object *obj = osd_obj(l);
348         int result;
349
350         LINVRNT(osd_invariant(obj));
351
352         result = osd_fid_lookup(env, obj, lu_object_fid(l));
353         if (result == 0) {
354                 if (obj->oo_inode != NULL)
355                         osd_object_init0(obj);
356         }
357         LINVRNT(osd_invariant(obj));
358         return result;
359 }
360
361 /*
362  * Concurrency: no concurrent access is possible that late in object
363  * life-cycle.
364  */
365 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
366 {
367         struct osd_object *obj = osd_obj(l);
368
369         LINVRNT(osd_invariant(obj));
370
371         dt_object_fini(&obj->oo_dt);
372         OBD_FREE_PTR(obj);
373 }
374
375 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
376                                           const struct iam_container *bag)
377 {
378         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
379                                                    osd_oti_get(env)->oti_ipd);
380 }
381
382 static void osd_ipd_put(const struct lu_env *env,
383                         const struct iam_container *bag,
384                         struct iam_path_descr *ipd)
385 {
386         bag->ic_descr->id_ops->id_ipd_free(ipd);
387 }
388
389 /*
390  * Concurrency: no concurrent access is possible that late in object
391  * life-cycle.
392  */
393 static void osd_index_fini(struct osd_object *o)
394 {
395         struct iam_container *bag;
396
397         if (o->oo_dir != NULL) {
398                 bag = &o->oo_dir->od_container;
399                 if (o->oo_inode != NULL) {
400                         if (bag->ic_object == o->oo_inode)
401                                 iam_container_fini(bag);
402                 }
403                 OBD_FREE_PTR(o->oo_dir);
404                 o->oo_dir = NULL;
405         }
406 }
407
408 /*
409  * Concurrency: no concurrent access is possible that late in object
410  * life-cycle (for all existing callers, that is. New callers have to provide
411  * their own locking.)
412  */
413 static int osd_inode_unlinked(const struct inode *inode)
414 {
415         return inode->i_nlink == 0;
416 }
417
/*
 * Journal credits; osd_inode_remove() reserves the sum of both values for
 * its transaction.
 */
enum {
        /* Credits reserved for deleting the object index entry. */
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        /* Credits reserved for deleting the inode. */
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
422
423 /*
424  * Concurrency: no concurrent access is possible that late in object
425  * life-cycle.
426  */
427 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
428 {
429         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
430         struct osd_device      *osd = osd_obj2dev(obj);
431         struct osd_thread_info *oti = osd_oti_get(env);
432         struct txn_param       *prm = &oti->oti_txn;
433         struct thandle         *th;
434         int result;
435
436         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
437                             OSD_TXN_INODE_DELETE_CREDITS);
438         th = osd_trans_start(env, &osd->od_dt_dev, prm);
439         if (!IS_ERR(th)) {
440                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
441                 osd_trans_stop(env, th);
442         } else
443                 result = PTR_ERR(th);
444         return result;
445 }
446
447 /*
448  * Called just before object is freed. Releases all resources except for
449  * object itself (that is released by osd_object_free()).
450  *
451  * Concurrency: no concurrent access is possible that late in object
452  * life-cycle.
453  */
454 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
455 {
456         struct osd_object *obj   = osd_obj(l);
457         struct inode      *inode = obj->oo_inode;
458
459         LINVRNT(osd_invariant(obj));
460
461         /*
462          * If object is unlinked remove fid->ino mapping from object index.
463          *
464          * File body will be deleted by iput().
465          */
466
467         osd_index_fini(obj);
468         if (inode != NULL) {
469                 int result;
470
471                 if (osd_inode_unlinked(inode)) {
472                         result = osd_inode_remove(env, obj);
473                         if (result != 0)
474                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
475                                                 "Failed to cleanup: %d\n",
476                                                 result);
477                 }
478                 iput(inode);
479                 obj->oo_inode = NULL;
480         }
481 }
482
483 /*
484  * Concurrency: ->loo_object_release() is called under site spin-lock.
485  */
486 static void osd_object_release(const struct lu_env *env,
487                                struct lu_object *l)
488 {
489         struct osd_object *o = osd_obj(l);
490
491         LASSERT(!lu_object_is_dying(l->lo_header));
492         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
493                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
494 }
495
496 /*
497  * Concurrency: shouldn't matter.
498  */
499 static int osd_object_print(const struct lu_env *env, void *cookie,
500                             lu_printer_t p, const struct lu_object *l)
501 {
502         struct osd_object *o = osd_obj(l);
503         struct iam_descr  *d;
504
505         if (o->oo_dir != NULL)
506                 d = o->oo_dir->od_container.ic_descr;
507         else
508                 d = NULL;
509         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
510                     o, o->oo_inode,
511                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
512                     o->oo_inode ? o->oo_inode->i_generation : 0,
513                     d ? d->id_ops->id_name : "plain");
514 }
515
516 /*
517  * Concurrency: shouldn't matter.
518  */
519 int osd_statfs(const struct lu_env *env, struct dt_device *d,
520                struct kstatfs *sfs)
521 {
522         struct osd_device *osd = osd_dt_dev(d);
523         struct super_block *sb = osd_sb(osd);
524         int result = 0;
525
526         spin_lock(&osd->od_osfs_lock);
527         /* cache 1 second */
528         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
529                 result = ll_do_statfs(sb, &osd->od_kstatfs);
530                 if (likely(result == 0)) /* N.B. statfs can't really fail */
531                         osd->od_osfs_age = cfs_time_current_64();
532         }
533
534         if (likely(result == 0))
535                 *sfs = osd->od_kstatfs; 
536         spin_unlock(&osd->od_osfs_lock);
537
538         return result;
539 }
540
541 /*
542  * Concurrency: doesn't access mutable data.
543  */
544 static void osd_conf_get(const struct lu_env *env,
545                          const struct dt_device *dev,
546                          struct dt_device_param *param)
547 {
548         /*
549          * XXX should be taken from not-yet-existing fs abstraction layer.
550          */
551         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
552         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
553         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
554 }
555
556 /*
557  * Journal
558  */
559
560 /*
561  * Concurrency: doesn't access mutable data.
562  */
563 static int osd_param_is_sane(const struct osd_device *dev,
564                              const struct txn_param *param)
565 {
566         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
567 }
568
569 /*
570  * Concurrency: shouldn't matter.
571  */
572 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
573 {
574         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
575         struct thandle     *th = &oh->ot_super;
576         struct dt_device   *dev = th->th_dev;
577         struct lu_device   *lud = &dev->dd_lu_dev;
578
579         LASSERT(dev != NULL);
580         LASSERT(oh->ot_handle == NULL);
581
582         if (error) {
583                 CERROR("transaction @0x%p commit error: %d\n", th, error);
584         } else {
585                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
586                 /*
587                  * This od_env_for_commit is only for commit usage.  see
588                  * "struct dt_device"
589                  */
590                 lu_context_enter(&env->le_ctx);
591                 dt_txn_hook_commit(env, th);
592                 lu_context_exit(&env->le_ctx);
593         }
594
595         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
596         lu_device_put(lud);
597         th->th_dev = NULL;
598
599         lu_context_exit(&th->th_ctx);
600         lu_context_fini(&th->th_ctx);
601         OBD_FREE_PTR(oh);
602 }
603
604 /*
605  * Concurrency: shouldn't matter.
606  */
607 static struct thandle *osd_trans_start(const struct lu_env *env,
608                                        struct dt_device *d,
609                                        struct txn_param *p)
610 {
611         struct osd_device  *dev = osd_dt_dev(d);
612         handle_t           *jh;
613         struct osd_thandle *oh;
614         struct thandle     *th;
615         int hook_res;
616
617         ENTRY;
618
619         hook_res = dt_txn_hook_start(env, d, p);
620         if (hook_res != 0)
621                 RETURN(ERR_PTR(hook_res));
622
623         if (osd_param_is_sane(dev, p)) {
624                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
625                 if (oh != NULL) {
626                         struct osd_thread_info *oti = osd_oti_get(env);
627
628                         /*
629                          * XXX temporary stuff. Some abstraction layer should
630                          * be used.
631                          */
632
633                         jh = journal_start(osd_journal(dev), p->tp_credits);
634                         if (!IS_ERR(jh)) {
635                                 oh->ot_handle = jh;
636                                 th = &oh->ot_super;
637                                 th->th_dev = d;
638                                 th->th_result = 0;
639                                 jh->h_sync = p->tp_sync;
640                                 lu_device_get(&d->dd_lu_dev);
641                                 oh->ot_dev_link = lu_ref_add
642                                         (&d->dd_lu_dev.ld_reference,
643                                          "osd-tx", th);
644                                 /* add commit callback */
645                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
646                                 lu_context_enter(&th->th_ctx);
647                                 journal_callback_set(jh, osd_trans_commit_cb,
648                                                      (struct journal_callback *)&oh->ot_jcb);
649                                         LASSERT(oti->oti_txns == 0);
650                                         LASSERT(oti->oti_r_locks == 0);
651                                         LASSERT(oti->oti_w_locks == 0);
652                                         oti->oti_txns++;
653                         } else {
654                                 OBD_FREE_PTR(oh);
655                                 th = (void *)jh;
656                         }
657                 } else
658                         th = ERR_PTR(-ENOMEM);
659         } else {
660                 CERROR("Invalid transaction parameters\n");
661                 th = ERR_PTR(-EINVAL);
662         }
663
664         RETURN(th);
665 }
666
667 /*
668  * Concurrency: shouldn't matter.
669  */
670 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
671 {
672         int result;
673         struct osd_thandle *oh;
674         struct osd_thread_info *oti = osd_oti_get(env);
675
676         ENTRY;
677
678         oh = container_of0(th, struct osd_thandle, ot_super);
679         if (oh->ot_handle != NULL) {
680                 handle_t *hdl = oh->ot_handle;
681
682                 LASSERT(oti->oti_txns == 1);
683                 oti->oti_txns--;
684                 LASSERT(oti->oti_r_locks == 0);
685                 LASSERT(oti->oti_w_locks == 0);
686                 result = dt_txn_hook_stop(env, th);
687                 if (result != 0)
688                         CERROR("Failure in transaction hook: %d\n", result);
689                 oh->ot_handle = NULL;
690                 result = journal_stop(hdl);
691                 if (result != 0)
692                         CERROR("Failure to stop transaction: %d\n", result);
693         }
694         EXIT;
695 }
696
697 /*
698  * Concurrency: shouldn't matter.
699  */
700 static int osd_sync(const struct lu_env *env, struct dt_device *d)
701 {
702         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
703         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
704 }
705
706 /**
707  * Start commit for OSD device.
708  *
709  * An implementation of dt_commit_async method for OSD device.
710  * Asychronously starts underlayng fs sync and thereby a transaction
711  * commit.
712  *
713  * \param env environment
714  * \param d dt device
715  *
716  * \see dt_device_operations
717  */
718 static int osd_commit_async(const struct lu_env *env,
719                             struct dt_device *d)
720 {
721         struct super_block *s = osd_sb(osd_dt_dev(d));
722         ENTRY;
723
724         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
725         RETURN(s->s_op->sync_fs(s, 0));
726 }
727
728 /*
729  * Concurrency: shouldn't matter.
730  */
731 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
732
733 static void osd_ro(const struct lu_env *env, struct dt_device *d)
734 {
735         ENTRY;
736
737         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
738
739         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
740                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
741         EXIT;
742 }
743
744 /*
745  * Concurrency: serialization provided by callers.
746  */
747 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
748                               int mode, unsigned long timeout, __u32 alg,
749                               struct lustre_capa_key *keys)
750 {
751         struct osd_device *dev = osd_dt_dev(d);
752         ENTRY;
753
754         dev->od_fl_capa = mode;
755         dev->od_capa_timeout = timeout;
756         dev->od_capa_alg = alg;
757         dev->od_capa_keys = keys;
758         RETURN(0);
759 }
760
761 /* Note: we did not count into QUOTA here, If we mount with --data_journal
762  * we may need more*/
763 static const int osd_dto_credits[DTO_NR] = {
764         /*
765          * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
766          * EXT3_SINGLEDATA_TRANS_BLOCKS 8 XXX Note: maybe iam need more,since
767          * iam have more level than Ext3 htree
768          */
769         [DTO_INDEX_INSERT]  = 16,
770         [DTO_INDEX_DELETE]  = 16,
771         [DTO_IDNEX_UPDATE]  = 16,
772         /*
773          * Create a object. Same as create object in Ext3 filesystem, but did
774          * not count QUOTA i EXT3_DATA_TRANS_BLOCKS(12) +
775          * INDEX_EXTRA_BLOCKS(8) + 3(inode bits,groups, GDT)
776          */
777         [DTO_OBJECT_CREATE] = 23,
778         [DTO_OBJECT_DELETE] = 23,
779         /*
780          * Attr set credits 3 inode, group, GDT
781          */
782         [DTO_ATTR_SET]      = 3,
783         /*
784          * XATTR_SET. SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS XXX Note:
785          * in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
786          * also counted in. Do not know why?
787          */
788         [DTO_XATTR_SET]     = 16,
789         [DTO_LOG_REC]       = 16,
790         /* creadits for inode change during write */
791         [DTO_WRITE_BASE]    = 3,
792         /* credits for single block write */
793         [DTO_WRITE_BLOCK]   = 12 
794 };
795
796 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
797                           enum dt_txn_op op)
798 {
799         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
800         return osd_dto_credits[op];
801 }
802
803 static const struct dt_device_operations osd_dt_ops = {
804         .dt_root_get       = osd_root_get,
805         .dt_statfs         = osd_statfs,
806         .dt_trans_start    = osd_trans_start,
807         .dt_trans_stop     = osd_trans_stop,
808         .dt_conf_get       = osd_conf_get,
809         .dt_sync           = osd_sync,
810         .dt_ro             = osd_ro,
811         .dt_commit_async   = osd_commit_async,
812         .dt_credit_get     = osd_credit_get,
813         .dt_init_capa_ctxt = osd_init_capa_ctxt,
814 };
815
816 static void osd_object_read_lock(const struct lu_env *env,
817                                  struct dt_object *dt, unsigned role)
818 {
819         struct osd_object *obj = osd_dt_obj(dt);
820         struct osd_thread_info *oti = osd_oti_get(env);
821
822         LINVRNT(osd_invariant(obj));
823
824         LASSERT(obj->oo_owner != env);
825         down_read_nested(&obj->oo_sem, role);
826
827                 LASSERT(obj->oo_owner == NULL);
828                 oti->oti_r_locks++;
829 }
830
831 static void osd_object_write_lock(const struct lu_env *env,
832                                   struct dt_object *dt, unsigned role)
833 {
834         struct osd_object *obj = osd_dt_obj(dt);
835         struct osd_thread_info *oti = osd_oti_get(env);
836
837         LINVRNT(osd_invariant(obj));
838
839         LASSERT(obj->oo_owner != env);
840         down_write_nested(&obj->oo_sem, role);
841
842                 LASSERT(obj->oo_owner == NULL);
843                 obj->oo_owner = env;
844                 oti->oti_w_locks++;
845 }
846
847 static void osd_object_read_unlock(const struct lu_env *env,
848                                    struct dt_object *dt)
849 {
850         struct osd_object *obj = osd_dt_obj(dt);
851                 struct osd_thread_info *oti = osd_oti_get(env);
852
853         LINVRNT(osd_invariant(obj));
854
855                 LASSERT(oti->oti_r_locks > 0);
856                 oti->oti_r_locks--;
857         up_read(&obj->oo_sem);
858 }
859
860 static void osd_object_write_unlock(const struct lu_env *env,
861                                     struct dt_object *dt)
862 {
863         struct osd_object *obj = osd_dt_obj(dt);
864                 struct osd_thread_info *oti = osd_oti_get(env);
865
866         LINVRNT(osd_invariant(obj));
867
868                 LASSERT(obj->oo_owner == env);
869                 LASSERT(oti->oti_w_locks > 0);
870                 oti->oti_w_locks--;
871                 obj->oo_owner = NULL;
872         up_write(&obj->oo_sem);
873 }
874
875 static int capa_is_sane(const struct lu_env *env,
876                         struct osd_device *dev,
877                         struct lustre_capa *capa,
878                         struct lustre_capa_key *keys)
879 {
880         struct osd_thread_info *oti = osd_oti_get(env);
881         struct obd_capa *oc;
882         int i, rc = 0;
883         ENTRY;
884
885         oc = capa_lookup(dev->od_capa_hash, capa, 0);
886         if (oc) {
887                 if (capa_is_expired(oc)) {
888                         DEBUG_CAPA(D_ERROR, capa, "expired");
889                         rc = -ESTALE;
890                 }
891                 capa_put(oc);
892                 RETURN(rc);
893         }
894
895         spin_lock(&capa_lock);
896         for (i = 0; i < 2; i++) {
897                 if (keys[i].lk_keyid == capa->lc_keyid) {
898                         oti->oti_capa_key = keys[i];
899                         break;
900                 }
901         }
902         spin_unlock(&capa_lock);
903
904         if (i == 2) {
905                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
906                 RETURN(-ESTALE);
907         }
908
909         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
910         if (rc)
911                 RETURN(rc);
912         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
913         {
914                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
915                 RETURN(-EACCES);
916         }
917
918         oc = capa_add(dev->od_capa_hash, capa);
919         capa_put(oc);
920
921         RETURN(0);
922 }
923
924 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
925                            struct lustre_capa *capa, __u64 opc)
926 {
927         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
928         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
929         int rc;
930
931         if (!dev->od_fl_capa)
932                 return 0;
933
934         if (capa == BYPASS_CAPA)
935                 return 0;
936
937         if (!capa) {
938                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
939                 return -EACCES;
940         }
941
942         if (!lu_fid_eq(fid, &capa->lc_fid)) {
943                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
944                            PFID(fid));
945                 return -EACCES;
946         }
947
948         if (!capa_opc_supported(capa, opc)) {
949                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
950                 return -EACCES;
951         }
952
953         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
954                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
955                 return -EACCES;
956         }
957
958         return 0;
959 }
960
961 static int osd_attr_get(const struct lu_env *env,
962                         struct dt_object *dt,
963                         struct lu_attr *attr,
964                         struct lustre_capa *capa)
965 {
966         struct osd_object *obj = osd_dt_obj(dt);
967
968         LASSERT(dt_object_exists(dt));
969         LINVRNT(osd_invariant(obj));
970
971         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
972                 return -EACCES;
973
974         spin_lock(&obj->oo_guard);
975         osd_inode_getattr(env, obj->oo_inode, attr);
976         spin_unlock(&obj->oo_guard);
977         return 0;
978 }
979
980 static int osd_attr_set(const struct lu_env *env,
981                         struct dt_object *dt,
982                         const struct lu_attr *attr,
983                         struct thandle *handle,
984                         struct lustre_capa *capa)
985 {
986         struct osd_object *obj = osd_dt_obj(dt);
987
988         LASSERT(handle != NULL);
989         LASSERT(dt_object_exists(dt));
990         LASSERT(osd_invariant(obj));
991
992         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
993                 return -EACCES;
994
995         spin_lock(&obj->oo_guard);
996         osd_inode_setattr(env, obj->oo_inode, attr);
997         spin_unlock(&obj->oo_guard);
998
999         mark_inode_dirty(obj->oo_inode);
1000         return 0;
1001 }
1002
1003 static struct timespec *osd_inode_time(const struct lu_env *env,
1004                                        struct inode *inode, __u64 seconds)
1005 {
1006         struct osd_thread_info *oti = osd_oti_get(env);
1007         struct timespec        *t   = &oti->oti_time;
1008
1009         t->tv_sec  = seconds;
1010         t->tv_nsec = 0;
1011         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1012         return t;
1013 }
1014
1015 static void osd_inode_setattr(const struct lu_env *env,
1016                               struct inode *inode, const struct lu_attr *attr)
1017 {
1018         __u64 bits;
1019
1020         bits = attr->la_valid;
1021
1022         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1023
1024         if (bits & LA_ATIME)
1025                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1026         if (bits & LA_CTIME)
1027                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1028         if (bits & LA_MTIME)
1029                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1030         if (bits & LA_SIZE) {
1031                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1032                 i_size_write(inode, attr->la_size);
1033         }
1034         if (bits & LA_BLOCKS)
1035                 inode->i_blocks = attr->la_blocks;
1036         if (bits & LA_MODE)
1037                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1038                         (attr->la_mode & ~S_IFMT);
1039         if (bits & LA_UID)
1040                 inode->i_uid    = attr->la_uid;
1041         if (bits & LA_GID)
1042                 inode->i_gid    = attr->la_gid;
1043         if (bits & LA_NLINK)
1044                 inode->i_nlink  = attr->la_nlink;
1045         if (bits & LA_RDEV)
1046                 inode->i_rdev   = attr->la_rdev;
1047
1048         if (bits & LA_FLAGS) {
1049                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1050
1051                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1052                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1053         }
1054 }
1055
1056 /*
1057  * Object creation.
1058  *
1059  * XXX temporary solution.
1060  */
1061
/*
 * Hook run by osd_object_create() before the type-specific creation step.
 * Currently a no-op that always reports success.
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        /* Nothing to prepare yet. */
        return 0;
}
1067
1068 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1069                            struct lu_attr *attr, struct thandle *th)
1070 {
1071         LASSERT(obj->oo_inode != NULL);
1072
1073         osd_object_init0(obj);
1074         return 0;
1075 }
1076
1077 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1078                                           struct inode * dir, int mode);
1079
1080 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1081                       umode_t mode,
1082                       struct dt_allocation_hint *hint,
1083                       struct thandle *th)
1084 {
1085         int result;
1086         struct osd_device  *osd = osd_obj2dev(obj);
1087         struct osd_thandle *oth;
1088         struct inode       *parent;
1089         struct inode       *inode;
1090
1091         LINVRNT(osd_invariant(obj));
1092         LASSERT(obj->oo_inode == NULL);
1093         LASSERT(osd->od_obj_area != NULL);
1094
1095         oth = container_of(th, struct osd_thandle, ot_super);
1096         LASSERT(oth->ot_handle->h_transaction != NULL);
1097
1098         if (hint && hint->dah_parent)
1099                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1100         else
1101                 parent = osd->od_obj_area->d_inode;
1102         LASSERT(parent->i_op != NULL);
1103
1104         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1105         if (!IS_ERR(inode)) {
1106                 obj->oo_inode = inode;
1107                 result = 0;
1108         } else
1109                 result = PTR_ERR(inode);
1110         LINVRNT(osd_invariant(obj));
1111         return result;
1112 }
1113
1114
1115 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1116                            int recsize, handle_t *handle);
1117
1118 enum {
1119         OSD_NAME_LEN = 255
1120 };
1121
1122 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1123                      struct lu_attr *attr,
1124                      struct dt_allocation_hint *hint,
1125                      struct thandle *th)
1126 {
1127         int result;
1128         struct osd_thandle *oth;
1129
1130         LASSERT(S_ISDIR(attr->la_mode));
1131
1132         oth = container_of(th, struct osd_thandle, ot_super);
1133         LASSERT(oth->ot_handle->h_transaction != NULL);
1134         result = osd_mkfile(info, obj, (attr->la_mode &
1135                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1136         if (result == 0) {
1137                 LASSERT(obj->oo_inode != NULL);
1138                 /*
1139                  * XXX uh-oh... call low-level iam function directly.
1140                  */
1141                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1142                                          sizeof (struct lu_fid_pack),
1143                                          oth->ot_handle);
1144         }
1145         return result;
1146 }
1147
1148 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1149                      struct lu_attr *attr,
1150                      struct dt_allocation_hint *hint,
1151                      struct thandle *th)
1152 {
1153         LASSERT(S_ISREG(attr->la_mode));
1154         return osd_mkfile(info, obj, (attr->la_mode &
1155                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1156 }
1157
1158 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1159                      struct lu_attr *attr,
1160                      struct dt_allocation_hint *hint,
1161                      struct thandle *th)
1162 {
1163         LASSERT(S_ISLNK(attr->la_mode));
1164         return osd_mkfile(info, obj, (attr->la_mode &
1165                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1166 }
1167
1168 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1169                      struct lu_attr *attr,
1170                      struct dt_allocation_hint *hint,
1171                      struct thandle *th)
1172 {
1173         int result;
1174         struct osd_device *osd = osd_obj2dev(obj);
1175         struct inode      *dir;
1176         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1177
1178         LINVRNT(osd_invariant(obj));
1179         LASSERT(obj->oo_inode == NULL);
1180         LASSERT(osd->od_obj_area != NULL);
1181         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1182                 S_ISFIFO(mode) || S_ISSOCK(mode));
1183
1184         dir = osd->od_obj_area->d_inode;
1185         LASSERT(dir->i_op != NULL);
1186
1187         result = osd_mkfile(info, obj, mode, hint, th);
1188         if (result == 0) {
1189                 LASSERT(obj->oo_inode != NULL);
1190                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1191         }
1192         LINVRNT(osd_invariant(obj));
1193         return result;
1194 }
1195
1196 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1197                               struct lu_attr *,
1198                               struct dt_allocation_hint *hint,
1199                               struct thandle *);
1200
1201 static osd_obj_type_f osd_create_type_f(__u32 mode)
1202 {
1203         osd_obj_type_f result;
1204
1205         switch (mode) {
1206         case S_IFDIR:
1207                 result = osd_mkdir;
1208                 break;
1209         case S_IFREG:
1210                 result = osd_mkreg;
1211                 break;
1212         case S_IFLNK:
1213                 result = osd_mksym;
1214                 break;
1215         case S_IFCHR:
1216         case S_IFBLK:
1217         case S_IFIFO:
1218         case S_IFSOCK:
1219                 result = osd_mknod;
1220                 break;
1221         default:
1222                 LBUG();
1223                 break;
1224         }
1225         return result;
1226 }
1227
1228
1229 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1230                         struct dt_object *parent, umode_t child_mode)
1231 {
1232         LASSERT(ah);
1233
1234         memset(ah, 0, sizeof(*ah));
1235         ah->dah_parent = parent;
1236         ah->dah_mode = child_mode;
1237 }
1238
1239
1240 /*
1241  * Concurrency: @dt is write locked.
1242  */
1243 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1244                              struct lu_attr *attr, 
1245                              struct dt_allocation_hint *hint,
1246                              struct thandle *th)
1247 {
1248         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1249         struct osd_object      *obj  = osd_dt_obj(dt);
1250         struct osd_device      *osd  = osd_obj2dev(obj);
1251         struct osd_thread_info *info = osd_oti_get(env);
1252         int result;
1253
1254         ENTRY;
1255
1256         LINVRNT(osd_invariant(obj));
1257         LASSERT(!dt_object_exists(dt));
1258         LASSERT(osd_write_locked(env, obj));
1259         LASSERT(th != NULL);
1260
1261         /*
1262          * XXX missing: Quote handling.
1263          */
1264
1265         result = osd_create_pre(info, obj, attr, th);
1266         if (result == 0) {
1267                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1268                                                                 attr, hint, th);
1269                 if (result == 0)
1270                         result = osd_create_post(info, obj, attr, th);
1271         }
1272         if (result == 0) {
1273                 struct osd_inode_id *id = &info->oti_id;
1274
1275                 LASSERT(obj->oo_inode != NULL);
1276
1277                 id->oii_ino = obj->oo_inode->i_ino;
1278                 id->oii_gen = obj->oo_inode->i_generation;
1279
1280                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1281         }
1282
1283         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1284         LINVRNT(osd_invariant(obj));
1285         RETURN(result);
1286 }
1287
1288 /*
1289  * Concurrency: @dt is write locked.
1290  */
1291 static void osd_object_ref_add(const struct lu_env *env,
1292                                struct dt_object *dt,
1293                                struct thandle *th)
1294 {
1295         struct osd_object *obj = osd_dt_obj(dt);
1296         struct inode *inode = obj->oo_inode;
1297
1298         LINVRNT(osd_invariant(obj));
1299         LASSERT(dt_object_exists(dt));
1300         LASSERT(osd_write_locked(env, obj));
1301         LASSERT(th != NULL);
1302
1303         spin_lock(&obj->oo_guard);
1304         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1305         inode->i_nlink++;
1306         spin_unlock(&obj->oo_guard);
1307         mark_inode_dirty(inode);
1308         LINVRNT(osd_invariant(obj));
1309 }
1310
1311 /*
1312  * Concurrency: @dt is write locked.
1313  */
1314 static void osd_object_ref_del(const struct lu_env *env,
1315                                struct dt_object *dt,
1316                                struct thandle *th)
1317 {
1318         struct osd_object *obj = osd_dt_obj(dt);
1319         struct inode *inode = obj->oo_inode;
1320
1321         LINVRNT(osd_invariant(obj));
1322         LASSERT(dt_object_exists(dt));
1323         LASSERT(osd_write_locked(env, obj));
1324         LASSERT(th != NULL);
1325
1326         spin_lock(&obj->oo_guard);
1327         LASSERT(inode->i_nlink > 0);
1328         inode->i_nlink--;
1329         spin_unlock(&obj->oo_guard);
1330         mark_inode_dirty(inode);
1331         LINVRNT(osd_invariant(obj));
1332 }
1333
1334 /*
1335  * Concurrency: @dt is read locked.
1336  */
1337 static int osd_xattr_get(const struct lu_env *env,
1338                          struct dt_object *dt,
1339                          struct lu_buf *buf,
1340                          const char *name,
1341                          struct lustre_capa *capa)
1342 {
1343         struct osd_object      *obj    = osd_dt_obj(dt);
1344         struct inode           *inode  = obj->oo_inode;
1345         struct osd_thread_info *info   = osd_oti_get(env);
1346         struct dentry          *dentry = &info->oti_dentry;
1347
1348         LASSERT(dt_object_exists(dt));
1349         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1350         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1351
1352         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1353                 return -EACCES;
1354
1355         dentry->d_inode = inode;
1356         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1357 }
1358
1359 /*
1360  * Concurrency: @dt is write locked.
1361  */
1362 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1363                          const struct lu_buf *buf, const char *name, int fl,
1364                          struct thandle *handle, struct lustre_capa *capa)
1365 {
1366         struct osd_object      *obj    = osd_dt_obj(dt);
1367         struct inode           *inode  = obj->oo_inode;
1368         struct osd_thread_info *info   = osd_oti_get(env);
1369         struct dentry          *dentry = &info->oti_dentry;
1370         struct timespec        *t      = &info->oti_time;
1371         int                     fs_flags = 0, rc;
1372
1373         LASSERT(dt_object_exists(dt));
1374         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1375         LASSERT(osd_write_locked(env, obj));
1376         LASSERT(handle != NULL);
1377
1378         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1379                 return -EACCES;
1380
1381         if (fl & LU_XATTR_REPLACE)
1382                 fs_flags |= XATTR_REPLACE;
1383
1384         if (fl & LU_XATTR_CREATE)
1385                 fs_flags |= XATTR_CREATE;
1386
1387         dentry->d_inode = inode;
1388         *t = inode->i_ctime;
1389         rc = inode->i_op->setxattr(dentry, name,
1390                                    buf->lb_buf, buf->lb_len, fs_flags);
1391         if (likely(rc == 0)) {
1392                 /* ctime should not be updated with server-side time. */
1393                 spin_lock(&obj->oo_guard);
1394                 inode->i_ctime = *t;
1395                 spin_unlock(&obj->oo_guard);
1396                 mark_inode_dirty(inode);
1397         }
1398         return rc;
1399 }
1400
1401 /*
1402  * Concurrency: @dt is read locked.
1403  */
1404 static int osd_xattr_list(const struct lu_env *env,
1405                           struct dt_object *dt,
1406                           struct lu_buf *buf,
1407                           struct lustre_capa *capa)
1408 {
1409         struct osd_object      *obj    = osd_dt_obj(dt);
1410         struct inode           *inode  = obj->oo_inode;
1411         struct osd_thread_info *info   = osd_oti_get(env);
1412         struct dentry          *dentry = &info->oti_dentry;
1413
1414         LASSERT(dt_object_exists(dt));
1415         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1416         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1417
1418         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1419                 return -EACCES;
1420
1421         dentry->d_inode = inode;
1422         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1423 }
1424
1425 /*
1426  * Concurrency: @dt is write locked.
1427  */
1428 static int osd_xattr_del(const struct lu_env *env,
1429                          struct dt_object *dt,
1430                          const char *name,
1431                          struct thandle *handle,
1432                          struct lustre_capa *capa)
1433 {
1434         struct osd_object      *obj    = osd_dt_obj(dt);
1435         struct inode           *inode  = obj->oo_inode;
1436         struct osd_thread_info *info   = osd_oti_get(env);
1437         struct dentry          *dentry = &info->oti_dentry;
1438         struct timespec        *t      = &info->oti_time;
1439         int                     rc;
1440
1441         LASSERT(dt_object_exists(dt));
1442         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1443         LASSERT(osd_write_locked(env, obj));
1444         LASSERT(handle != NULL);
1445
1446         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1447                 return -EACCES;
1448
1449         dentry->d_inode = inode;
1450         *t = inode->i_ctime;
1451         rc = inode->i_op->removexattr(dentry, name);
1452         if (likely(rc == 0)) {
1453                 /* ctime should not be updated with server-side time. */
1454                 spin_lock(&obj->oo_guard);
1455                 inode->i_ctime = *t;
1456                 spin_unlock(&obj->oo_guard);
1457                 mark_inode_dirty(inode);
1458         }
1459         return rc;
1460 }
1461
1462 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1463                                      struct dt_object *dt,
1464                                      struct lustre_capa *old,
1465                                      __u64 opc)
1466 {
1467         struct osd_thread_info *info = osd_oti_get(env);
1468         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1469         struct osd_object *obj = osd_dt_obj(dt);
1470         struct osd_device *dev = osd_obj2dev(obj);
1471         struct lustre_capa_key *key = &info->oti_capa_key;
1472         struct lustre_capa *capa = &info->oti_capa;
1473         struct obd_capa *oc;
1474         int rc;
1475         ENTRY;
1476
1477         if (!dev->od_fl_capa)
1478                 RETURN(ERR_PTR(-ENOENT));
1479
1480         LASSERT(dt_object_exists(dt));
1481         LINVRNT(osd_invariant(obj));
1482
1483         /* renewal sanity check */
1484         if (old && osd_object_auth(env, dt, old, opc))
1485                 RETURN(ERR_PTR(-EACCES));
1486
1487         capa->lc_fid = *fid;
1488         capa->lc_opc = opc;
1489         capa->lc_uid = 0;
1490         capa->lc_flags = dev->od_capa_alg << 24;
1491         capa->lc_timeout = dev->od_capa_timeout;
1492         capa->lc_expiry = 0;
1493
1494         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1495         if (oc) {
1496                 LASSERT(!capa_is_expired(oc));
1497                 RETURN(oc);
1498         }
1499
1500         spin_lock(&capa_lock);
1501         *key = dev->od_capa_keys[1];
1502         spin_unlock(&capa_lock);
1503
1504         capa->lc_keyid = key->lk_keyid;
1505         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1506
1507         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1508         if (rc) {
1509                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1510                 RETURN(ERR_PTR(rc));
1511         }
1512
1513         oc = capa_add(dev->od_capa_hash, capa);
1514         RETURN(oc);
1515 }
1516
1517 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
1518 {
1519         int rc;
1520         struct osd_object      *obj    = osd_dt_obj(dt);
1521         struct inode           *inode  = obj->oo_inode;
1522         struct osd_thread_info *info   = osd_oti_get(env);
1523         struct dentry          *dentry = &info->oti_dentry;
1524         struct file            *file   = &info->oti_file;
1525         ENTRY;
1526
1527         dentry->d_inode = inode;
1528         file->f_dentry = dentry;
1529         file->f_mapping = inode->i_mapping;
1530         file->f_op = inode->i_fop;
1531         LOCK_INODE_MUTEX(inode);
1532         rc = file->f_op->fsync(file, dentry, 0);
1533         UNLOCK_INODE_MUTEX(inode);
1534         RETURN(rc);
1535 }
1536
1537 static const struct dt_object_operations osd_obj_ops = {
1538         .do_read_lock    = osd_object_read_lock,
1539         .do_write_lock   = osd_object_write_lock,
1540         .do_read_unlock  = osd_object_read_unlock,
1541         .do_write_unlock = osd_object_write_unlock,
1542         .do_attr_get     = osd_attr_get,
1543         .do_attr_set     = osd_attr_set,
1544         .do_ah_init      = osd_ah_init,
1545         .do_create       = osd_object_create,
1546         .do_index_try    = osd_index_try,
1547         .do_ref_add      = osd_object_ref_add,
1548         .do_ref_del      = osd_object_ref_del,
1549         .do_xattr_get    = osd_xattr_get,
1550         .do_xattr_set    = osd_xattr_set,
1551         .do_xattr_del    = osd_xattr_del,
1552         .do_xattr_list   = osd_xattr_list,
1553         .do_capa_get     = osd_capa_get,
1554         .do_object_sync  = osd_object_sync,
1555 };
1556
1557 /*
1558  * Body operations.
1559  */
1560
1561 /*
1562  * XXX: Another layering violation for now.
1563  *
1564  * We don't want to use ->f_op->read methods, because generic file write
1565  *
1566  *         - serializes on ->i_sem, and
1567  *
1568  *         - does a lot of extra work like balance_dirty_pages(),
1569  *
1570  * which doesn't work for globally shared files like /last-received.
1571  */
1572 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
1573 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
1574                                 loff_t *offs, handle_t *handle);
1575
1576 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1577                         struct lu_buf *buf, loff_t *pos,
1578                         struct lustre_capa *capa)
1579 {
1580         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1581
1582         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1583                 RETURN(-EACCES);
1584
1585         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1586 }
1587
1588 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1589                          const struct lu_buf *buf, loff_t *pos,
1590                          struct thandle *handle, struct lustre_capa *capa)
1591 {
1592         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1593         struct osd_thandle *oh;
1594         ssize_t             result;
1595
1596         LASSERT(handle != NULL);
1597
1598         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1599                 RETURN(-EACCES);
1600
1601         oh = container_of(handle, struct osd_thandle, ot_super);
1602         LASSERT(oh->ot_handle->h_transaction != NULL);
1603         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1604                                              pos, oh->ot_handle);
1605         if (result == 0)
1606                 result = buf->lb_len;
1607         return result;
1608 }
1609
1610 static const struct dt_body_operations osd_body_ops = {
1611         .dbo_read  = osd_read,
1612         .dbo_write = osd_write
1613 };
1614
1615 /*
1616  * Index operations.
1617  */
1618
1619 static int osd_object_is_root(const struct osd_object *obj)
1620 {
1621         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1622 }
1623
1624 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1625                            const struct dt_index_features *feat)
1626 {
1627         struct iam_descr *descr;
1628
1629         if (osd_object_is_root(o))
1630                 return feat == &dt_directory_features;
1631
1632         LASSERT(o->oo_dir != NULL);
1633
1634         descr = o->oo_dir->od_container.ic_descr;
1635         if (feat == &dt_directory_features)
1636                 return descr == &iam_htree_compat_param ||
1637                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1638                          1 /*
1639                             * XXX check that index looks like directory.
1640                             */
1641                                 );
1642         else
1643                 return
1644                         feat->dif_keysize_min <= descr->id_key_size &&
1645                         descr->id_key_size <= feat->dif_keysize_max &&
1646                         feat->dif_recsize_min <= descr->id_rec_size &&
1647                         descr->id_rec_size <= feat->dif_recsize_max &&
1648                         !(feat->dif_flags & (DT_IND_VARKEY |
1649                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1650                         ergo(feat->dif_flags & DT_IND_UPDATE,
1651                              1 /* XXX check that object (and file system) is
1652                                 * writable */);
1653 }
1654
1655 static int osd_container_init(const struct lu_env *env,
1656                               struct osd_object *obj,
1657                               struct osd_directory *dir)
1658 {
1659         int result;
1660         struct iam_container *bag;
1661
1662         bag    = &dir->od_container;
1663         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1664         if (result == 0) {
1665                 result = iam_container_setup(bag);
1666                 if (result == 0)
1667                         obj->oo_dt.do_index_ops = &osd_index_ops;
1668                 else
1669                         iam_container_fini(bag);
1670         }
1671         return result;
1672 }
1673
1674 /*
1675  * Concurrency: no external locking is necessary.
1676  */
1677 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1678                          const struct dt_index_features *feat)
1679 {
1680         int result;
1681         struct osd_object *obj = osd_dt_obj(dt);
1682
1683         LINVRNT(osd_invariant(obj));
1684         LASSERT(dt_object_exists(dt));
1685
1686         if (osd_object_is_root(obj)) {
1687                 dt->do_index_ops = &osd_index_compat_ops;
1688                 result = 0;
1689         } else if (!osd_has_index(obj)) {
1690                 struct osd_directory *dir;
1691
1692                 OBD_ALLOC_PTR(dir);
1693                 if (dir != NULL) {
1694                         sema_init(&dir->od_sem, 1);
1695
1696                         spin_lock(&obj->oo_guard);
1697                         if (obj->oo_dir == NULL)
1698                                 obj->oo_dir = dir;
1699                         else
1700                                 /*
1701                                  * Concurrent thread allocated container data.
1702                                  */
1703                                 OBD_FREE_PTR(dir);
1704                         spin_unlock(&obj->oo_guard);
1705                         /*
1706                          * Now, that we have container data, serialize its
1707                          * initialization.
1708                          */
1709                         down(&obj->oo_dir->od_sem);
1710                         /*
1711                          * recheck under lock.
1712                          */
1713                         if (!osd_has_index(obj))
1714                                 result = osd_container_init(env, obj, dir);
1715                         else
1716                                 result = 0;
1717                         up(&obj->oo_dir->od_sem);
1718                 } else
1719                         result = -ENOMEM;
1720         } else
1721                 result = 0;
1722
1723         if (result == 0) {
1724                 if (!osd_index_probe(env, obj, feat))
1725                         result = -ENOTDIR;
1726         }
1727         LINVRNT(osd_invariant(obj));
1728
1729         return result;
1730 }
1731
1732 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1733                             const struct dt_key *key, struct thandle *handle,
1734                             struct lustre_capa *capa)
1735 {
1736         struct osd_object     *obj = osd_dt_obj(dt);
1737         struct osd_thandle    *oh;
1738         struct iam_path_descr *ipd;
1739         struct iam_container  *bag = &obj->oo_dir->od_container;
1740         int rc;
1741
1742         ENTRY;
1743
1744         LINVRNT(osd_invariant(obj));
1745         LASSERT(dt_object_exists(dt));
1746         LASSERT(bag->ic_object == obj->oo_inode);
1747         LASSERT(handle != NULL);
1748
1749         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1750                 RETURN(-EACCES);
1751
1752         ipd = osd_ipd_get(env, bag);
1753         if (unlikely(ipd == NULL))
1754                 RETURN(-ENOMEM);
1755
1756         oh = container_of0(handle, struct osd_thandle, ot_super);
1757         LASSERT(oh->ot_handle != NULL);
1758         LASSERT(oh->ot_handle->h_transaction != NULL);
1759
1760         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1761         osd_ipd_put(env, bag, ipd);
1762         LINVRNT(osd_invariant(obj));
1763         RETURN(rc);
1764 }
1765
1766 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1767                             struct dt_rec *rec, const struct dt_key *key,
1768                             struct lustre_capa *capa)
1769 {
1770         struct osd_object     *obj = osd_dt_obj(dt);
1771         struct iam_path_descr *ipd;
1772         struct iam_container  *bag = &obj->oo_dir->od_container;
1773         int rc;
1774
1775         ENTRY;
1776
1777         LINVRNT(osd_invariant(obj));
1778         LASSERT(dt_object_exists(dt));
1779         LASSERT(bag->ic_object == obj->oo_inode);
1780
1781         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1782                 return -EACCES;
1783
1784         ipd = osd_ipd_get(env, bag);
1785         if (unlikely(ipd == NULL))
1786                 RETURN(-ENOMEM);
1787
1788         rc = iam_lookup(bag, (const struct iam_key *)key,
1789                         (struct iam_rec *)rec, ipd);
1790         osd_ipd_put(env, bag, ipd);
1791         LINVRNT(osd_invariant(obj));
1792
1793         RETURN(rc);
1794 }
1795
1796 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1797                             const struct dt_rec *rec, const struct dt_key *key,
1798                             struct thandle *th, struct lustre_capa *capa)
1799 {
1800         struct osd_object     *obj = osd_dt_obj(dt);
1801         struct iam_path_descr *ipd;
1802         struct osd_thandle    *oh;
1803         struct iam_container  *bag = &obj->oo_dir->od_container;
1804         int rc;
1805
1806         ENTRY;
1807
1808         LINVRNT(osd_invariant(obj));
1809         LASSERT(dt_object_exists(dt));
1810         LASSERT(bag->ic_object == obj->oo_inode);
1811         LASSERT(th != NULL);
1812
1813         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1814                 return -EACCES;
1815
1816         ipd = osd_ipd_get(env, bag);
1817         if (unlikely(ipd == NULL))
1818                 RETURN(-ENOMEM);
1819
1820         oh = container_of0(th, struct osd_thandle, ot_super);
1821         LASSERT(oh->ot_handle != NULL);
1822         LASSERT(oh->ot_handle->h_transaction != NULL);
1823         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1824                         (struct iam_rec *)rec, ipd);
1825         osd_ipd_put(env, bag, ipd);
1826         LINVRNT(osd_invariant(obj));
1827         RETURN(rc);
1828 }
1829
1830 /*
1831  * Iterator operations.
1832  */
1833 struct osd_it {
1834         struct osd_object     *oi_obj;
1835         struct iam_path_descr *oi_ipd;
1836         struct iam_iterator    oi_it;
1837 };
1838
1839 static struct dt_it *osd_it_init(const struct lu_env *env,
1840                                  struct dt_object *dt, int writable,
1841                                  struct lustre_capa *capa)
1842 {
1843         struct osd_it         *it;
1844         struct osd_object     *obj = osd_dt_obj(dt);
1845         struct lu_object      *lo  = &dt->do_lu;
1846         struct iam_path_descr *ipd;
1847         struct iam_container  *bag = &obj->oo_dir->od_container;
1848         __u32                  flags;
1849
1850         LASSERT(lu_object_exists(lo));
1851
1852         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1853                             CAPA_OPC_BODY_READ))
1854                 return ERR_PTR(-EACCES);
1855
1856         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1857         OBD_ALLOC_PTR(it);
1858         if (it != NULL) {
1859                 /*
1860                  * XXX: as ipd is allocated within osd_thread_info, assignment
1861                  * below implies that iterator usage is confined within single
1862                  * environment.
1863                  */
1864                 ipd = osd_ipd_get(env, bag);
1865                 if (likely(ipd != NULL)) {
1866                         it->oi_obj = obj;
1867                         it->oi_ipd = ipd;
1868                         lu_object_get(lo);
1869                         iam_it_init(&it->oi_it, bag, flags, ipd);
1870                         return (struct dt_it *)it;
1871                 } else
1872                         OBD_FREE_PTR(it);
1873         }
1874         return ERR_PTR(-ENOMEM);
1875 }
1876
1877 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1878 {
1879         struct osd_it     *it = (struct osd_it *)di;
1880         struct osd_object *obj = it->oi_obj;
1881
1882         iam_it_fini(&it->oi_it);
1883         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1884         lu_object_put(env, &obj->oo_dt.do_lu);
1885         OBD_FREE_PTR(it);
1886 }
1887
1888 static int osd_it_get(const struct lu_env *env,
1889                       struct dt_it *di, const struct dt_key *key)
1890 {
1891         struct osd_it *it = (struct osd_it *)di;
1892
1893         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1894 }
1895
1896 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1897 {
1898         struct osd_it *it = (struct osd_it *)di;
1899
1900         iam_it_put(&it->oi_it);
1901 }
1902
1903 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1904 {
1905         struct osd_it *it = (struct osd_it *)di;
1906
1907         return iam_it_next(&it->oi_it);
1908 }
1909
1910 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1911                       struct thandle *th)
1912 {
1913         struct osd_it      *it = (struct osd_it *)di;
1914         struct osd_thandle *oh;
1915
1916         LASSERT(th != NULL);
1917
1918         oh = container_of0(th, struct osd_thandle, ot_super);
1919         LASSERT(oh->ot_handle != NULL);
1920         LASSERT(oh->ot_handle->h_transaction != NULL);
1921
1922         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1923 }
1924
1925 static struct dt_key *osd_it_key(const struct lu_env *env,
1926                                  const struct dt_it *di)
1927 {
1928         struct osd_it *it = (struct osd_it *)di;
1929
1930         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1931 }
1932
1933 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1934 {
1935         struct osd_it *it = (struct osd_it *)di;
1936
1937         return iam_it_key_size(&it->oi_it);
1938 }
1939
1940 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1941                                  const struct dt_it *di)
1942 {
1943         struct osd_it *it = (struct osd_it *)di;
1944
1945         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1946 }
1947
1948 static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1949 {
1950         struct osd_it *it = (struct osd_it *)di;
1951
1952         return iam_it_store(&it->oi_it);
1953 }
1954
1955 static int osd_it_load(const struct lu_env *env,
1956                        const struct dt_it *di, __u64 hash)
1957 {
1958         struct osd_it *it = (struct osd_it *)di;
1959
1960         return iam_it_load(&it->oi_it, hash);
1961 }
1962
1963 static const struct dt_index_operations osd_index_ops = {
1964         .dio_lookup = osd_index_lookup,
1965         .dio_insert = osd_index_insert,
1966         .dio_delete = osd_index_delete,
1967         .dio_it     = {
1968                 .init     = osd_it_init,
1969                 .fini     = osd_it_fini,
1970                 .get      = osd_it_get,
1971                 .put      = osd_it_put,
1972                 .del      = osd_it_del,
1973                 .next     = osd_it_next,
1974                 .key      = osd_it_key,
1975                 .key_size = osd_it_key_size,
1976                 .rec      = osd_it_rec,
1977                 .store    = osd_it_store,
1978                 .load     = osd_it_load
1979         }
1980 };
1981
1982 static int osd_index_compat_delete(const struct lu_env *env,
1983                                    struct dt_object *dt,
1984                                    const struct dt_key *key,
1985                                    struct thandle *handle,
1986                                    struct lustre_capa *capa)
1987 {
1988         struct osd_object *obj = osd_dt_obj(dt);
1989
1990         LASSERT(handle != NULL);
1991         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1992         ENTRY;
1993
1994 #if 0
1995         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1996                 RETURN(-EACCES);
1997 #endif
1998
1999         RETURN(-EOPNOTSUPP);
2000 }
2001
2002 /*
2003  * Compatibility index operations.
2004  */
2005
2006
2007 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
2008                            struct dentry *dentry, struct lu_fid_pack *pack)
2009 {
2010         struct inode  *inode = dentry->d_inode;
2011         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
2012
2013         lu_igif_build(fid, inode->i_ino, inode->i_generation);
2014         fid_cpu_to_be(fid, fid);
2015         pack->fp_len = sizeof *fid + 1;
2016         memcpy(pack->fp_area, fid, sizeof *fid);
2017 }
2018
2019 static int osd_index_compat_lookup(const struct lu_env *env,
2020                                    struct dt_object *dt,
2021                                    struct dt_rec *rec, const struct dt_key *key,
2022                                    struct lustre_capa *capa)
2023 {
2024         struct osd_object *obj = osd_dt_obj(dt);
2025
2026         struct osd_device      *osd  = osd_obj2dev(obj);
2027         struct osd_thread_info *info = osd_oti_get(env);
2028         struct inode           *dir;
2029
2030         int result;
2031
2032         /*
2033          * XXX temporary solution.
2034          */
2035         struct dentry *dentry;
2036         struct dentry *parent;
2037
2038         LINVRNT(osd_invariant(obj));
2039         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2040         LASSERT(osd_has_index(obj));
2041
2042         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2043                 return -EACCES;
2044
2045         info->oti_str.name = (const char *)key;
2046         info->oti_str.len  = strlen((const char *)key);
2047
2048         dir = obj->oo_inode;
2049         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2050
2051         parent = d_alloc_root(dir);
2052         if (parent == NULL)
2053                 return -ENOMEM;
2054         igrab(dir);
2055         dentry = d_alloc(parent, &info->oti_str);
2056         if (dentry != NULL) {
2057                 struct dentry *d;
2058
2059                 /*
2060                  * XXX passing NULL for nameidata should work for
2061                  * ext3/ldiskfs.
2062                  */
2063                 d = dir->i_op->lookup(dir, dentry, NULL);
2064                 if (d == NULL) {
2065                         /*
2066                          * normal case, result is in @dentry.
2067                          */
2068                         if (dentry->d_inode != NULL) {
2069                                 osd_build_pack(env, osd, dentry,
2070                                                (struct lu_fid_pack *)rec);
2071                                 result = 0;
2072                         } else
2073                                 result = -ENOENT;
2074                  } else {
2075                         /* What? Disconnected alias? Ppheeeww... */
2076                         CERROR("Aliasing where not expected\n");
2077                         result = -EIO;
2078                         dput(d);
2079                 }
2080                 dput(dentry);
2081         } else
2082                 result = -ENOMEM;
2083         dput(parent);
2084         LINVRNT(osd_invariant(obj));
2085         return result;
2086 }
2087
2088 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
2089                        struct inode *dir, struct inode *inode, const char *name)
2090 {
2091         struct dentry *old;
2092         struct dentry *new;
2093         struct dentry *parent;
2094
2095         int result;
2096
2097         info->oti_str.name = name;
2098         info->oti_str.len  = strlen(name);
2099
2100         LASSERT(atomic_read(&dir->i_count) > 0);
2101         result = -ENOMEM;
2102         old = d_alloc(dev->od_obj_area, &info->oti_str);
2103         if (old != NULL) {
2104                 d_instantiate(old, inode);
2105                 igrab(inode);
2106                 LASSERT(atomic_read(&dir->i_count) > 0);
2107                 parent = d_alloc_root(dir);
2108                 if (parent != NULL) {
2109                         igrab(dir);
2110                         LASSERT(atomic_read(&dir->i_count) > 1);
2111                         new = d_alloc(parent, &info->oti_str);
2112                         LASSERT(atomic_read(&dir->i_count) > 1);
2113                         if (new != NULL) {
2114                                 LASSERT(atomic_read(&dir->i_count) > 1);
2115                                 result = dir->i_op->link(old, dir, new);
2116                                 LASSERT(atomic_read(&dir->i_count) > 1);
2117                                 dput(new);
2118                                 LASSERT(atomic_read(&dir->i_count) > 1);
2119                         }
2120                         LASSERT(atomic_read(&dir->i_count) > 1);
2121                         dput(parent);
2122                         LASSERT(atomic_read(&dir->i_count) > 0);
2123                 }
2124                 dput(old);
2125         }
2126         LASSERT(atomic_read(&dir->i_count) > 0);
2127         return result;
2128 }
2129
2130
2131 /*
2132  * XXX Temporary stuff.
2133  */
2134 static int osd_index_compat_insert(const struct lu_env *env,
2135                                    struct dt_object *dt,
2136                                    const struct dt_rec *rec,
2137                                    const struct dt_key *key, struct thandle *th,
2138                                    struct lustre_capa *capa)
2139 {
2140         struct osd_object     *obj = osd_dt_obj(dt);
2141
2142         const char          *name = (const char *)key;
2143
2144         struct lu_device    *ludev = dt->do_lu.lo_dev;
2145         struct lu_object    *luch;
2146
2147         struct osd_thread_info   *info = osd_oti_get(env);
2148         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2149         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2150
2151         int result;
2152
2153         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2154         LINVRNT(osd_invariant(obj));
2155         LASSERT(th != NULL);
2156
2157         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2158                 return -EACCES;
2159
2160         result = fid_unpack(pack, fid);
2161         if (result != 0)
2162                 return result;
2163
2164         luch = lu_object_find(env, ludev, fid, NULL);
2165         if (!IS_ERR(luch)) {
2166                 if (lu_object_exists(luch)) {
2167                         struct osd_object *child;
2168
2169                         child = osd_obj(lu_object_locate(luch->lo_header,
2170                                                          ludev->ld_type));
2171                         if (child != NULL)
2172                                 result = osd_add_rec(info, osd_obj2dev(obj),
2173                                                      obj->oo_inode,
2174                                                      child->oo_inode, name);
2175                         else {
2176                                 CERROR("No osd slice.\n");
2177                                 result = -ENOENT;
2178                         }
2179                         LINVRNT(osd_invariant(obj));
2180                         LINVRNT(osd_invariant(child));
2181                 } else {
2182                         CERROR("Sorry.\n");
2183                         result = -ENOENT;
2184                 }
2185                 lu_object_put(env, luch);
2186         } else
2187                 result = PTR_ERR(luch);
2188         LINVRNT(osd_invariant(obj));
2189         return result;
2190 }
2191
2192 static const struct dt_index_operations osd_index_compat_ops = {
2193         .dio_lookup = osd_index_compat_lookup,
2194         .dio_insert = osd_index_compat_insert,
2195         .dio_delete = osd_index_compat_delete
2196 };
2197
2198 /* type constructor/destructor: osd_type_init, osd_type_fini */
2199 LU_TYPE_INIT_FINI(osd, &osd_key);
2200
2201 static struct lu_context_key osd_key = {
2202         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
2203         .lct_init = osd_key_init,
2204         .lct_fini = osd_key_fini,
2205         .lct_exit = osd_key_exit
2206 };
2207
2208 static void *osd_key_init(const struct lu_context *ctx,
2209                           struct lu_context_key *key)
2210 {
2211         struct osd_thread_info *info;
2212
2213         OBD_ALLOC_PTR(info);
2214         if (info != NULL)
2215                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2216         else
2217                 info = ERR_PTR(-ENOMEM);
2218         return info;
2219 }
2220
2221 /* context key destructor: osd_key_fini */
2222 LU_KEY_FINI(osd, struct osd_thread_info);
2223
2224 static void osd_key_exit(const struct lu_context *ctx,
2225                          struct lu_context_key *key, void *data)
2226 {
2227         struct osd_thread_info *info = data;
2228
2229         LASSERT(info->oti_r_locks == 0);
2230         LASSERT(info->oti_w_locks == 0);
2231         LASSERT(info->oti_txns    == 0);
2232 }
2233
2234 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2235                            const char *name, struct lu_device *next)
2236 {
2237         int rc;
2238         /* context for commit hooks */
2239         rc = lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
2240                              LCT_MD_THREAD);
2241         if (rc == 0)
2242                 rc = osd_procfs_init(osd_dev(d), name);
2243         return rc;
2244 }
2245
2246 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2247 {
2248         struct osd_thread_info *info = osd_oti_get(env);
2249         ENTRY;
2250         if (o->od_obj_area != NULL) {
2251                 dput(o->od_obj_area);
2252                 o->od_obj_area = NULL;
2253         }
2254         osd_oi_fini(info, &o->od_oi);
2255
2256         RETURN(0);
2257 }
2258
2259 static int osd_mount(const struct lu_env *env,
2260                      struct osd_device *o, struct lustre_cfg *cfg)
2261 {
2262         struct lustre_mount_info *lmi;
2263         const char               *dev  = lustre_cfg_string(cfg, 0);
2264         struct osd_thread_info   *info = osd_oti_get(env);
2265         int result;
2266
2267         ENTRY;
2268
2269         if (o->od_mount != NULL) {
2270                 CERROR("Already mounted (%s)\n", dev);
2271                 RETURN(-EEXIST);
2272         }
2273
2274         /* get mount */
2275         lmi = server_get_mount(dev);
2276         if (lmi == NULL) {
2277                 CERROR("Cannot get mount info for %s!\n", dev);
2278                 RETURN(-EFAULT);
2279         }
2280
2281         LASSERT(lmi != NULL);
2282         /* save lustre_mount_info in dt_device */
2283         o->od_mount = lmi;
2284
2285         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2286         if (result == 0) {
2287                 struct dentry *d;
2288
2289                 d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
2290                                  0777, 1);
2291                 if (!IS_ERR(d)) {
2292                         o->od_obj_area = d;
2293                 } else
2294                         result = PTR_ERR(d);
2295         }
2296         if (result != 0)
2297                 osd_shutdown(env, o);
2298         RETURN(result);
2299 }
2300
2301 static struct lu_device *osd_device_fini(const struct lu_env *env,
2302                                          struct lu_device *d)
2303 {
2304         int rc;
2305         ENTRY;
2306
2307         shrink_dcache_sb(osd_sb(osd_dev(d)));
2308         osd_sync(env, lu2dt_dev(d));
2309
2310         rc = osd_procfs_fini(osd_dev(d));
2311         if (rc) {
2312                 CERROR("proc fini error %d \n", rc);
2313                 RETURN (ERR_PTR(rc));
2314         }
2315
2316         if (osd_dev(d)->od_mount)
2317                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2318                                  osd_dev(d)->od_mount->lmi_mnt);
2319         osd_dev(d)->od_mount = NULL;
2320
2321         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2322         RETURN(NULL);
2323 }
2324
2325 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2326                                           struct lu_device_type *t,
2327                                           struct lustre_cfg *cfg)
2328 {
2329         struct lu_device  *l;
2330         struct osd_device *o;
2331
2332         OBD_ALLOC_PTR(o);
2333         if (o != NULL) {
2334                 int result;
2335
2336                 result = dt_device_init(&o->od_dt_dev, t);
2337                 if (result == 0) {
2338                         l = osd2lu_dev(o);
2339                         l->ld_ops = &osd_lu_ops;
2340                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2341                         spin_lock_init(&o->od_osfs_lock);
2342                         o->od_osfs_age = cfs_time_shift_64(-1000);
2343                         o->od_capa_hash = init_capa_hash();
2344                         if (o->od_capa_hash == NULL) {
2345                                 dt_device_fini(&o->od_dt_dev);
2346                                 l = ERR_PTR(-ENOMEM);
2347                         }
2348                 } else
2349                         l = ERR_PTR(result);
2350
2351                 if (IS_ERR(l))
2352                         OBD_FREE_PTR(o);
2353         } else
2354                 l = ERR_PTR(-ENOMEM);
2355         return l;
2356 }
2357
2358 static struct lu_device *osd_device_free(const struct lu_env *env,
2359                                          struct lu_device *d)
2360 {
2361         struct osd_device *o = osd_dev(d);
2362         ENTRY;
2363
2364         cleanup_capa_hash(o->od_capa_hash);
2365         dt_device_fini(&o->od_dt_dev);
2366         OBD_FREE_PTR(o);
2367         RETURN(NULL);
2368 }
2369
2370 static int osd_process_config(const struct lu_env *env,
2371                               struct lu_device *d, struct lustre_cfg *cfg)
2372 {
2373         struct osd_device *o = osd_dev(d);
2374         int err;
2375         ENTRY;
2376
2377         switch(cfg->lcfg_command) {
2378         case LCFG_SETUP:
2379                 err = osd_mount(env, o, cfg);
2380                 break;
2381         case LCFG_CLEANUP:
2382                 err = osd_shutdown(env, o);
2383                 break;
2384         default:
2385                 err = -ENOTTY;
2386         }
2387
2388         RETURN(err);
2389 }
2390 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2391                                     struct ldiskfs_super_block * es);
2392
2393 static int osd_recovery_complete(const struct lu_env *env,
2394                                  struct lu_device *d)
2395 {
2396         struct osd_device *o = osd_dev(d);
2397         ENTRY;
2398         /* TODO: orphans handling */
2399         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2400         RETURN(0);
2401 }
2402
2403 static struct inode *osd_iget(struct osd_thread_info *info,
2404                               struct osd_device *dev,
2405                               const struct osd_inode_id *id)
2406 {
2407         struct inode *inode;
2408
2409         inode = iget(osd_sb(dev), id->oii_ino);
2410         if (inode == NULL) {
2411                 CERROR("no inode\n");
2412                 inode = ERR_PTR(-EACCES);
2413         } else if (is_bad_inode(inode)) {
2414                 CERROR("bad inode\n");
2415                 iput(inode);
2416                 inode = ERR_PTR(-ENOENT);
2417         } else if (inode->i_generation != id->oii_gen) {
2418                 CERROR("stale inode\n");
2419                 iput(inode);
2420                 inode = ERR_PTR(-ESTALE);
2421         }
2422
2423         return inode;
2424
2425 }
2426
2427 static int osd_fid_lookup(const struct lu_env *env,
2428                           struct osd_object *obj, const struct lu_fid *fid)
2429 {
2430         struct osd_thread_info *info;
2431         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2432         struct osd_device      *dev;
2433         struct osd_inode_id    *id;
2434         struct osd_oi          *oi;
2435         struct inode           *inode;
2436         int                     result;
2437
2438         LINVRNT(osd_invariant(obj));
2439         LASSERT(obj->oo_inode == NULL);
2440         LASSERT(fid_is_sane(fid));
2441         /*
2442          * This assertion checks that osd layer sees only local
2443          * fids. Unfortunately it is somewhat expensive (does a
2444          * cache-lookup). Disabling it for production/acceptance-testing.
2445          */
2446         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2447
2448         ENTRY;
2449
2450         info = osd_oti_get(env);
2451         dev  = osd_dev(ldev);
2452         id   = &info->oti_id;
2453         oi   = &dev->od_oi;
2454
2455         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2456                 RETURN(-ENOENT);
2457
2458         result = osd_oi_lookup(info, oi, fid, id);
2459         if (result == 0) {
2460                 inode = osd_iget(info, dev, id);
2461                 if (!IS_ERR(inode)) {
2462                         obj->oo_inode = inode;
2463                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2464                         result = 0;
2465                 } else
2466                         /*
2467                          * If fid wasn't found in oi, inode-less object is
2468                          * created, for which lu_object_exists() returns
2469                          * false. This is used in a (frequent) case when
2470                          * objects are created as locking anchors or
2471                          * place holders for objects yet to be created.
2472                          */
2473                         result = PTR_ERR(inode);
2474         } else if (result == -ENOENT)
2475                 result = 0;
2476         LINVRNT(osd_invariant(obj));
2477         RETURN(result);
2478 }
2479
2480 static void osd_inode_getattr(const struct lu_env *env,
2481                               struct inode *inode, struct lu_attr *attr)
2482 {
2483         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2484                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2485                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2486
2487         attr->la_atime      = LTIME_S(inode->i_atime);
2488         attr->la_mtime      = LTIME_S(inode->i_mtime);
2489         attr->la_ctime      = LTIME_S(inode->i_ctime);
2490         attr->la_mode       = inode->i_mode;
2491         attr->la_size       = i_size_read(inode);
2492         attr->la_blocks     = inode->i_blocks;
2493         attr->la_uid        = inode->i_uid;
2494         attr->la_gid        = inode->i_gid;
2495         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2496         attr->la_nlink      = inode->i_nlink;
2497         attr->la_rdev       = inode->i_rdev;
2498         attr->la_blksize    = ll_inode_blksize(inode);
2499         attr->la_blkbits    = inode->i_blkbits;
2500 }
2501
2502 /*
2503  * Helpers.
2504  */
2505
2506 static int lu_device_is_osd(const struct lu_device *d)
2507 {
2508         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2509 }
2510
2511 static struct osd_object *osd_obj(const struct lu_object *o)
2512 {
2513         LASSERT(lu_device_is_osd(o->lo_dev));
2514         return container_of0(o, struct osd_object, oo_dt.do_lu);
2515 }
2516
2517 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2518 {
2519         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2520         return container_of0(d, struct osd_device, od_dt_dev);
2521 }
2522
2523 static struct osd_device *osd_dev(const struct lu_device *d)
2524 {
2525         LASSERT(lu_device_is_osd(d));
2526         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2527 }
2528
2529 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2530 {
2531         return osd_obj(&d->do_lu);
2532 }
2533
2534 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2535 {
2536         return osd_dev(o->oo_dt.do_lu.lo_dev);
2537 }
2538
2539 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2540 {
2541         return &osd->od_dt_dev.dd_lu_dev;
2542 }
2543
2544 static struct super_block *osd_sb(const struct osd_device *dev)
2545 {
2546         return dev->od_mount->lmi_mnt->mnt_sb;
2547 }
2548
2549 static journal_t *osd_journal(const struct osd_device *dev)
2550 {
2551         return LDISKFS_SB(osd_sb(dev))->s_journal;
2552 }
2553
2554 static int osd_has_index(const struct osd_object *obj)
2555 {
2556         return obj->oo_dt.do_index_ops != NULL;
2557 }
2558
/*
 * lu_object-level wrapper around osd_invariant(): evaluates the osd
 * invariant for the osd_object corresponding to \a l and returns its
 * result.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int holds;

        holds = osd_invariant(osd_obj(l));
        return holds;
}
2563
2564 static const struct lu_object_operations osd_lu_obj_ops = {
2565         .loo_object_init      = osd_object_init,
2566         .loo_object_delete    = osd_object_delete,
2567         .loo_object_release   = osd_object_release,
2568         .loo_object_free      = osd_object_free,
2569         .loo_object_print     = osd_object_print,
2570         .loo_object_invariant = osd_object_invariant
2571 };
2572
2573 static const struct lu_device_operations osd_lu_ops = {
2574         .ldo_object_alloc      = osd_object_alloc,
2575         .ldo_process_config    = osd_process_config,
2576         .ldo_recovery_complete = osd_recovery_complete
2577 };
2578
2579 static const struct lu_device_type_operations osd_device_type_ops = {
2580         .ldto_init = osd_type_init,
2581         .ldto_fini = osd_type_fini,
2582
2583         .ldto_start = osd_type_start,
2584         .ldto_stop  = osd_type_stop,
2585
2586         .ldto_device_alloc = osd_device_alloc,
2587         .ldto_device_free  = osd_device_free,
2588
2589         .ldto_device_init    = osd_device_init,
2590         .ldto_device_fini    = osd_device_fini
2591 };
2592
2593 static struct lu_device_type osd_device_type = {
2594         .ldt_tags     = LU_DEVICE_DT,
2595         .ldt_name     = LUSTRE_OSD_NAME,
2596         .ldt_ops      = &osd_device_type_ops,
2597         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2598 };
2599
2600 /*
2601  * lprocfs legacy support.
2602  */
2603 static struct obd_ops osd_obd_device_ops = {
2604         .o_owner = THIS_MODULE
2605 };
2606
2607 static int __init osd_mod_init(void)
2608 {
2609         struct lprocfs_static_vars lvars;
2610
2611         lprocfs_osd_init_vars(&lvars);
2612         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2613                                    LUSTRE_OSD_NAME, &osd_device_type);
2614 }
2615
2616 static void __exit osd_mod_exit(void)
2617 {
2618         class_unregister_type(LUSTRE_OSD_NAME);
2619 }
2620
2621 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2622 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
2623 MODULE_LICENSE("GPL");
2624
2625 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);