Whamcloud - gitweb
Update documenting comments to match doxygen conventions.
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
58 /*
59  * XXX temporary stuff: direct access to ldiskfs/jbd. Interface between osd
60  * and file system is not yet specified.
61  */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
/**
 * Index state attached to an osd_object through osd_object::oo_dir.
 */
85 struct osd_directory {
        /**
         * iam container of the index. osd_index_fini() finalizes it when its
         * ic_object is the owning object's inode.
         */
86         struct iam_container od_container;
        /**
         * iam descriptor.
         * NOTE(review): presumably describes the format of od_container;
         * not used in the visible part of the file -- confirm.
         */
87         struct iam_descr     od_descr;
        /**
         * NOTE(review): not used in the visible part of the file; presumably
         * serializes index access -- confirm.
         */
88         struct semaphore     od_sem;
89 };
90
/**
 * osd-level object: a generic dt_object together with the file system inode
 * that backs it and the locks protecting it.
 */
91 struct osd_object {
        /** Generic dt object; set up by osd_object_alloc(). */
92         struct dt_object       oo_dt;
93         /**
94          * Inode for file system object represented by this osd_object. This
95          * inode is pinned for the whole duration of lu_object life.
96          *
97          * Not modified concurrently (either setup early during object
98          * creation, or assigned by osd_object_create() under write lock).
99          */
100         struct inode          *oo_inode;
        /**
         * Object lock, taken by osd_object_read_lock() and
         * osd_object_write_lock() and dropped by the matching unlock calls.
         */
101         struct rw_semaphore    oo_sem;
        /**
         * Index state, or NULL when the object has none; released by
         * osd_index_fini().
         */
102         struct osd_directory  *oo_dir;
103         /** Protects inode attributes. */
104         spinlock_t             oo_guard;
105 #if OSD_COUNTERS
        /**
         * Environment currently holding oo_sem for writing, or NULL. Only
         * maintained when OSD_COUNTERS is enabled.
         */
106         const struct lu_env   *oo_owner;
107 #endif
108 };
109
/*
 * Forward declarations of file-local functions.
 */
110 static int   osd_root_get      (const struct lu_env *env,
111                                 struct dt_device *dev, struct lu_fid *f);
112
113 static int   lu_device_is_osd  (const struct lu_device *d);
114 static void  osd_mod_exit      (void) __exit;
115 static int   osd_mod_init      (void) __init;
116 static int   osd_type_init     (struct lu_device_type *t);
117 static void  osd_type_fini     (struct lu_device_type *t);
118 static int   osd_object_init   (const struct lu_env *env,
119                                 struct lu_object *l);
120 static void  osd_object_release(const struct lu_env *env,
121                                 struct lu_object *l);
122 static int   osd_object_print  (const struct lu_env *env, void *cookie,
123                                 lu_printer_t p, const struct lu_object *o);
124 static struct lu_device *osd_device_free   (const struct lu_env *env,
125                                 struct lu_device *m);
126 static void *osd_key_init      (const struct lu_context *ctx,
127                                 struct lu_context_key *key);
128 static void  osd_key_fini      (const struct lu_context *ctx,
129                                 struct lu_context_key *key, void *data);
130 static void  osd_key_exit      (const struct lu_context *ctx,
131                                 struct lu_context_key *key, void *data);
132 static int   osd_has_index     (const struct osd_object *obj);
133 static void  osd_object_init0  (struct osd_object *obj);
134 static int   osd_device_init   (const struct lu_env *env,
135                                 struct lu_device *d, const char *,
136                                 struct lu_device *);
137 static int   osd_fid_lookup    (const struct lu_env *env,
138                                 struct osd_object *obj,
139                                 const struct lu_fid *fid);
140 static void  osd_inode_getattr (const struct lu_env *env,
141                                 struct inode *inode, struct lu_attr *attr);
142 static void  osd_inode_setattr (const struct lu_env *env,
143                                 struct inode *inode, const struct lu_attr *attr);
144 static int   osd_param_is_sane (const struct osd_device *dev,
145                                 const struct txn_param *param);
146 static int   osd_index_lookup  (const struct lu_env *env,
147                                 struct dt_object *dt,
148                                 struct dt_rec *rec, const struct dt_key *key,
149                                 struct lustre_capa *capa);
150 static int   osd_index_insert  (const struct lu_env *env,
151                                 struct dt_object *dt,
152                                 const struct dt_rec *rec,
153                                 const struct dt_key *key,
154                                 struct thandle *handle,
155                                 struct lustre_capa *capa);
156 static int   osd_index_delete  (const struct lu_env *env,
157                                 struct dt_object *dt, const struct dt_key *key,
158                                 struct thandle *handle,
159                                 struct lustre_capa *capa);
160 static int   osd_index_probe   (const struct lu_env *env,
161                                 struct osd_object *o,
162                                 const struct dt_index_features *feat);
163 static int   osd_index_try     (const struct lu_env *env,
164                                 struct dt_object *dt,
165                                 const struct dt_index_features *feat);
166 static void  osd_index_fini    (struct osd_object *o);
167
168 static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
169 static int   osd_it_get        (const struct lu_env *env,
170                                 struct dt_it *di, const struct dt_key *key);
171 static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
172 static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
173 static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
174                                 struct thandle *th);
175 static int   osd_it_key_size   (const struct lu_env *env,
176                                 const struct dt_it *di);
177 static void  osd_conf_get      (const struct lu_env *env,
178                                 const struct dt_device *dev,
179                                 struct dt_device_param *param);
180 static void  osd_trans_stop    (const struct lu_env *env,
181                                 struct thandle *th);
182 static int   osd_object_is_root(const struct osd_object *obj);
183
184 static struct osd_object  *osd_obj          (const struct lu_object *o);
185 static struct osd_device  *osd_dev          (const struct lu_device *d);
186 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
187 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
188 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
189 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
190 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
191                                              struct lu_device *d);
192 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
193                                              struct lu_device_type *t,
194                                              struct lustre_cfg *cfg);
195 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
196                                              const struct lu_object_header *hdr,
197                                              struct lu_device *d);
198 static struct inode       *osd_iget         (struct osd_thread_info *info,
199                                              struct osd_device *dev,
200                                              const struct osd_inode_id *id);
201 static struct super_block *osd_sb           (const struct osd_device *dev);
202 static struct dt_it       *osd_it_init      (const struct lu_env *env,
203                                              struct dt_object *dt, int wable,
204                                              struct lustre_capa *capa);
205 static struct dt_key      *osd_it_key       (const struct lu_env *env,
206                                              const struct dt_it *di);
207 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
208                                              const struct dt_it *di);
209 static struct timespec    *osd_inode_time   (const struct lu_env *env,
210                                              struct inode *inode,
211                                              __u64 seconds);
212 static struct thandle     *osd_trans_start  (const struct lu_env *env,
213                                              struct dt_device *d,
214                                              struct txn_param *p);
215 static journal_t          *osd_journal      (const struct osd_device *dev);
216
/*
 * Forward declarations of file-local operation vectors, the device type and
 * the context key (osd_key is what osd_oti_get() looks up).
 */
217 static struct lu_device_type_operations osd_device_type_ops;
218 static struct lu_device_type            osd_device_type;
219 static struct lu_object_operations      osd_lu_obj_ops;
220 static struct obd_ops                   osd_obd_device_ops;
221 static struct lu_device_operations      osd_lu_ops;
222 static struct lu_context_key            osd_key;
223 static struct dt_object_operations      osd_obj_ops;
224 static struct dt_body_operations        osd_body_ops;
225 static struct dt_index_operations       osd_index_ops;
226 static struct dt_index_operations       osd_index_compat_ops;
227
/**
 * osd transaction handle: the generic thandle handed out by osd_trans_start()
 * plus the journal state behind it.
 */
228 struct osd_thandle {
        /** Generic handle; osd_trans_start() returns a pointer to this. */
229         struct thandle          ot_super;
        /**
         * Journal handle obtained from journal_start(); reset to NULL by
         * osd_trans_stop() before the handle is stopped.
         */
230         handle_t               *ot_handle;
        /** Commit callback record registered with journal_callback_set(). */
231         struct journal_callback ot_jcb;
232 };
233
234 /*
235  * Invariants, assertions.
236  */
237
/*
 * XXX: do not enable this, until invariant checking code is made thread safe
 * in the face of pdirops locking.
 */
#define OSD_INVARIANT_CHECKS (0)

#if OSD_INVARIANT_CHECKS
/**
 * Consistency check for an osd object.
 *
 * Holds when \a obj is not NULL and
 *  - if an inode is attached, it belongs to the super block of the object's
 *    device and has a positive reference count; and
 *  - if index state is attached and its container is bound to an inode, that
 *    inode is the object's own inode.
 *
 * \retval 1 invariant holds
 * \retval 0 invariant is violated
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_dir != NULL &&
                     obj->oo_dir->od_container.ic_object != NULL,
                     obj->oo_dir->od_container.ic_object == obj->oo_inode);
}
#else
/* Invariant checking is compiled out: the check always succeeds. */
#define osd_invariant(obj) (1)
#endif
259
260 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
261 {
262         return lu_context_key_get(&env->le_ctx, &osd_key);
263 }
264
#if OSD_COUNTERS
/**
 * Check whether the current thread holds at least one object read lock,
 * according to the per-thread counters. \a o is not inspected.
 *
 * Concurrency: doesn't matter
 */
static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
{
        struct osd_thread_info *info = osd_oti_get(env);

        return info->oti_r_locks > 0;
}

/**
 * Check whether the current thread holds at least one write lock and \a env
 * is recorded as the write owner of \a o.
 *
 * Concurrency: doesn't matter
 */
static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
{
        struct osd_thread_info *info = osd_oti_get(env);

        if (info->oti_w_locks <= 0)
                return 0;
        return o->oo_owner == env;
}

/** Evaluate \a exp only when lock counters are compiled in. */
#define OSD_COUNTERS_DO(exp) exp
#else

/* Without counters lock ownership cannot be checked: assume it is held. */
#define osd_read_locked(env, o) (1)
#define osd_write_locked(env, o) (1)
#define OSD_COUNTERS_DO(exp) ((void)0)
#endif
291
292 /*
293  * Concurrency: doesn't access mutable data
294  */
295 static int osd_root_get(const struct lu_env *env,
296                         struct dt_device *dev, struct lu_fid *f)
297 {
298         struct inode *inode;
299
300         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
301         lu_igif_build(f, inode->i_ino, inode->i_generation);
302         return 0;
303 }
304
305 /*
306  * OSD object methods.
307  */
308
309 /*
310  * Concurrency: no concurrent access is possible that early in object
311  * life-cycle.
312  */
313 static struct lu_object *osd_object_alloc(const struct lu_env *env,
314                                           const struct lu_object_header *hdr,
315                                           struct lu_device *d)
316 {
317         struct osd_object *mo;
318
319         OBD_ALLOC_PTR(mo);
320         if (mo != NULL) {
321                 struct lu_object *l;
322
323                 l = &mo->oo_dt.do_lu;
324                 dt_object_init(&mo->oo_dt, NULL, d);
325                 mo->oo_dt.do_ops = &osd_obj_ops;
326                 l->lo_ops = &osd_lu_obj_ops;
327                 init_rwsem(&mo->oo_sem);
328                 spin_lock_init(&mo->oo_guard);
329                 return l;
330         } else
331                 return NULL;
332 }
333
334 /*
335  * Concurrency: shouldn't matter.
336  */
337 static void osd_object_init0(struct osd_object *obj)
338 {
339         LASSERT(obj->oo_inode != NULL);
340         obj->oo_dt.do_body_ops = &osd_body_ops;
341         obj->oo_dt.do_lu.lo_header->loh_attr |=
342                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
343 }
344
345 /*
346  * Concurrency: no concurrent access is possible that early in object
347  * life-cycle.
348  */
349 static int osd_object_init(const struct lu_env *env, struct lu_object *l)
350 {
351         struct osd_object *obj = osd_obj(l);
352         int result;
353
354         LASSERT(osd_invariant(obj));
355
356         result = osd_fid_lookup(env, obj, lu_object_fid(l));
357         if (result == 0) {
358                 if (obj->oo_inode != NULL)
359                         osd_object_init0(obj);
360         }
361         LASSERT(osd_invariant(obj));
362         return result;
363 }
364
365 /*
366  * Concurrency: no concurrent access is possible that late in object
367  * life-cycle.
368  */
369 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
370 {
371         struct osd_object *obj = osd_obj(l);
372
373         LASSERT(osd_invariant(obj));
374
375         dt_object_fini(&obj->oo_dt);
376         OBD_FREE_PTR(obj);
377 }
378
379 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
380                                           const struct iam_container *bag)
381 {
382         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
383                                                    osd_oti_get(env)->oti_ipd);
384 }
385
386 static void osd_ipd_put(const struct lu_env *env,
387                         const struct iam_container *bag,
388                         struct iam_path_descr *ipd)
389 {
390         bag->ic_descr->id_ops->id_ipd_free(ipd);
391 }
392
393 /*
394  * Concurrency: no concurrent access is possible that late in object
395  * life-cycle.
396  */
397 static void osd_index_fini(struct osd_object *o)
398 {
399         struct iam_container *bag;
400
401         if (o->oo_dir != NULL) {
402                 bag = &o->oo_dir->od_container;
403                 if (o->oo_inode != NULL) {
404                         if (bag->ic_object == o->oo_inode)
405                                 iam_container_fini(bag);
406                 }
407                 OBD_FREE_PTR(o->oo_dir);
408                 o->oo_dir = NULL;
409         }
410 }
411
412 /*
413  * Concurrency: no concurrent access is possible that late in object
414  * life-cycle (for all existing callers, that is. New callers have to provide
415  * their own locking.)
416  */
417 static int osd_inode_unlinked(const struct inode *inode)
418 {
419         return inode->i_nlink == 0;
420 }
421
/**
 * Journal credits requested by osd_inode_remove(), which starts its
 * transaction with the sum of the two values.
 */
422 enum {
423         OSD_TXN_OI_DELETE_CREDITS    = 20,
424         OSD_TXN_INODE_DELETE_CREDITS = 20
425 };
426
427 /*
428  * Concurrency: no concurrent access is possible that late in object
429  * life-cycle.
430  */
431 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
432 {
433         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
434         struct osd_device      *osd = osd_obj2dev(obj);
435         struct osd_thread_info *oti = osd_oti_get(env);
436         struct txn_param       *prm = &oti->oti_txn;
437         struct thandle         *th;
438         int result;
439
440         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
441                             OSD_TXN_INODE_DELETE_CREDITS);
442         th = osd_trans_start(env, &osd->od_dt_dev, prm);
443         if (!IS_ERR(th)) {
444                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
445                 osd_trans_stop(env, th);
446         } else
447                 result = PTR_ERR(th);
448         return result;
449 }
450
451 /*
452  * Called just before object is freed. Releases all resources except for
453  * object itself (that is released by osd_object_free()).
454  *
455  * Concurrency: no concurrent access is possible that late in object
456  * life-cycle.
457  */
458 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
459 {
460         struct osd_object *obj   = osd_obj(l);
461         struct inode      *inode = obj->oo_inode;
462
463         LASSERT(osd_invariant(obj));
464
465         /*
466          * If object is unlinked remove fid->ino mapping from object index.
467          *
468          * File body will be deleted by iput().
469          */
470
471         osd_index_fini(obj);
472         if (inode != NULL) {
473                 int result;
474
475                 if (osd_inode_unlinked(inode)) {
476                         result = osd_inode_remove(env, obj);
477                         if (result != 0)
478                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
479                                                 "Failed to cleanup: %d\n",
480                                                 result);
481                 }
482                 iput(inode);
483                 obj->oo_inode = NULL;
484         }
485 }
486
487 /*
488  * Concurrency: ->loo_object_release() is called under site spin-lock.
489  */
490 static void osd_object_release(const struct lu_env *env,
491                                struct lu_object *l)
492 {
493         struct osd_object *o = osd_obj(l);
494
495         LASSERT(!lu_object_is_dying(l->lo_header));
496         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
497                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
498 }
499
500 /*
501  * Concurrency: shouldn't matter.
502  */
503 static int osd_object_print(const struct lu_env *env, void *cookie,
504                             lu_printer_t p, const struct lu_object *l)
505 {
506         struct osd_object *o = osd_obj(l);
507         struct iam_descr  *d;
508
509         if (o->oo_dir != NULL)
510                 d = o->oo_dir->od_container.ic_descr;
511         else
512                 d = NULL;
513         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
514                     o, o->oo_inode,
515                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
516                     o->oo_inode ? o->oo_inode->i_generation : 0,
517                     d ? d->id_ops->id_name : "plain");
518 }
519
520 /*
521  * Concurrency: shouldn't matter.
522  */
523 int osd_statfs(const struct lu_env *env, struct dt_device *d,
524                struct kstatfs *sfs)
525 {
526         struct osd_device *osd = osd_dt_dev(d);
527         struct super_block *sb = osd_sb(osd);
528         int result = 0;
529
530         spin_lock(&osd->od_osfs_lock);
531         /* cache 1 second */
532         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
533                 result = ll_do_statfs(sb, &osd->od_kstatfs);
534                 if (likely(result == 0)) /* N.B. statfs can't really fail */
535                         osd->od_osfs_age = cfs_time_current_64();
536         }
537
538         if (likely(result == 0))
539                 *sfs = osd->od_kstatfs; 
540         spin_unlock(&osd->od_osfs_lock);
541
542         return result;
543 }
544
545 /*
546  * Concurrency: doesn't access mutable data.
547  */
548 static void osd_conf_get(const struct lu_env *env,
549                          const struct dt_device *dev,
550                          struct dt_device_param *param)
551 {
552         /*
553          * XXX should be taken from not-yet-existing fs abstraction layer.
554          */
555         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
556         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
557         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
558 }
559
560 /*
561  * Journal
562  */
563
564 /*
565  * Concurrency: doesn't access mutable data.
566  */
567 static int osd_param_is_sane(const struct osd_device *dev,
568                              const struct txn_param *param)
569 {
570         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
571 }
572
573 /*
574  * Concurrency: shouldn't matter.
575  */
576 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
577 {
578         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
579         struct thandle     *th = &oh->ot_super;
580         struct dt_device   *dev = th->th_dev;
581
582         LASSERT(dev != NULL);
583         LASSERT(oh->ot_handle == NULL);
584
585         if (error) {
586                 CERROR("transaction @0x%p commit error: %d\n", th, error);
587         } else {
588                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
589                 /*
590                  * This od_env_for_commit is only for commit usage.  see
591                  * "struct dt_device"
592                  */
593                 lu_context_enter(&env->le_ctx);
594                 dt_txn_hook_commit(env, th);
595                 lu_context_exit(&env->le_ctx);
596         }
597
598         lu_device_put(&dev->dd_lu_dev);
599         th->th_dev = NULL;
600
601         lu_context_exit(&th->th_ctx);
602         lu_context_fini(&th->th_ctx);
603         OBD_FREE_PTR(oh);
604 }
605
606 /*
607  * Concurrency: shouldn't matter.
608  */
609 static struct thandle *osd_trans_start(const struct lu_env *env,
610                                        struct dt_device *d,
611                                        struct txn_param *p)
612 {
613         struct osd_device  *dev = osd_dt_dev(d);
614         handle_t           *jh;
615         struct osd_thandle *oh;
616         struct thandle     *th;
617         int hook_res;
618
619         ENTRY;
620
621         hook_res = dt_txn_hook_start(env, d, p);
622         if (hook_res != 0)
623                 RETURN(ERR_PTR(hook_res));
624
625         if (osd_param_is_sane(dev, p)) {
626                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
627                 if (oh != NULL) {
628                         /*
629                          * XXX temporary stuff. Some abstraction layer should
630                          * be used.
631                          */
632
633                         jh = journal_start(osd_journal(dev), p->tp_credits);
634                         if (!IS_ERR(jh)) {
635                                 oh->ot_handle = jh;
636                                 th = &oh->ot_super;
637                                 th->th_dev = d;
638                                 th->th_result = 0;
639                                 jh->h_sync = p->tp_sync;
640                                 lu_device_get(&d->dd_lu_dev);
641                                 /* add commit callback */
642                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
643                                 lu_context_enter(&th->th_ctx);
644                                 journal_callback_set(jh, osd_trans_commit_cb,
645                                                      (struct journal_callback *)&oh->ot_jcb);
646 #if OSD_COUNTERS
647                                 {
648                                         struct osd_thread_info *oti =
649                                                 osd_oti_get(env);
650
651                                         LASSERT(oti->oti_txns == 0);
652                                         LASSERT(oti->oti_r_locks == 0);
653                                         LASSERT(oti->oti_w_locks == 0);
654                                         oti->oti_txns++;
655                                 }
656 #endif
657                         } else {
658                                 OBD_FREE_PTR(oh);
659                                 th = (void *)jh;
660                         }
661                 } else
662                         th = ERR_PTR(-ENOMEM);
663         } else {
664                 CERROR("Invalid transaction parameters\n");
665                 th = ERR_PTR(-EINVAL);
666         }
667
668         RETURN(th);
669 }
670
671 /*
672  * Concurrency: shouldn't matter.
673  */
674 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
675 {
676         int result;
677         struct osd_thandle *oh;
678
679         ENTRY;
680
681         oh = container_of0(th, struct osd_thandle, ot_super);
682         if (oh->ot_handle != NULL) {
683                 handle_t *hdl = oh->ot_handle;
684                 /*
685                  * XXX temporary stuff. Some abstraction layer should be used.
686                  */
687                 result = dt_txn_hook_stop(env, th);
688                 if (result != 0)
689                         CERROR("Failure in transaction hook: %d\n", result);
690
691                 /**/
692                 oh->ot_handle = NULL;
693                 result = journal_stop(hdl);
694                 if (result != 0)
695                         CERROR("Failure to stop transaction: %d\n", result);
696
697 #if OSD_COUNTERS
698                 {
699                         struct osd_thread_info *oti = osd_oti_get(env);
700
701                         LASSERT(oti->oti_txns == 1);
702                         LASSERT(oti->oti_r_locks == 0);
703                         LASSERT(oti->oti_w_locks == 0);
704                         oti->oti_txns--;
705                 }
706 #endif
707         }
708         EXIT;
709 }
710
711 /*
712  * Concurrency: shouldn't matter.
713  */
714 static int osd_sync(const struct lu_env *env, struct dt_device *d)
715 {
716         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
717         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
718 }
719
720 /*
721  * Concurrency: shouldn't matter.
722  */
723 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
724
725 static void osd_ro(const struct lu_env *env, struct dt_device *d)
726 {
727         ENTRY;
728
729         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
730
731         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
732                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
733         EXIT;
734 }
735
736 /*
737  * Concurrency: serialization provided by callers.
738  */
739 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
740                               int mode, unsigned long timeout, __u32 alg,
741                               struct lustre_capa_key *keys)
742 {
743         struct osd_device *dev = osd_dt_dev(d);
744         ENTRY;
745
746         dev->od_fl_capa = mode;
747         dev->od_capa_timeout = timeout;
748         dev->od_capa_alg = alg;
749         dev->od_capa_keys = keys;
750         RETURN(0);
751 }
752
/**
 * Journal credits for each dt transaction operation, indexed by the
 * operation code; osd_credit_get() returns entries of this table.
 */
753 /* Note: quota is not counted in here. If we mount with --data_journal
754  * we may need more. */
755 static const int osd_dto_credits[DTO_NR] = {
756         /*
757          * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
758          * EXT3_SINGLEDATA_TRANS_BLOCKS 8 XXX Note: maybe iam needs more, since
759          * iam has more levels than Ext3 htree
760          */
761         [DTO_INDEX_INSERT]  = 16,
762         [DTO_INDEX_DELETE]  = 16,
763         [DTO_IDNEX_UPDATE]  = 16,
764         /*
765          * Create an object. Same as create object in Ext3 filesystem, but did
766          * not count QUOTA, i.e. EXT3_DATA_TRANS_BLOCKS(12) +
767          * INDEX_EXTRA_BLOCKS(8) + 3(inode bits,groups, GDT)
768          */
769         [DTO_OBJECT_CREATE] = 23,
770         [DTO_OBJECT_DELETE] = 23,
771         /*
772          * Attr set credits 3 inode, group, GDT
773          */
774         [DTO_ATTR_SET]      = 3,
775         /*
776          * XATTR_SET. SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS XXX Note:
777          * in original MDS implementation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
778          * also counted in. Do not know why?
779          */
780         [DTO_XATTR_SET]     = 16,
781         [DTO_LOG_REC]       = 16,
782         /* credits for inode change during write */
783         [DTO_WRITE_BASE]    = 3,
784         /* credits for single block write */
785         [DTO_WRITE_BLOCK]   = 12 
786 };
787
/**
 * Return the number of journal credits needed for operation \a op, taken
 * from osd_dto_credits[]. \a op must be a valid index into that table; this
 * is asserted.
 */
788 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
789                           enum dt_txn_op op)
790 {
791         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
792         return osd_dto_credits[op];
793 }
794
/**
 * dt_device operation vector of the osd layer; every method is implemented
 * in this file.
 */
795 static struct dt_device_operations osd_dt_ops = {
796         .dt_root_get       = osd_root_get,
797         .dt_statfs         = osd_statfs,
798         .dt_trans_start    = osd_trans_start,
799         .dt_trans_stop     = osd_trans_stop,
800         .dt_conf_get       = osd_conf_get,
801         .dt_sync           = osd_sync,
802         .dt_ro             = osd_ro,
803         .dt_credit_get     = osd_credit_get,
804         .dt_init_capa_ctxt = osd_init_capa_ctxt,
805 };
806
807 static void osd_object_read_lock(const struct lu_env *env,
808                                  struct dt_object *dt)
809 {
810         struct osd_object *obj = osd_dt_obj(dt);
811
812         LASSERT(osd_invariant(obj));
813
814         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
815         down_read(&obj->oo_sem);
816 #if OSD_COUNTERS
817         {
818                 struct osd_thread_info *oti = osd_oti_get(env);
819
820                 LASSERT(obj->oo_owner == NULL);
821                 oti->oti_r_locks++;
822         }
823 #endif
824 }
825
826 static void osd_object_write_lock(const struct lu_env *env,
827                                   struct dt_object *dt)
828 {
829         struct osd_object *obj = osd_dt_obj(dt);
830
831         LASSERT(osd_invariant(obj));
832
833         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
834         down_write(&obj->oo_sem);
835 #if OSD_COUNTERS
836         {
837                 struct osd_thread_info *oti = osd_oti_get(env);
838
839                 LASSERT(obj->oo_owner == NULL);
840                 obj->oo_owner = env;
841                 oti->oti_w_locks++;
842         }
843 #endif
844 }
845
846 static void osd_object_read_unlock(const struct lu_env *env,
847                                    struct dt_object *dt)
848 {
849         struct osd_object *obj = osd_dt_obj(dt);
850
851         LASSERT(osd_invariant(obj));
852 #if OSD_COUNTERS
853         {
854                 struct osd_thread_info *oti = osd_oti_get(env);
855
856                 LASSERT(oti->oti_r_locks > 0);
857                 oti->oti_r_locks--;
858         }
859 #endif
860         up_read(&obj->oo_sem);
861 }
862
863 static void osd_object_write_unlock(const struct lu_env *env,
864                                     struct dt_object *dt)
865 {
866         struct osd_object *obj = osd_dt_obj(dt);
867
868         LASSERT(osd_invariant(obj));
869 #if OSD_COUNTERS
870         {
871                 struct osd_thread_info *oti = osd_oti_get(env);
872
873                 LASSERT(obj->oo_owner == env);
874                 LASSERT(oti->oti_w_locks > 0);
875                 oti->oti_w_locks--;
876                 obj->oo_owner = NULL;
877         }
878 #endif
879         up_write(&obj->oo_sem);
880 }
881
882 static int capa_is_sane(const struct lu_env *env,
883                         struct osd_device *dev,
884                         struct lustre_capa *capa,
885                         struct lustre_capa_key *keys)
886 {
887         struct osd_thread_info *oti = osd_oti_get(env);
888         struct obd_capa *oc;
889         int i, rc = 0;
890         ENTRY;
891
892         oc = capa_lookup(dev->od_capa_hash, capa, 0);
893         if (oc) {
894                 if (capa_is_expired(oc)) {
895                         DEBUG_CAPA(D_ERROR, capa, "expired");
896                         rc = -ESTALE;
897                 }
898                 capa_put(oc);
899                 RETURN(rc);
900         }
901
902         spin_lock(&capa_lock);
903         for (i = 0; i < 2; i++) {
904                 if (keys[i].lk_keyid == capa->lc_keyid) {
905                         oti->oti_capa_key = keys[i];
906                         break;
907                 }
908         }
909         spin_unlock(&capa_lock);
910
911         if (i == 2) {
912                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
913                 RETURN(-ESTALE);
914         }
915
916         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
917         if (rc)
918                 RETURN(rc);
919         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
920         {
921                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
922                 RETURN(-EACCES);
923         }
924
925         oc = capa_add(dev->od_capa_hash, capa);
926         capa_put(oc);
927
928         RETURN(0);
929 }
930
931 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
932                            struct lustre_capa *capa, __u64 opc)
933 {
934         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
935         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
936         int rc;
937
938         if (!dev->od_fl_capa)
939                 return 0;
940
941         if (capa == BYPASS_CAPA)
942                 return 0;
943
944         if (!capa) {
945                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
946                 return -EACCES;
947         }
948
949         if (!lu_fid_eq(fid, &capa->lc_fid)) {
950                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
951                            PFID(fid));
952                 return -EACCES;
953         }
954
955         if (!capa_opc_supported(capa, opc)) {
956                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
957                 return -EACCES;
958         }
959
960         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
961                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
962                 return -EACCES;
963         }
964
965         return 0;
966 }
967
968 static int osd_attr_get(const struct lu_env *env,
969                         struct dt_object *dt,
970                         struct lu_attr *attr,
971                         struct lustre_capa *capa)
972 {
973         struct osd_object *obj = osd_dt_obj(dt);
974
975         LASSERT(dt_object_exists(dt));
976         LASSERT(osd_invariant(obj));
977
978         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
979                 return -EACCES;
980
981         spin_lock(&obj->oo_guard);
982         osd_inode_getattr(env, obj->oo_inode, attr);
983         spin_unlock(&obj->oo_guard);
984         return 0;
985 }
986
987 static int osd_attr_set(const struct lu_env *env,
988                         struct dt_object *dt,
989                         const struct lu_attr *attr,
990                         struct thandle *handle,
991                         struct lustre_capa *capa)
992 {
993         struct osd_object *obj = osd_dt_obj(dt);
994
995         LASSERT(handle != NULL);
996         LASSERT(dt_object_exists(dt));
997         LASSERT(osd_invariant(obj));
998
999         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1000                 return -EACCES;
1001
1002         spin_lock(&obj->oo_guard);
1003         osd_inode_setattr(env, obj->oo_inode, attr);
1004         spin_unlock(&obj->oo_guard);
1005
1006         mark_inode_dirty(obj->oo_inode);
1007         return 0;
1008 }
1009
1010 static struct timespec *osd_inode_time(const struct lu_env *env,
1011                                        struct inode *inode, __u64 seconds)
1012 {
1013         struct osd_thread_info *oti = osd_oti_get(env);
1014         struct timespec        *t   = &oti->oti_time;
1015
1016         t->tv_sec  = seconds;
1017         t->tv_nsec = 0;
1018         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1019         return t;
1020 }
1021
1022 static void osd_inode_setattr(const struct lu_env *env,
1023                               struct inode *inode, const struct lu_attr *attr)
1024 {
1025         __u64 bits;
1026
1027         bits = attr->la_valid;
1028
1029         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1030
1031         if (bits & LA_ATIME)
1032                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1033         if (bits & LA_CTIME)
1034                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1035         if (bits & LA_MTIME)
1036                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1037         if (bits & LA_SIZE) {
1038                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1039                 i_size_write(inode, attr->la_size);
1040         }
1041         if (bits & LA_BLOCKS)
1042                 inode->i_blocks = attr->la_blocks;
1043         if (bits & LA_MODE)
1044                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1045                         (attr->la_mode & ~S_IFMT);
1046         if (bits & LA_UID)
1047                 inode->i_uid    = attr->la_uid;
1048         if (bits & LA_GID)
1049                 inode->i_gid    = attr->la_gid;
1050         if (bits & LA_NLINK)
1051                 inode->i_nlink  = attr->la_nlink;
1052         if (bits & LA_RDEV)
1053                 inode->i_rdev   = attr->la_rdev;
1054
1055         if (bits & LA_FLAGS) {
1056                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1057
1058                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1059                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1060         }
1061 }
1062
1063 /*
1064  * Object creation.
1065  *
1066  * XXX temporary solution.
1067  */
1068
/**
 * Hook called by osd_object_create() before the type-specific creation
 * method.  Currently does nothing.
 *
 * \retval 0  always
 */
static int osd_create_pre(struct osd_thread_info *info,
                          struct osd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *th)
{
        /* nothing to prepare */
        return 0;
}
1074
1075 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1076                            struct lu_attr *attr, struct thandle *th)
1077 {
1078         LASSERT(obj->oo_inode != NULL);
1079
1080         osd_object_init0(obj);
1081         return 0;
1082 }
1083
1084 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1085                                           struct inode * dir, int mode);
1086
1087 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1088                       umode_t mode,
1089                       struct dt_allocation_hint *hint,
1090                       struct thandle *th)
1091 {
1092         int result;
1093         struct osd_device  *osd = osd_obj2dev(obj);
1094         struct osd_thandle *oth;
1095         struct inode       *parent;
1096         struct inode       *inode;
1097
1098         LASSERT(osd_invariant(obj));
1099         LASSERT(obj->oo_inode == NULL);
1100         LASSERT(osd->od_obj_area != NULL);
1101
1102         oth = container_of(th, struct osd_thandle, ot_super);
1103         LASSERT(oth->ot_handle->h_transaction != NULL);
1104
1105         if (hint && hint->dah_parent)
1106                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1107         else
1108                 parent = osd->od_obj_area->d_inode;
1109         LASSERT(parent->i_op != NULL);
1110
1111         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1112         if (!IS_ERR(inode)) {
1113                 obj->oo_inode = inode;
1114                 result = 0;
1115         } else
1116                 result = PTR_ERR(inode);
1117         LASSERT(osd_invariant(obj));
1118         return result;
1119 }
1120
1121
1122 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1123                            int recsize, handle_t *handle);
1124
1125 enum {
1126         OSD_NAME_LEN = 255
1127 };
1128
1129 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1130                      struct lu_attr *attr,
1131                      struct dt_allocation_hint *hint,
1132                      struct thandle *th)
1133 {
1134         int result;
1135         struct osd_thandle *oth;
1136
1137         LASSERT(S_ISDIR(attr->la_mode));
1138
1139         oth = container_of(th, struct osd_thandle, ot_super);
1140         LASSERT(oth->ot_handle->h_transaction != NULL);
1141         result = osd_mkfile(info, obj, (attr->la_mode &
1142                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1143         if (result == 0) {
1144                 LASSERT(obj->oo_inode != NULL);
1145                 /*
1146                  * XXX uh-oh... call low-level iam function directly.
1147                  */
1148                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1149                                          sizeof (struct lu_fid_pack),
1150                                          oth->ot_handle);
1151         }
1152         return result;
1153 }
1154
1155 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1156                      struct lu_attr *attr,
1157                      struct dt_allocation_hint *hint,
1158                      struct thandle *th)
1159 {
1160         LASSERT(S_ISREG(attr->la_mode));
1161         return osd_mkfile(info, obj, (attr->la_mode &
1162                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1163 }
1164
1165 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1166                      struct lu_attr *attr,
1167                      struct dt_allocation_hint *hint,
1168                      struct thandle *th)
1169 {
1170         LASSERT(S_ISLNK(attr->la_mode));
1171         return osd_mkfile(info, obj, (attr->la_mode &
1172                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1173 }
1174
1175 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1176                      struct lu_attr *attr,
1177                      struct dt_allocation_hint *hint,
1178                      struct thandle *th)
1179 {
1180         int result;
1181         struct osd_device *osd = osd_obj2dev(obj);
1182         struct inode      *dir;
1183         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1184
1185         LASSERT(osd_invariant(obj));
1186         LASSERT(obj->oo_inode == NULL);
1187         LASSERT(osd->od_obj_area != NULL);
1188         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1189                 S_ISFIFO(mode) || S_ISSOCK(mode));
1190
1191         dir = osd->od_obj_area->d_inode;
1192         LASSERT(dir->i_op != NULL);
1193
1194         result = osd_mkfile(info, obj, mode, hint, th);
1195         if (result == 0) {
1196                 LASSERT(obj->oo_inode != NULL);
1197                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1198         }
1199         LASSERT(osd_invariant(obj));
1200         return result;
1201 }
1202
1203 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1204                               struct lu_attr *,
1205                               struct dt_allocation_hint *hint,
1206                               struct thandle *);
1207
1208 static osd_obj_type_f osd_create_type_f(__u32 mode)
1209 {
1210         osd_obj_type_f result;
1211
1212         switch (mode) {
1213         case S_IFDIR:
1214                 result = osd_mkdir;
1215                 break;
1216         case S_IFREG:
1217                 result = osd_mkreg;
1218                 break;
1219         case S_IFLNK:
1220                 result = osd_mksym;
1221                 break;
1222         case S_IFCHR:
1223         case S_IFBLK:
1224         case S_IFIFO:
1225         case S_IFSOCK:
1226                 result = osd_mknod;
1227                 break;
1228         default:
1229                 LBUG();
1230                 break;
1231         }
1232         return result;
1233 }
1234
1235
1236 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1237                         struct dt_object *parent, umode_t child_mode)
1238 {
1239         LASSERT(ah);
1240
1241         memset(ah, 0, sizeof(*ah));
1242         ah->dah_parent = parent;
1243         ah->dah_mode = child_mode;
1244 }
1245
1246
1247 /*
1248  * Concurrency: @dt is write locked.
1249  */
1250 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1251                              struct lu_attr *attr, 
1252                              struct dt_allocation_hint *hint,
1253                              struct thandle *th)
1254 {
1255         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1256         struct osd_object      *obj  = osd_dt_obj(dt);
1257         struct osd_device      *osd  = osd_obj2dev(obj);
1258         struct osd_thread_info *info = osd_oti_get(env);
1259         int result;
1260
1261         ENTRY;
1262
1263         LASSERT(osd_invariant(obj));
1264         LASSERT(!dt_object_exists(dt));
1265         LASSERT(osd_write_locked(env, obj));
1266         LASSERT(th != NULL);
1267
1268         /*
1269          * XXX missing: Quote handling.
1270          */
1271
1272         result = osd_create_pre(info, obj, attr, th);
1273         if (result == 0) {
1274                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1275                                                                 attr, hint, th);
1276                 if (result == 0)
1277                         result = osd_create_post(info, obj, attr, th);
1278         }
1279         if (result == 0) {
1280                 struct osd_inode_id *id = &info->oti_id;
1281
1282                 LASSERT(obj->oo_inode != NULL);
1283
1284                 id->oii_ino = obj->oo_inode->i_ino;
1285                 id->oii_gen = obj->oo_inode->i_generation;
1286
1287                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1288         }
1289
1290         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1291         LASSERT(osd_invariant(obj));
1292         RETURN(result);
1293 }
1294
1295 /*
1296  * Concurrency: @dt is write locked.
1297  */
1298 static void osd_object_ref_add(const struct lu_env *env,
1299                                struct dt_object *dt,
1300                                struct thandle *th)
1301 {
1302         struct osd_object *obj = osd_dt_obj(dt);
1303         struct inode *inode = obj->oo_inode;
1304
1305         LASSERT(osd_invariant(obj));
1306         LASSERT(dt_object_exists(dt));
1307         LASSERT(osd_write_locked(env, obj));
1308         LASSERT(th != NULL);
1309
1310         spin_lock(&obj->oo_guard);
1311         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1312         inode->i_nlink++;
1313         spin_unlock(&obj->oo_guard);
1314         mark_inode_dirty(inode);
1315         LASSERT(osd_invariant(obj));
1316 }
1317
1318 /*
1319  * Concurrency: @dt is write locked.
1320  */
1321 static void osd_object_ref_del(const struct lu_env *env,
1322                                struct dt_object *dt,
1323                                struct thandle *th)
1324 {
1325         struct osd_object *obj = osd_dt_obj(dt);
1326         struct inode *inode = obj->oo_inode;
1327
1328         LASSERT(osd_invariant(obj));
1329         LASSERT(dt_object_exists(dt));
1330         LASSERT(osd_write_locked(env, obj));
1331         LASSERT(th != NULL);
1332
1333         spin_lock(&obj->oo_guard);
1334         LASSERT(inode->i_nlink > 0);
1335         inode->i_nlink--;
1336         spin_unlock(&obj->oo_guard);
1337         mark_inode_dirty(inode);
1338         LASSERT(osd_invariant(obj));
1339 }
1340
1341 /*
1342  * Concurrency: @dt is read locked.
1343  */
1344 static int osd_xattr_get(const struct lu_env *env,
1345                          struct dt_object *dt,
1346                          struct lu_buf *buf,
1347                          const char *name,
1348                          struct lustre_capa *capa)
1349 {
1350         struct osd_object      *obj    = osd_dt_obj(dt);
1351         struct inode           *inode  = obj->oo_inode;
1352         struct osd_thread_info *info   = osd_oti_get(env);
1353         struct dentry          *dentry = &info->oti_dentry;
1354
1355         LASSERT(dt_object_exists(dt));
1356         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1357         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1358
1359         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1360                 return -EACCES;
1361
1362         dentry->d_inode = inode;
1363         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1364 }
1365
1366 /*
1367  * Concurrency: @dt is write locked.
1368  */
1369 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1370                          const struct lu_buf *buf, const char *name, int fl,
1371                          struct thandle *handle, struct lustre_capa *capa)
1372 {
1373         struct osd_object      *obj    = osd_dt_obj(dt);
1374         struct inode           *inode  = obj->oo_inode;
1375         struct osd_thread_info *info   = osd_oti_get(env);
1376         struct dentry          *dentry = &info->oti_dentry;
1377         struct timespec        *t      = &info->oti_time;
1378         int                     fs_flags = 0, rc;
1379
1380         LASSERT(dt_object_exists(dt));
1381         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1382         LASSERT(osd_write_locked(env, obj));
1383         LASSERT(handle != NULL);
1384
1385         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1386                 return -EACCES;
1387
1388         if (fl & LU_XATTR_REPLACE)
1389                 fs_flags |= XATTR_REPLACE;
1390
1391         if (fl & LU_XATTR_CREATE)
1392                 fs_flags |= XATTR_CREATE;
1393
1394         dentry->d_inode = inode;
1395         *t = inode->i_ctime;
1396         rc = inode->i_op->setxattr(dentry, name,
1397                                    buf->lb_buf, buf->lb_len, fs_flags);
1398         if (likely(rc == 0)) {
1399                 /* ctime should not be updated with server-side time. */
1400                 spin_lock(&obj->oo_guard);
1401                 inode->i_ctime = *t;
1402                 spin_unlock(&obj->oo_guard);
1403                 mark_inode_dirty(inode);
1404         }
1405         return rc;
1406 }
1407
1408 /*
1409  * Concurrency: @dt is read locked.
1410  */
1411 static int osd_xattr_list(const struct lu_env *env,
1412                           struct dt_object *dt,
1413                           struct lu_buf *buf,
1414                           struct lustre_capa *capa)
1415 {
1416         struct osd_object      *obj    = osd_dt_obj(dt);
1417         struct inode           *inode  = obj->oo_inode;
1418         struct osd_thread_info *info   = osd_oti_get(env);
1419         struct dentry          *dentry = &info->oti_dentry;
1420
1421         LASSERT(dt_object_exists(dt));
1422         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1423         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1424
1425         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1426                 return -EACCES;
1427
1428         dentry->d_inode = inode;
1429         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1430 }
1431
1432 /*
1433  * Concurrency: @dt is write locked.
1434  */
1435 static int osd_xattr_del(const struct lu_env *env,
1436                          struct dt_object *dt,
1437                          const char *name,
1438                          struct thandle *handle,
1439                          struct lustre_capa *capa)
1440 {
1441         struct osd_object      *obj    = osd_dt_obj(dt);
1442         struct inode           *inode  = obj->oo_inode;
1443         struct osd_thread_info *info   = osd_oti_get(env);
1444         struct dentry          *dentry = &info->oti_dentry;
1445         struct timespec        *t      = &info->oti_time;
1446         int                     rc;
1447
1448         LASSERT(dt_object_exists(dt));
1449         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1450         LASSERT(osd_write_locked(env, obj));
1451         LASSERT(handle != NULL);
1452
1453         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1454                 return -EACCES;
1455
1456         dentry->d_inode = inode;
1457         *t = inode->i_ctime;
1458         rc = inode->i_op->removexattr(dentry, name);
1459         if (likely(rc == 0)) {
1460                 /* ctime should not be updated with server-side time. */
1461                 spin_lock(&obj->oo_guard);
1462                 inode->i_ctime = *t;
1463                 spin_unlock(&obj->oo_guard);
1464                 mark_inode_dirty(inode);
1465         }
1466         return rc;
1467 }
1468
1469 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1470                                      struct dt_object *dt,
1471                                      struct lustre_capa *old,
1472                                      __u64 opc)
1473 {
1474         struct osd_thread_info *info = osd_oti_get(env);
1475         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1476         struct osd_object *obj = osd_dt_obj(dt);
1477         struct osd_device *dev = osd_obj2dev(obj);
1478         struct lustre_capa_key *key = &info->oti_capa_key;
1479         struct lustre_capa *capa = &info->oti_capa;
1480         struct obd_capa *oc;
1481         int rc;
1482         ENTRY;
1483
1484         if (!dev->od_fl_capa)
1485                 RETURN(ERR_PTR(-ENOENT));
1486
1487         LASSERT(dt_object_exists(dt));
1488         LASSERT(osd_invariant(obj));
1489
1490         /* renewal sanity check */
1491         if (old && osd_object_auth(env, dt, old, opc))
1492                 RETURN(ERR_PTR(-EACCES));
1493
1494         capa->lc_fid = *fid;
1495         capa->lc_opc = opc;
1496         capa->lc_uid = 0;
1497         capa->lc_flags = dev->od_capa_alg << 24;
1498         capa->lc_timeout = dev->od_capa_timeout;
1499         capa->lc_expiry = 0;
1500
1501         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1502         if (oc) {
1503                 LASSERT(!capa_is_expired(oc));
1504                 RETURN(oc);
1505         }
1506
1507         spin_lock(&capa_lock);
1508         *key = dev->od_capa_keys[1];
1509         spin_unlock(&capa_lock);
1510
1511         capa->lc_keyid = key->lk_keyid;
1512         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1513
1514         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1515         if (rc) {
1516                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1517                 RETURN(ERR_PTR(rc));
1518         }
1519
1520         oc = capa_add(dev->od_capa_hash, capa);
1521         RETURN(oc);
1522 }
1523
1524 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
1525 {
1526         int rc;
1527         struct osd_object      *obj    = osd_dt_obj(dt);
1528         struct inode           *inode  = obj->oo_inode;
1529         struct osd_thread_info *info   = osd_oti_get(env);
1530         struct dentry          *dentry = &info->oti_dentry;
1531         struct file            *file   = &info->oti_file;
1532         ENTRY;
1533
1534         dentry->d_inode = inode;
1535         file->f_dentry = dentry;
1536         file->f_mapping = inode->i_mapping;
1537         file->f_op = inode->i_fop;
1538         LOCK_INODE_MUTEX(inode);
1539         rc = file->f_op->fsync(file, dentry, 0);
1540         UNLOCK_INODE_MUTEX(inode);
1541         RETURN(rc);
1542 }
1543
1544 static struct dt_object_operations osd_obj_ops = {
1545         .do_read_lock    = osd_object_read_lock,
1546         .do_write_lock   = osd_object_write_lock,
1547         .do_read_unlock  = osd_object_read_unlock,
1548         .do_write_unlock = osd_object_write_unlock,
1549         .do_attr_get     = osd_attr_get,
1550         .do_attr_set     = osd_attr_set,
1551         .do_ah_init      = osd_ah_init,
1552         .do_create       = osd_object_create,
1553         .do_index_try    = osd_index_try,
1554         .do_ref_add      = osd_object_ref_add,
1555         .do_ref_del      = osd_object_ref_del,
1556         .do_xattr_get    = osd_xattr_get,
1557         .do_xattr_set    = osd_xattr_set,
1558         .do_xattr_del    = osd_xattr_del,
1559         .do_xattr_list   = osd_xattr_list,
1560         .do_capa_get     = osd_capa_get,
1561         .do_object_sync  = osd_object_sync,
1562 };
1563
1564 /*
1565  * Body operations.
1566  */
1567
1568 /*
1569  * XXX: Another layering violation for now.
1570  *
1571  * We don't want to use ->f_op->read methods, because generic file write
1572  *
1573  *         - serializes on ->i_sem, and
1574  *
1575  *         - does a lot of extra work like balance_dirty_pages(),
1576  *
1577  * which doesn't work for globally shared files like /last-received.
1578  */
1579 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
1580 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
1581                                 loff_t *offs, handle_t *handle);
1582
1583 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1584                         struct lu_buf *buf, loff_t *pos,
1585                         struct lustre_capa *capa)
1586 {
1587         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1588
1589         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1590                 RETURN(-EACCES);
1591
1592         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1593 }
1594
1595 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1596                          const struct lu_buf *buf, loff_t *pos,
1597                          struct thandle *handle, struct lustre_capa *capa)
1598 {
1599         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1600         struct osd_thandle *oh;
1601         ssize_t             result;
1602
1603         LASSERT(handle != NULL);
1604
1605         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1606                 RETURN(-EACCES);
1607
1608         oh = container_of(handle, struct osd_thandle, ot_super);
1609         LASSERT(oh->ot_handle->h_transaction != NULL);
1610         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1611                                              pos, oh->ot_handle);
1612         if (result == 0)
1613                 result = buf->lb_len;
1614         return result;
1615 }
1616
1617 static struct dt_body_operations osd_body_ops = {
1618         .dbo_read  = osd_read,
1619         .dbo_write = osd_write
1620 };
1621
1622 /*
1623  * Index operations.
1624  */
1625
1626 static int osd_object_is_root(const struct osd_object *obj)
1627 {
1628         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1629 }
1630
1631 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1632                            const struct dt_index_features *feat)
1633 {
1634         struct iam_descr *descr;
1635
1636         if (osd_object_is_root(o))
1637                 return feat == &dt_directory_features;
1638
1639         LASSERT(o->oo_dir != NULL);
1640
1641         descr = o->oo_dir->od_container.ic_descr;
1642         if (feat == &dt_directory_features)
1643                 return descr == &iam_htree_compat_param ||
1644                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1645                          1 /*
1646                             * XXX check that index looks like directory.
1647                             */
1648                                 );
1649         else
1650                 return
1651                         feat->dif_keysize_min <= descr->id_key_size &&
1652                         descr->id_key_size <= feat->dif_keysize_max &&
1653                         feat->dif_recsize_min <= descr->id_rec_size &&
1654                         descr->id_rec_size <= feat->dif_recsize_max &&
1655                         !(feat->dif_flags & (DT_IND_VARKEY |
1656                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1657                         ergo(feat->dif_flags & DT_IND_UPDATE,
1658                              1 /* XXX check that object (and file system) is
1659                                 * writable */);
1660 }
1661
1662 static int osd_container_init(const struct lu_env *env,
1663                               struct osd_object *obj,
1664                               struct osd_directory *dir)
1665 {
1666         int result;
1667         struct iam_container *bag;
1668
1669         bag    = &dir->od_container;
1670         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1671         if (result == 0) {
1672                 result = iam_container_setup(bag);
1673                 if (result == 0)
1674                         obj->oo_dt.do_index_ops = &osd_index_ops;
1675                 else
1676                         iam_container_fini(bag);
1677         }
1678         return result;
1679 }
1680
1681 /*
1682  * Concurrency: no external locking is necessary.
1683  */
1684 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1685                          const struct dt_index_features *feat)
1686 {
1687         int result;
1688         struct osd_object *obj = osd_dt_obj(dt);
1689
1690         LASSERT(osd_invariant(obj));
1691         LASSERT(dt_object_exists(dt));
1692
1693         if (osd_object_is_root(obj)) {
1694                 dt->do_index_ops = &osd_index_compat_ops;
1695                 result = 0;
1696         } else if (!osd_has_index(obj)) {
1697                 struct osd_directory *dir;
1698
1699                 OBD_ALLOC_PTR(dir);
1700                 if (dir != NULL) {
1701                         sema_init(&dir->od_sem, 1);
1702
1703                         spin_lock(&obj->oo_guard);
1704                         if (obj->oo_dir == NULL)
1705                                 obj->oo_dir = dir;
1706                         else
1707                                 /*
1708                                  * Concurrent thread allocated container data.
1709                                  */
1710                                 OBD_FREE_PTR(dir);
1711                         spin_unlock(&obj->oo_guard);
1712                         /*
1713                          * Now, that we have container data, serialize its
1714                          * initialization.
1715                          */
1716                         down(&obj->oo_dir->od_sem);
1717                         /*
1718                          * recheck under lock.
1719                          */
1720                         if (!osd_has_index(obj))
1721                                 result = osd_container_init(env, obj, dir);
1722                         else
1723                                 result = 0;
1724                         up(&obj->oo_dir->od_sem);
1725                 } else
1726                         result = -ENOMEM;
1727         } else
1728                 result = 0;
1729
1730         if (result == 0) {
1731                 if (!osd_index_probe(env, obj, feat))
1732                         result = -ENOTDIR;
1733         }
1734         LASSERT(osd_invariant(obj));
1735
1736         return result;
1737 }
1738
1739 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1740                             const struct dt_key *key, struct thandle *handle,
1741                             struct lustre_capa *capa)
1742 {
1743         struct osd_object     *obj = osd_dt_obj(dt);
1744         struct osd_thandle    *oh;
1745         struct iam_path_descr *ipd;
1746         struct iam_container  *bag = &obj->oo_dir->od_container;
1747         int rc;
1748
1749         ENTRY;
1750
1751         LASSERT(osd_invariant(obj));
1752         LASSERT(dt_object_exists(dt));
1753         LASSERT(bag->ic_object == obj->oo_inode);
1754         LASSERT(handle != NULL);
1755
1756         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1757                 RETURN(-EACCES);
1758
1759         ipd = osd_ipd_get(env, bag);
1760         if (unlikely(ipd == NULL))
1761                 RETURN(-ENOMEM);
1762
1763         oh = container_of0(handle, struct osd_thandle, ot_super);
1764         LASSERT(oh->ot_handle != NULL);
1765         LASSERT(oh->ot_handle->h_transaction != NULL);
1766
1767         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1768         osd_ipd_put(env, bag, ipd);
1769         LASSERT(osd_invariant(obj));
1770         RETURN(rc);
1771 }
1772
1773 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1774                             struct dt_rec *rec, const struct dt_key *key,
1775                             struct lustre_capa *capa)
1776 {
1777         struct osd_object     *obj = osd_dt_obj(dt);
1778         struct iam_path_descr *ipd;
1779         struct iam_container  *bag = &obj->oo_dir->od_container;
1780         int rc;
1781
1782         ENTRY;
1783
1784         LASSERT(osd_invariant(obj));
1785         LASSERT(dt_object_exists(dt));
1786         LASSERT(bag->ic_object == obj->oo_inode);
1787
1788         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1789                 return -EACCES;
1790
1791         ipd = osd_ipd_get(env, bag);
1792         if (unlikely(ipd == NULL))
1793                 RETURN(-ENOMEM);
1794
1795         rc = iam_lookup(bag, (const struct iam_key *)key,
1796                         (struct iam_rec *)rec, ipd);
1797         osd_ipd_put(env, bag, ipd);
1798         LASSERT(osd_invariant(obj));
1799
1800         RETURN(rc);
1801 }
1802
1803 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1804                             const struct dt_rec *rec, const struct dt_key *key,
1805                             struct thandle *th, struct lustre_capa *capa)
1806 {
1807         struct osd_object     *obj = osd_dt_obj(dt);
1808         struct iam_path_descr *ipd;
1809         struct osd_thandle    *oh;
1810         struct iam_container  *bag = &obj->oo_dir->od_container;
1811         int rc;
1812
1813         ENTRY;
1814
1815         LASSERT(osd_invariant(obj));
1816         LASSERT(dt_object_exists(dt));
1817         LASSERT(bag->ic_object == obj->oo_inode);
1818         LASSERT(th != NULL);
1819
1820         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1821                 return -EACCES;
1822
1823         ipd = osd_ipd_get(env, bag);
1824         if (unlikely(ipd == NULL))
1825                 RETURN(-ENOMEM);
1826
1827         oh = container_of0(th, struct osd_thandle, ot_super);
1828         LASSERT(oh->ot_handle != NULL);
1829         LASSERT(oh->ot_handle->h_transaction != NULL);
1830         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1831                         (struct iam_rec *)rec, ipd);
1832         osd_ipd_put(env, bag, ipd);
1833         LASSERT(osd_invariant(obj));
1834         RETURN(rc);
1835 }
1836
/*
 * Iterator operations.
 */

/**
 * osd iterator over an iam index. Pointers to it are handed out cast to
 * struct dt_it and cast back by the osd_it_*() methods.
 */
struct osd_it {
        /** object being iterated; osd_it_init() takes a reference on it */
        struct osd_object     *oi_obj;
        /** path descriptor obtained from osd_ipd_get() in osd_it_init() */
        struct iam_path_descr *oi_ipd;
        /** underlying iam iterator */
        struct iam_iterator    oi_it;
};
1845
1846 static struct dt_it *osd_it_init(const struct lu_env *env,
1847                                  struct dt_object *dt, int writable,
1848                                  struct lustre_capa *capa)
1849 {
1850         struct osd_it         *it;
1851         struct osd_object     *obj = osd_dt_obj(dt);
1852         struct lu_object      *lo  = &dt->do_lu;
1853         struct iam_path_descr *ipd;
1854         struct iam_container  *bag = &obj->oo_dir->od_container;
1855         __u32                  flags;
1856
1857         LASSERT(lu_object_exists(lo));
1858
1859         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1860                             CAPA_OPC_BODY_READ))
1861                 return ERR_PTR(-EACCES);
1862
1863         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1864         OBD_ALLOC_PTR(it);
1865         if (it != NULL) {
1866                 /*
1867                  * XXX: as ipd is allocated within osd_thread_info, assignment
1868                  * below implies that iterator usage is confined within single
1869                  * environment.
1870                  */
1871                 ipd = osd_ipd_get(env, bag);
1872                 if (likely(ipd != NULL)) {
1873                         it->oi_obj = obj;
1874                         it->oi_ipd = ipd;
1875                         lu_object_get(lo);
1876                         iam_it_init(&it->oi_it, bag, flags, ipd);
1877                         return (struct dt_it *)it;
1878                 } else
1879                         OBD_FREE_PTR(it);
1880         }
1881         return ERR_PTR(-ENOMEM);
1882 }
1883
1884 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1885 {
1886         struct osd_it     *it = (struct osd_it *)di;
1887         struct osd_object *obj = it->oi_obj;
1888
1889         iam_it_fini(&it->oi_it);
1890         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1891         lu_object_put(env, &obj->oo_dt.do_lu);
1892         OBD_FREE_PTR(it);
1893 }
1894
1895 static int osd_it_get(const struct lu_env *env,
1896                       struct dt_it *di, const struct dt_key *key)
1897 {
1898         struct osd_it *it = (struct osd_it *)di;
1899
1900         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1901 }
1902
1903 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1904 {
1905         struct osd_it *it = (struct osd_it *)di;
1906
1907         iam_it_put(&it->oi_it);
1908 }
1909
1910 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1911 {
1912         struct osd_it *it = (struct osd_it *)di;
1913
1914         return iam_it_next(&it->oi_it);
1915 }
1916
1917 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1918                       struct thandle *th)
1919 {
1920         struct osd_it      *it = (struct osd_it *)di;
1921         struct osd_thandle *oh;
1922
1923         LASSERT(th != NULL);
1924
1925         oh = container_of0(th, struct osd_thandle, ot_super);
1926         LASSERT(oh->ot_handle != NULL);
1927         LASSERT(oh->ot_handle->h_transaction != NULL);
1928
1929         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1930 }
1931
1932 static struct dt_key *osd_it_key(const struct lu_env *env,
1933                                  const struct dt_it *di)
1934 {
1935         struct osd_it *it = (struct osd_it *)di;
1936
1937         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1938 }
1939
1940 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1941 {
1942         struct osd_it *it = (struct osd_it *)di;
1943
1944         return iam_it_key_size(&it->oi_it);
1945 }
1946
1947 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1948                                  const struct dt_it *di)
1949 {
1950         struct osd_it *it = (struct osd_it *)di;
1951
1952         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1953 }
1954
1955 static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1956 {
1957         struct osd_it *it = (struct osd_it *)di;
1958
1959         return iam_it_store(&it->oi_it);
1960 }
1961
1962 static int osd_it_load(const struct lu_env *env,
1963                        const struct dt_it *di, __u64 hash)
1964 {
1965         struct osd_it *it = (struct osd_it *)di;
1966
1967         return iam_it_load(&it->oi_it, hash);
1968 }
1969
1970 static struct dt_index_operations osd_index_ops = {
1971         .dio_lookup = osd_index_lookup,
1972         .dio_insert = osd_index_insert,
1973         .dio_delete = osd_index_delete,
1974         .dio_it     = {
1975                 .init     = osd_it_init,
1976                 .fini     = osd_it_fini,
1977                 .get      = osd_it_get,
1978                 .put      = osd_it_put,
1979                 .del      = osd_it_del,
1980                 .next     = osd_it_next,
1981                 .key      = osd_it_key,
1982                 .key_size = osd_it_key_size,
1983                 .rec      = osd_it_rec,
1984                 .store    = osd_it_store,
1985                 .load     = osd_it_load
1986         }
1987 };
1988
1989 static int osd_index_compat_delete(const struct lu_env *env,
1990                                    struct dt_object *dt,
1991                                    const struct dt_key *key,
1992                                    struct thandle *handle,
1993                                    struct lustre_capa *capa)
1994 {
1995         struct osd_object *obj = osd_dt_obj(dt);
1996
1997         LASSERT(handle != NULL);
1998         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1999         ENTRY;
2000
2001 #if 0
2002         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2003                 RETURN(-EACCES);
2004 #endif
2005
2006         RETURN(-EOPNOTSUPP);
2007 }
2008
2009 /*
2010  * Compatibility index operations.
2011  */
2012
2013
2014 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
2015                            struct dentry *dentry, struct lu_fid_pack *pack)
2016 {
2017         struct inode  *inode = dentry->d_inode;
2018         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
2019
2020         lu_igif_build(fid, inode->i_ino, inode->i_generation);
2021         fid_cpu_to_be(fid, fid);
2022         pack->fp_len = sizeof *fid + 1;
2023         memcpy(pack->fp_area, fid, sizeof *fid);
2024 }
2025
2026 static int osd_index_compat_lookup(const struct lu_env *env,
2027                                    struct dt_object *dt,
2028                                    struct dt_rec *rec, const struct dt_key *key,
2029                                    struct lustre_capa *capa)
2030 {
2031         struct osd_object *obj = osd_dt_obj(dt);
2032
2033         struct osd_device      *osd  = osd_obj2dev(obj);
2034         struct osd_thread_info *info = osd_oti_get(env);
2035         struct inode           *dir;
2036
2037         int result;
2038
2039         /*
2040          * XXX temporary solution.
2041          */
2042         struct dentry *dentry;
2043         struct dentry *parent;
2044
2045         LASSERT(osd_invariant(obj));
2046         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2047         LASSERT(osd_has_index(obj));
2048
2049         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2050                 return -EACCES;
2051
2052         info->oti_str.name = (const char *)key;
2053         info->oti_str.len  = strlen((const char *)key);
2054
2055         dir = obj->oo_inode;
2056         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2057
2058         parent = d_alloc_root(dir);
2059         if (parent == NULL)
2060                 return -ENOMEM;
2061         igrab(dir);
2062         dentry = d_alloc(parent, &info->oti_str);
2063         if (dentry != NULL) {
2064                 struct dentry *d;
2065
2066                 /*
2067                  * XXX passing NULL for nameidata should work for
2068                  * ext3/ldiskfs.
2069                  */
2070                 d = dir->i_op->lookup(dir, dentry, NULL);
2071                 if (d == NULL) {
2072                         /*
2073                          * normal case, result is in @dentry.
2074                          */
2075                         if (dentry->d_inode != NULL) {
2076                                 osd_build_pack(env, osd, dentry,
2077                                                (struct lu_fid_pack *)rec);
2078                                 result = 0;
2079                         } else
2080                                 result = -ENOENT;
2081                  } else {
2082                         /* What? Disconnected alias? Ppheeeww... */
2083                         CERROR("Aliasing where not expected\n");
2084                         result = -EIO;
2085                         dput(d);
2086                 }
2087                 dput(dentry);
2088         } else
2089                 result = -ENOMEM;
2090         dput(parent);
2091         LASSERT(osd_invariant(obj));
2092         return result;
2093 }
2094
2095 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
2096                        struct inode *dir, struct inode *inode, const char *name)
2097 {
2098         struct dentry *old;
2099         struct dentry *new;
2100         struct dentry *parent;
2101
2102         int result;
2103
2104         info->oti_str.name = name;
2105         info->oti_str.len  = strlen(name);
2106
2107         LASSERT(atomic_read(&dir->i_count) > 0);
2108         result = -ENOMEM;
2109         old = d_alloc(dev->od_obj_area, &info->oti_str);
2110         if (old != NULL) {
2111                 d_instantiate(old, inode);
2112                 igrab(inode);
2113                 LASSERT(atomic_read(&dir->i_count) > 0);
2114                 parent = d_alloc_root(dir);
2115                 if (parent != NULL) {
2116                         igrab(dir);
2117                         LASSERT(atomic_read(&dir->i_count) > 1);
2118                         new = d_alloc(parent, &info->oti_str);
2119                         LASSERT(atomic_read(&dir->i_count) > 1);
2120                         if (new != NULL) {
2121                                 LASSERT(atomic_read(&dir->i_count) > 1);
2122                                 result = dir->i_op->link(old, dir, new);
2123                                 LASSERT(atomic_read(&dir->i_count) > 1);
2124                                 dput(new);
2125                                 LASSERT(atomic_read(&dir->i_count) > 1);
2126                         }
2127                         LASSERT(atomic_read(&dir->i_count) > 1);
2128                         dput(parent);
2129                         LASSERT(atomic_read(&dir->i_count) > 0);
2130                 }
2131                 dput(old);
2132         }
2133         LASSERT(atomic_read(&dir->i_count) > 0);
2134         return result;
2135 }
2136
2137
2138 /*
2139  * XXX Temporary stuff.
2140  */
2141 static int osd_index_compat_insert(const struct lu_env *env,
2142                                    struct dt_object *dt,
2143                                    const struct dt_rec *rec,
2144                                    const struct dt_key *key, struct thandle *th,
2145                                    struct lustre_capa *capa)
2146 {
2147         struct osd_object     *obj = osd_dt_obj(dt);
2148
2149         const char          *name = (const char *)key;
2150
2151         struct lu_device    *ludev = dt->do_lu.lo_dev;
2152         struct lu_object    *luch;
2153
2154         struct osd_thread_info   *info = osd_oti_get(env);
2155         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2156         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2157
2158         int result;
2159
2160         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2161         LASSERT(osd_invariant(obj));
2162         LASSERT(th != NULL);
2163
2164         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2165                 return -EACCES;
2166
2167         result = fid_unpack(pack, fid);
2168         if (result != 0)
2169                 return result;
2170
2171         luch = lu_object_find(env, ludev->ld_site, fid);
2172         if (!IS_ERR(luch)) {
2173                 if (lu_object_exists(luch)) {
2174                         struct osd_object *child;
2175
2176                         child = osd_obj(lu_object_locate(luch->lo_header,
2177                                                          ludev->ld_type));
2178                         if (child != NULL)
2179                                 result = osd_add_rec(info, osd_obj2dev(obj),
2180                                                      obj->oo_inode,
2181                                                      child->oo_inode, name);
2182                         else {
2183                                 CERROR("No osd slice.\n");
2184                                 result = -ENOENT;
2185                         }
2186                         LASSERT(osd_invariant(obj));
2187                         LASSERT(osd_invariant(child));
2188                 } else {
2189                         CERROR("Sorry.\n");
2190                         result = -ENOENT;
2191                 }
2192                 lu_object_put(env, luch);
2193         } else
2194                 result = PTR_ERR(luch);
2195         LASSERT(osd_invariant(obj));
2196         return result;
2197 }
2198
2199 static struct dt_index_operations osd_index_compat_ops = {
2200         .dio_lookup = osd_index_compat_lookup,
2201         .dio_insert = osd_index_compat_insert,
2202         .dio_delete = osd_index_compat_delete
2203 };
2204
/*
 * Type constructor/destructor: this macro invocation provides
 * osd_type_init() and osd_type_fini(), which are referenced from
 * osd_device_type_ops. It is parameterized with the osd context key.
 */
LU_TYPE_INIT_FINI(osd, &osd_key);
2207
2208 static struct lu_context_key osd_key = {
2209         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
2210         .lct_init = osd_key_init,
2211         .lct_fini = osd_key_fini,
2212         .lct_exit = osd_key_exit
2213 };
2214
2215 static void *osd_key_init(const struct lu_context *ctx,
2216                           struct lu_context_key *key)
2217 {
2218         struct osd_thread_info *info;
2219
2220         OBD_ALLOC_PTR(info);
2221         if (info != NULL)
2222                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2223         else
2224                 info = ERR_PTR(-ENOMEM);
2225         return info;
2226 }
2227
/*
 * Context key destructor: this macro invocation provides osd_key_fini()
 * (installed as osd_key.lct_fini) for values of type
 * struct osd_thread_info.
 */
LU_KEY_FINI(osd, struct osd_thread_info);
2230
/**
 * Context key exit hook. When OSD_COUNTERS is enabled, assert that the
 * thread holds no read locks, no write locks and no open transactions
 * according to its counters; otherwise do nothing.
 */
static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
#if OSD_COUNTERS
        struct osd_thread_info *oti = data;

        LASSERT(oti->oti_r_locks == 0);
        LASSERT(oti->oti_w_locks == 0);
        LASSERT(oti->oti_txns    == 0);
#endif
}
2242
2243 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2244                            const char *name, struct lu_device *next)
2245 {
2246         int rc;
2247         /* context for commit hooks */
2248         rc = lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
2249                              LCT_MD_THREAD);
2250         if (rc == 0)
2251                 rc = osd_procfs_init(osd_dev(d), name);
2252         return rc;
2253 }
2254
2255 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2256 {
2257         struct osd_thread_info *info = osd_oti_get(env);
2258         ENTRY;
2259         if (o->od_obj_area != NULL) {
2260                 dput(o->od_obj_area);
2261                 o->od_obj_area = NULL;
2262         }
2263         osd_oi_fini(info, &o->od_oi);
2264
2265         RETURN(0);
2266 }
2267
2268 static int osd_mount(const struct lu_env *env,
2269                      struct osd_device *o, struct lustre_cfg *cfg)
2270 {
2271         struct lustre_mount_info *lmi;
2272         const char               *dev  = lustre_cfg_string(cfg, 0);
2273         struct osd_thread_info   *info = osd_oti_get(env);
2274         int result;
2275
2276         ENTRY;
2277
2278         if (o->od_mount != NULL) {
2279                 CERROR("Already mounted (%s)\n", dev);
2280                 RETURN(-EEXIST);
2281         }
2282
2283         /* get mount */
2284         lmi = server_get_mount(dev);
2285         if (lmi == NULL) {
2286                 CERROR("Cannot get mount info for %s!\n", dev);
2287                 RETURN(-EFAULT);
2288         }
2289
2290         LASSERT(lmi != NULL);
2291         /* save lustre_mount_info in dt_device */
2292         o->od_mount = lmi;
2293
2294         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2295         if (result == 0) {
2296                 struct dentry *d;
2297
2298                 d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
2299                                  0777, 1);
2300                 if (!IS_ERR(d)) {
2301                         o->od_obj_area = d;
2302                 } else
2303                         result = PTR_ERR(d);
2304         }
2305         if (result != 0)
2306                 osd_shutdown(env, o);
2307         RETURN(result);
2308 }
2309
2310 static struct lu_device *osd_device_fini(const struct lu_env *env,
2311                                          struct lu_device *d)
2312 {
2313         int rc;
2314         ENTRY;
2315
2316         shrink_dcache_sb(osd_sb(osd_dev(d)));
2317         osd_sync(env, lu2dt_dev(d));
2318
2319         rc = osd_procfs_fini(osd_dev(d));
2320         if (rc) {
2321                 CERROR("proc fini error %d \n", rc);
2322                 RETURN (ERR_PTR(rc));
2323         }
2324
2325         if (osd_dev(d)->od_mount)
2326                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2327                                  osd_dev(d)->od_mount->lmi_mnt);
2328         osd_dev(d)->od_mount = NULL;
2329
2330         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2331         RETURN(NULL);
2332 }
2333
2334 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2335                                           struct lu_device_type *t,
2336                                           struct lustre_cfg *cfg)
2337 {
2338         struct lu_device  *l;
2339         struct osd_device *o;
2340
2341         OBD_ALLOC_PTR(o);
2342         if (o != NULL) {
2343                 int result;
2344
2345                 result = dt_device_init(&o->od_dt_dev, t);
2346                 if (result == 0) {
2347                         l = osd2lu_dev(o);
2348                         l->ld_ops = &osd_lu_ops;
2349                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2350                         spin_lock_init(&o->od_osfs_lock);
2351                         o->od_osfs_age = cfs_time_shift_64(-1000);
2352                         o->od_capa_hash = init_capa_hash();
2353                         if (o->od_capa_hash == NULL) {
2354                                 dt_device_fini(&o->od_dt_dev);
2355                                 l = ERR_PTR(-ENOMEM);
2356                         }
2357                 } else
2358                         l = ERR_PTR(result);
2359
2360                 if (IS_ERR(l))
2361                         OBD_FREE_PTR(o);
2362         } else
2363                 l = ERR_PTR(-ENOMEM);
2364         return l;
2365 }
2366
2367 static struct lu_device *osd_device_free(const struct lu_env *env,
2368                                          struct lu_device *d)
2369 {
2370         struct osd_device *o = osd_dev(d);
2371         ENTRY;
2372
2373         cleanup_capa_hash(o->od_capa_hash);
2374         dt_device_fini(&o->od_dt_dev);
2375         OBD_FREE_PTR(o);
2376         RETURN(NULL);
2377 }
2378
2379 static int osd_process_config(const struct lu_env *env,
2380                               struct lu_device *d, struct lustre_cfg *cfg)
2381 {
2382         struct osd_device *o = osd_dev(d);
2383         int err;
2384         ENTRY;
2385
2386         switch(cfg->lcfg_command) {
2387         case LCFG_SETUP:
2388                 err = osd_mount(env, o, cfg);
2389                 break;
2390         case LCFG_CLEANUP:
2391                 err = osd_shutdown(env, o);
2392                 break;
2393         default:
2394                 err = -ENOTTY;
2395         }
2396
2397         RETURN(err);
2398 }
2399 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2400                                     struct ldiskfs_super_block * es);
2401
2402 static int osd_recovery_complete(const struct lu_env *env,
2403                                  struct lu_device *d)
2404 {
2405         struct osd_device *o = osd_dev(d);
2406         ENTRY;
2407         /* TODO: orphans handling */
2408         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2409         RETURN(0);
2410 }
2411
2412 static struct inode *osd_iget(struct osd_thread_info *info,
2413                               struct osd_device *dev,
2414                               const struct osd_inode_id *id)
2415 {
2416         struct inode *inode;
2417
2418         inode = iget(osd_sb(dev), id->oii_ino);
2419         if (inode == NULL) {
2420                 CERROR("no inode\n");
2421                 inode = ERR_PTR(-EACCES);
2422         } else if (is_bad_inode(inode)) {
2423                 CERROR("bad inode\n");
2424                 iput(inode);
2425                 inode = ERR_PTR(-ENOENT);
2426         } else if (inode->i_generation != id->oii_gen) {
2427                 CERROR("stale inode\n");
2428                 iput(inode);
2429                 inode = ERR_PTR(-ESTALE);
2430         }
2431
2432         return inode;
2433
2434 }
2435
2436 static int osd_fid_lookup(const struct lu_env *env,
2437                           struct osd_object *obj, const struct lu_fid *fid)
2438 {
2439         struct osd_thread_info *info;
2440         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2441         struct osd_device      *dev;
2442         struct osd_inode_id    *id;
2443         struct osd_oi          *oi;
2444         struct inode           *inode;
2445         int                     result;
2446
2447         LASSERT(osd_invariant(obj));
2448         LASSERT(obj->oo_inode == NULL);
2449         LASSERT(fid_is_sane(fid));
2450         /*
2451          * This assertion checks that osd layer sees only local
2452          * fids. Unfortunately it is somewhat expensive (does a
2453          * cache-lookup). Disabling it for production/acceptance-testing.
2454          */
2455         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2456
2457         ENTRY;
2458
2459         info = osd_oti_get(env);
2460         dev  = osd_dev(ldev);
2461         id   = &info->oti_id;
2462         oi   = &dev->od_oi;
2463
2464         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2465                 RETURN(-ENOENT);
2466
2467         result = osd_oi_lookup(info, oi, fid, id);
2468         if (result == 0) {
2469                 inode = osd_iget(info, dev, id);
2470                 if (!IS_ERR(inode)) {
2471                         obj->oo_inode = inode;
2472                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2473                         result = 0;
2474                 } else
2475                         /*
2476                          * If fid wasn't found in oi, inode-less object is
2477                          * created, for which lu_object_exists() returns
2478                          * false. This is used in a (frequent) case when
2479                          * objects are created as locking anchors or
2480                          * place holders for objects yet to be created.
2481                          */
2482                         result = PTR_ERR(inode);
2483         } else if (result == -ENOENT)
2484                 result = 0;
2485         LASSERT(osd_invariant(obj));
2486         RETURN(result);
2487 }
2488
2489 static void osd_inode_getattr(const struct lu_env *env,
2490                               struct inode *inode, struct lu_attr *attr)
2491 {
2492         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2493                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2494                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2495
2496         attr->la_atime      = LTIME_S(inode->i_atime);
2497         attr->la_mtime      = LTIME_S(inode->i_mtime);
2498         attr->la_ctime      = LTIME_S(inode->i_ctime);
2499         attr->la_mode       = inode->i_mode;
2500         attr->la_size       = i_size_read(inode);
2501         attr->la_blocks     = inode->i_blocks;
2502         attr->la_uid        = inode->i_uid;
2503         attr->la_gid        = inode->i_gid;
2504         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2505         attr->la_nlink      = inode->i_nlink;
2506         attr->la_rdev       = inode->i_rdev;
2507         attr->la_blksize    = ll_inode_blksize(inode);
2508         attr->la_blkbits    = inode->i_blkbits;
2509 }
2510
2511 /*
2512  * Helpers.
2513  */
2514
2515 static int lu_device_is_osd(const struct lu_device *d)
2516 {
2517         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2518 }
2519
2520 static struct osd_object *osd_obj(const struct lu_object *o)
2521 {
2522         LASSERT(lu_device_is_osd(o->lo_dev));
2523         return container_of0(o, struct osd_object, oo_dt.do_lu);
2524 }
2525
2526 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2527 {
2528         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2529         return container_of0(d, struct osd_device, od_dt_dev);
2530 }
2531
2532 static struct osd_device *osd_dev(const struct lu_device *d)
2533 {
2534         LASSERT(lu_device_is_osd(d));
2535         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2536 }
2537
2538 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2539 {
2540         return osd_obj(&d->do_lu);
2541 }
2542
2543 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2544 {
2545         return osd_dev(o->oo_dt.do_lu.lo_dev);
2546 }
2547
2548 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2549 {
2550         return &osd->od_dt_dev.dd_lu_dev;
2551 }
2552
2553 static struct super_block *osd_sb(const struct osd_device *dev)
2554 {
2555         return dev->od_mount->lmi_mnt->mnt_sb;
2556 }
2557
2558 static journal_t *osd_journal(const struct osd_device *dev)
2559 {
2560         return LDISKFS_SB(osd_sb(dev))->s_journal;
2561 }
2562
2563 static int osd_has_index(const struct osd_object *obj)
2564 {
2565         return obj->oo_dt.do_index_ops != NULL;
2566 }
2567
/**
 * lu_object-level invariant check, installed as ->loo_object_invariant in
 * osd_lu_obj_ops. Returns whatever osd_invariant() yields for the osd
 * object behind \a l.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int ok;

        /*
         * Keep the conversion nested inside osd_invariant(): how (and
         * whether) the argument is evaluated is left to osd_invariant().
         */
        ok = osd_invariant(osd_obj(l));
        return ok;
}
2572
2573 static struct lu_object_operations osd_lu_obj_ops = {
2574         .loo_object_init      = osd_object_init,
2575         .loo_object_delete    = osd_object_delete,
2576         .loo_object_release   = osd_object_release,
2577         .loo_object_free      = osd_object_free,
2578         .loo_object_print     = osd_object_print,
2579         .loo_object_invariant = osd_object_invariant
2580 };
2581
2582 static struct lu_device_operations osd_lu_ops = {
2583         .ldo_object_alloc      = osd_object_alloc,
2584         .ldo_process_config    = osd_process_config,
2585         .ldo_recovery_complete = osd_recovery_complete
2586 };
2587
2588 static struct lu_device_type_operations osd_device_type_ops = {
2589         .ldto_init = osd_type_init,
2590         .ldto_fini = osd_type_fini,
2591
2592         .ldto_device_alloc = osd_device_alloc,
2593         .ldto_device_free  = osd_device_free,
2594
2595         .ldto_device_init    = osd_device_init,
2596         .ldto_device_fini    = osd_device_fini
2597 };
2598
2599 static struct lu_device_type osd_device_type = {
2600         .ldt_tags     = LU_DEVICE_DT,
2601         .ldt_name     = LUSTRE_OSD_NAME,
2602         .ldt_ops      = &osd_device_type_ops,
2603         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2604 };
2605
2606 /*
2607  * lprocfs legacy support.
2608  */
2609 static struct obd_ops osd_obd_device_ops = {
2610         .o_owner = THIS_MODULE
2611 };
2612
2613 static int __init osd_mod_init(void)
2614 {
2615         struct lprocfs_static_vars lvars;
2616
2617         lprocfs_osd_init_vars(&lvars);
2618         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2619                                    LUSTRE_OSD_NAME, &osd_device_type);
2620 }
2621
2622 static void __exit osd_mod_exit(void)
2623 {
2624         class_unregister_type(LUSTRE_OSD_NAME);
2625 }
2626
2627 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2628 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
2629 MODULE_LICENSE("GPL");
2630
2631 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);