Whamcloud - gitweb
fedddcdfe25af3c242ec6a2b740416c79f690cfa
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/osd/osd_handler.c
5  *  Top-level entry points into osd module
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Nikita Danilov <nikita@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35
36 /* LUSTRE_VERSION_CODE */
37 #include <lustre_ver.h>
38 /* prerequisite for linux/xattr.h */
39 #include <linux/types.h>
40 /* prerequisite for linux/xattr.h */
41 #include <linux/fs.h>
42 /* XATTR_{REPLACE,CREATE} */
43 #include <linux/xattr.h>
44 /*
45  * XXX temporary stuff: direct access to ldiskfs/jbd. The interface between
46  * osd and the file system is not yet specified.
47  */
48 /* handle_t, journal_start(), journal_stop() */
49 #include <linux/jbd.h>
50 /* LDISKFS_SB() */
51 #include <linux/ldiskfs_fs.h>
52 #include <linux/ldiskfs_jbd.h>
53 /* simple_mkdir() */
54 #include <lvfs.h>
55
56 /*
57  * struct OBD_{ALLOC,FREE}*()
58  * OBD_FAIL_CHECK
59  */
60 #include <obd_support.h>
61 /* struct ptlrpc_thread */
62 #include <lustre_net.h>
63 /* LUSTRE_OSD_NAME */
64 #include <obd.h>
65 /* class_register_type(), class_unregister_type(), class_get_type() */
66 #include <obd_class.h>
67 #include <lustre_disk.h>
68
69 /* fid_is_local() */
70 #include <lustre_fid.h>
71 #include <linux/lustre_iam.h>
72
73 #include "osd_internal.h"
74 #include "osd_igif.h"
75
76 struct osd_directory {
77         struct iam_container od_container;
78         struct iam_descr     od_descr;
79         struct semaphore     od_sem;
80 };
81
82 struct osd_object {
83         struct dt_object       oo_dt;
84         /*
85          * Inode for file system object represented by this osd_object. This
86          * inode is pinned for the whole duration of lu_object life.
87          *
88          * Not modified concurrently (either setup early during object
89          * creation, or assigned by osd_object_create() under write lock).
90          */
91         struct inode          *oo_inode;
92         struct rw_semaphore    oo_sem;
93         struct osd_directory  *oo_dir;
94         /* protects inode attributes. */
95         spinlock_t             oo_guard;
96 #if OSD_COUNTERS
97         const struct lu_env   *oo_owner;
98 #endif
99 };
100
101 /*
102  * osd device.
103  */
104 struct osd_device {
105         /* super-class */
106         struct dt_device          od_dt_dev;
107         /* information about underlying file system */
108         struct lustre_mount_info *od_mount;
109         /* object index */
110         struct osd_oi             od_oi;
111         /*
112          * XXX temporary stuff for object index: directory where every object
113          * is named by its fid.
114          */
115         struct dentry            *od_obj_area;
116
117         /* Environment for transaction commit callback.
118          * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
119          * is serialized, that is there is no more than one transaction commit
120          * at a time (JBD journal_commit_transaction() is serialized).
121          * This means that it's enough to have _one_ lu_context.
122          */
123         struct lu_env             od_env_for_commit;
124
125         /*
126          * Fid Capability
127          */
128         unsigned int              od_fl_capa:1;
129         unsigned long             od_capa_timeout;
130         __u32                     od_capa_alg;
131         struct lustre_capa_key   *od_capa_keys;
132         struct hlist_head        *od_capa_hash;
133         
134         /*
135          * statfs optimization: we cache a bit.
136          */
137         cfs_time_t                od_osfs_age;
138         struct kstatfs            od_kstatfs;
139         spinlock_t                od_osfs_lock;
140 };
141
142 static int   osd_root_get      (const struct lu_env *env,
143                                 struct dt_device *dev, struct lu_fid *f);
144 static int   osd_statfs        (const struct lu_env *env,
145                                 struct dt_device *dev, struct kstatfs *sfs);
146
147 static int   lu_device_is_osd  (const struct lu_device *d);
148 static void  osd_mod_exit      (void) __exit;
149 static int   osd_mod_init      (void) __init;
150 static int   osd_type_init     (struct lu_device_type *t);
151 static void  osd_type_fini     (struct lu_device_type *t);
152 static int   osd_object_init   (const struct lu_env *env,
153                                 struct lu_object *l);
154 static void  osd_object_release(const struct lu_env *env,
155                                 struct lu_object *l);
156 static int   osd_object_print  (const struct lu_env *env, void *cookie,
157                                 lu_printer_t p, const struct lu_object *o);
158 static void  osd_device_free   (const struct lu_env *env,
159                                 struct lu_device *m);
160 static void *osd_key_init      (const struct lu_context *ctx,
161                                 struct lu_context_key *key);
162 static void  osd_key_fini      (const struct lu_context *ctx,
163                                 struct lu_context_key *key, void *data);
164 static void  osd_key_exit      (const struct lu_context *ctx,
165                                 struct lu_context_key *key, void *data);
166 static int   osd_has_index     (const struct osd_object *obj);
167 static void  osd_object_init0  (struct osd_object *obj);
168 static int   osd_device_init   (const struct lu_env *env,
169                                 struct lu_device *d, const char *,
170                                 struct lu_device *);
171 static int   osd_fid_lookup    (const struct lu_env *env,
172                                 struct osd_object *obj,
173                                 const struct lu_fid *fid);
174 static void  osd_inode_getattr (const struct lu_env *env,
175                                 struct inode *inode, struct lu_attr *attr);
176 static void  osd_inode_setattr (const struct lu_env *env,
177                                 struct inode *inode, const struct lu_attr *attr);
178 static int   osd_param_is_sane (const struct osd_device *dev,
179                                 const struct txn_param *param);
180 static int   osd_index_lookup  (const struct lu_env *env,
181                                 struct dt_object *dt,
182                                 struct dt_rec *rec, const struct dt_key *key,
183                                 struct lustre_capa *capa);
184 static int   osd_index_insert  (const struct lu_env *env,
185                                 struct dt_object *dt,
186                                 const struct dt_rec *rec,
187                                 const struct dt_key *key,
188                                 struct thandle *handle,
189                                 struct lustre_capa *capa);
190 static int   osd_index_delete  (const struct lu_env *env,
191                                 struct dt_object *dt, const struct dt_key *key,
192                                 struct thandle *handle,
193                                 struct lustre_capa *capa);
194 static int   osd_index_probe   (const struct lu_env *env,
195                                 struct osd_object *o,
196                                 const struct dt_index_features *feat);
197 static int   osd_index_try     (const struct lu_env *env,
198                                 struct dt_object *dt,
199                                 const struct dt_index_features *feat);
200 static void  osd_index_fini    (struct osd_object *o);
201
202 static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
203 static int   osd_it_get        (const struct lu_env *env,
204                                 struct dt_it *di, const struct dt_key *key);
205 static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
206 static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
207 static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
208                                 struct thandle *th);
209 static int   osd_it_key_size   (const struct lu_env *env,
210                                 const struct dt_it *di);
211 static void  osd_conf_get      (const struct lu_env *env,
212                                 const struct dt_device *dev,
213                                 struct dt_device_param *param);
214 static void  osd_trans_stop    (const struct lu_env *env,
215                                 struct thandle *th);
216 static int   osd_object_is_root(const struct osd_object *obj);
217
218 static struct osd_object  *osd_obj          (const struct lu_object *o);
219 static struct osd_device  *osd_dev          (const struct lu_device *d);
220 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
221 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
222 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
223 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
224 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
225                                              struct lu_device *d);
226 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
227                                              struct lu_device_type *t,
228                                              struct lustre_cfg *cfg);
229 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
230                                              const struct lu_object_header *hdr,
231                                              struct lu_device *d);
232 static struct inode       *osd_iget         (struct osd_thread_info *info,
233                                              struct osd_device *dev,
234                                              const struct osd_inode_id *id);
235 static struct super_block *osd_sb           (const struct osd_device *dev);
236 static struct dt_it       *osd_it_init      (const struct lu_env *env,
237                                              struct dt_object *dt, int wable,
238                                              struct lustre_capa *capa);
239 static struct dt_key      *osd_it_key       (const struct lu_env *env,
240                                              const struct dt_it *di);
241 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
242                                              const struct dt_it *di);
243 static struct timespec    *osd_inode_time   (const struct lu_env *env,
244                                              struct inode *inode,
245                                              __u64 seconds);
246 static struct thandle     *osd_trans_start  (const struct lu_env *env,
247                                              struct dt_device *d,
248                                              struct txn_param *p);
249 static journal_t          *osd_journal      (const struct osd_device *dev);
250
251 static struct lu_device_type_operations osd_device_type_ops;
252 static struct lu_device_type            osd_device_type;
253 static struct lu_object_operations      osd_lu_obj_ops;
254 static struct obd_ops                   osd_obd_device_ops;
255 static struct lprocfs_vars              lprocfs_osd_module_vars[];
256 static struct lprocfs_vars              lprocfs_osd_obd_vars[];
257 static struct lu_device_operations      osd_lu_ops;
258 static struct lu_context_key            osd_key;
259 static struct dt_object_operations      osd_obj_ops;
260 static struct dt_body_operations        osd_body_ops;
261 static struct dt_index_operations       osd_index_ops;
262 static struct dt_index_operations       osd_index_compat_ops;
263
264 struct osd_thandle {
265         struct thandle          ot_super;
266         handle_t               *ot_handle;
267         struct journal_callback ot_jcb;
268 };
269
270 /*
271  * Invariants, assertions.
272  */
273
274 /*
275  * XXX: do not enable this, until invariant checking code is made thread safe
276  * in the face of pdirops locking.
277  */
278 #define OSD_INVARIANT_CHECKS (0)
279
280 #if OSD_INVARIANT_CHECKS
281 static int osd_invariant(const struct osd_object *obj)
282 {
283         return
284                 obj != NULL &&
285                 ergo(obj->oo_inode != NULL,
286                      obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
287                      atomic_read(&obj->oo_inode->i_count) > 0) &&
288                 ergo(obj->oo_dir != NULL &&
289                      obj->oo_dir->od_conationer.ic_object != NULL,
290                      obj->oo_dir->od_conationer.ic_object == obj->oo_inode);
291 }
292 #else
293 #define osd_invariant(obj) (1)
294 #endif
295
296 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
297 {
298         return lu_context_key_get(&env->le_ctx, &osd_key);
299 }
300
301 #if OSD_COUNTERS
302 /*
303  * Concurrency: doesn't matter
304  */
305 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
306 {
307         return osd_oti_get(env)->oti_r_locks > 0;
308 }
309
310 /*
311  * Concurrency: doesn't matter
312  */
313 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
314 {
315         struct osd_thread_info *oti = osd_oti_get(env);
316         return oti->oti_w_locks > 0 && o->oo_owner == env;
317 }
318
319 #define OSD_COUNTERS_DO(exp) exp
320 #else
321
322
323 #define osd_read_locked(env, o) (1)
324 #define osd_write_locked(env, o) (1)
325 #define OSD_COUNTERS_DO(exp) ((void)0)
326 #endif
327
328 /*
329  * Concurrency: doesn't access mutable data
330  */
331 static int osd_root_get(const struct lu_env *env,
332                         struct dt_device *dev, struct lu_fid *f)
333 {
334         struct inode *inode;
335
336         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
337         lu_igif_build(f, inode->i_ino, inode->i_generation);
338         return 0;
339 }
340
341 /*
342  * OSD object methods.
343  */
344
345 /*
346  * Concurrency: no concurrent access is possible that early in object
347  * life-cycle.
348  */
349 static struct lu_object *osd_object_alloc(const struct lu_env *env,
350                                           const struct lu_object_header *hdr,
351                                           struct lu_device *d)
352 {
353         struct osd_object *mo;
354
355         OBD_ALLOC_PTR(mo);
356         if (mo != NULL) {
357                 struct lu_object *l;
358
359                 l = &mo->oo_dt.do_lu;
360                 dt_object_init(&mo->oo_dt, NULL, d);
361                 mo->oo_dt.do_ops = &osd_obj_ops;
362                 l->lo_ops = &osd_lu_obj_ops;
363                 init_rwsem(&mo->oo_sem);
364                 spin_lock_init(&mo->oo_guard);
365                 return l;
366         } else
367                 return NULL;
368 }
369
370 /*
371  * Concurrency: shouldn't matter.
372  */
373 static void osd_object_init0(struct osd_object *obj)
374 {
375         LASSERT(obj->oo_inode != NULL);
376         obj->oo_dt.do_body_ops = &osd_body_ops;
377         obj->oo_dt.do_lu.lo_header->loh_attr |=
378                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
379 }
380
381 /*
382  * Concurrency: no concurrent access is possible that early in object
383  * life-cycle.
384  */
385 static int osd_object_init(const struct lu_env *env, struct lu_object *l)
386 {
387         struct osd_object *obj = osd_obj(l);
388         int result;
389
390         LASSERT(osd_invariant(obj));
391
392         result = osd_fid_lookup(env, obj, lu_object_fid(l));
393         if (result == 0) {
394                 if (obj->oo_inode != NULL)
395                         osd_object_init0(obj);
396         }
397         LASSERT(osd_invariant(obj));
398         return result;
399 }
400
401 /*
402  * Concurrency: no concurrent access is possible that late in object
403  * life-cycle.
404  */
405 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
406 {
407         struct osd_object *obj = osd_obj(l);
408
409         LASSERT(osd_invariant(obj));
410
411         dt_object_fini(&obj->oo_dt);
412         OBD_FREE_PTR(obj);
413 }
414
415 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
416                                           const struct iam_container *bag)
417 {
418         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
419                                                    osd_oti_get(env)->oti_ipd);
420 }
421
422 static void osd_ipd_put(const struct lu_env *env,
423                         const struct iam_container *bag,
424                         struct iam_path_descr *ipd)
425 {
426         bag->ic_descr->id_ops->id_ipd_free(ipd);
427 }
428
429 /*
430  * Concurrency: no concurrent access is possible that late in object
431  * life-cycle.
432  */
433 static void osd_index_fini(struct osd_object *o)
434 {
435         struct iam_container *bag;
436
437         if (o->oo_dir != NULL) {
438                 bag = &o->oo_dir->od_container;
439                 if (o->oo_inode != NULL) {
440                         if (bag->ic_object == o->oo_inode)
441                                 iam_container_fini(bag);
442                 }
443                 OBD_FREE_PTR(o->oo_dir);
444                 o->oo_dir = NULL;
445         }
446 }
447
448 /*
449  * Concurrency: no concurrent access is possible that late in object
450  * life-cycle (for all existing callers, that is. New callers have to provide
451  * their own locking.)
452  */
453 static int osd_inode_unlinked(const struct inode *inode)
454 {
455         return inode->i_nlink == 0;
456 }
457
458 enum {
459         OSD_TXN_OI_DELETE_CREDITS    = 20,
460         OSD_TXN_INODE_DELETE_CREDITS = 20
461 };
462
463 /*
464  * Concurrency: no concurrent access is possible that late in object
465  * life-cycle.
466  */
467 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
468 {
469         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
470         struct osd_device      *osd = osd_obj2dev(obj);
471         struct osd_thread_info *oti = osd_oti_get(env);
472         struct txn_param       *prm = &oti->oti_txn;
473         struct thandle         *th;
474         int result;
475
476         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
477                             OSD_TXN_INODE_DELETE_CREDITS);
478         th = osd_trans_start(env, &osd->od_dt_dev, prm);
479         if (!IS_ERR(th)) {
480                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
481                 osd_trans_stop(env, th);
482         } else
483                 result = PTR_ERR(th);
484         return result;
485 }
486
487 /*
488  * Called just before object is freed. Releases all resources except for
489  * object itself (that is released by osd_object_free()).
490  *
491  * Concurrency: no concurrent access is possible that late in object
492  * life-cycle.
493  */
494 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
495 {
496         struct osd_object *obj   = osd_obj(l);
497         struct inode      *inode = obj->oo_inode;
498
499         LASSERT(osd_invariant(obj));
500
501         /*
502          * If object is unlinked remove fid->ino mapping from object index.
503          *
504          * File body will be deleted by iput().
505          */
506
507         osd_index_fini(obj);
508         if (inode != NULL) {
509                 int result;
510
511                 if (osd_inode_unlinked(inode)) {
512                         result = osd_inode_remove(env, obj);
513                         if (result != 0)
514                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
515                                                 "Failed to cleanup: %d\n",
516                                                 result);
517                 }
518                 iput(inode);
519                 obj->oo_inode = NULL;
520         }
521 }
522
523 /*
524  * Concurrency: ->loo_object_release() is called under site spin-lock.
525  */
526 static void osd_object_release(const struct lu_env *env,
527                                struct lu_object *l)
528 {
529         struct osd_object *o = osd_obj(l);
530
531         LASSERT(!lu_object_is_dying(l->lo_header));
532         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
533                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
534 }
535
536 /*
537  * Concurrency: shouldn't matter.
538  */
539 static int osd_object_print(const struct lu_env *env, void *cookie,
540                             lu_printer_t p, const struct lu_object *l)
541 {
542         struct osd_object *o = osd_obj(l);
543         struct iam_descr  *d;
544
545         if (o->oo_dir != NULL)
546                 d = o->oo_dir->od_container.ic_descr;
547         else
548                 d = NULL;
549         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
550                     o, o->oo_inode,
551                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
552                     o->oo_inode ? o->oo_inode->i_generation : 0,
553                     d ? d->id_ops->id_name : "plain");
554 }
555
556 /*
557  * Concurrency: shouldn't matter.
558  */
559 static int osd_statfs(const struct lu_env *env,
560                       struct dt_device *d, struct kstatfs *sfs)
561 {
562         struct osd_device *osd = osd_dt_dev(d);
563         struct super_block *sb = osd_sb(osd);
564         int result = 0;
565
566         spin_lock(&osd->od_osfs_lock);
567         /* cache 1 second */
568         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
569                 result = ll_do_statfs(sb, &osd->od_kstatfs);
570                 if (likely(result == 0)) /* N.B. statfs can't really fail */
571                         osd->od_osfs_age = cfs_time_current_64();
572         }
573
574         if (likely(result == 0))
575                 *sfs = osd->od_kstatfs; 
576         spin_unlock(&osd->od_osfs_lock);
577
578         return result;
579 }
580
581 /*
582  * Concurrency: doesn't access mutable data.
583  */
584 static void osd_conf_get(const struct lu_env *env,
585                          const struct dt_device *dev,
586                          struct dt_device_param *param)
587 {
588         /*
589          * XXX should be taken from not-yet-existing fs abstraction layer.
590          */
591         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
592         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
593         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
594 }
595
596 /*
597  * Journal
598  */
599
600 /*
601  * Concurrency: doesn't access mutable data.
602  */
603 static int osd_param_is_sane(const struct osd_device *dev,
604                              const struct txn_param *param)
605 {
606         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
607 }
608
609 /*
610  * Concurrency: shouldn't matter.
611  */
612 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
613 {
614         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
615         struct thandle     *th = &oh->ot_super;
616         struct dt_device   *dev = th->th_dev;
617
618         LASSERT(dev != NULL);
619         LASSERT(oh->ot_handle == NULL);
620
621         if (error) {
622                 CERROR("transaction @0x%p commit error: %d\n", th, error);
623         } else {
624                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
625                 /*
626                  * This od_env_for_commit is only for commit usage.  see
627                  * "struct dt_device"
628                  */
629                 lu_context_enter(&env->le_ctx);
630                 dt_txn_hook_commit(env, th);
631                 lu_context_exit(&env->le_ctx);
632         }
633
634         lu_device_put(&dev->dd_lu_dev);
635         th->th_dev = NULL;
636
637         lu_context_exit(&th->th_ctx);
638         lu_context_fini(&th->th_ctx);
639         OBD_FREE_PTR(oh);
640 }
641
642 /*
643  * Concurrency: shouldn't matter.
644  */
645 static struct thandle *osd_trans_start(const struct lu_env *env,
646                                        struct dt_device *d,
647                                        struct txn_param *p)
648 {
649         struct osd_device  *dev = osd_dt_dev(d);
650         handle_t           *jh;
651         struct osd_thandle *oh;
652         struct thandle     *th;
653         int hook_res;
654
655         ENTRY;
656
657         hook_res = dt_txn_hook_start(env, d, p);
658         if (hook_res != 0)
659                 RETURN(ERR_PTR(hook_res));
660
661         if (osd_param_is_sane(dev, p)) {
662                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
663                 if (oh != NULL) {
664                         /*
665                          * XXX temporary stuff. Some abstraction layer should
666                          * be used.
667                          */
668
669                         jh = journal_start(osd_journal(dev), p->tp_credits);
670                         if (!IS_ERR(jh)) {
671                                 oh->ot_handle = jh;
672                                 th = &oh->ot_super;
673                                 th->th_dev = d;
674                                 th->th_result = 0;
675                                 jh->h_sync = p->tp_sync;
676                                 lu_device_get(&d->dd_lu_dev);
677                                 /* add commit callback */
678                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
679                                 lu_context_enter(&th->th_ctx);
680                                 journal_callback_set(jh, osd_trans_commit_cb,
681                                                      (struct journal_callback *)&oh->ot_jcb);
682 #if OSD_COUNTERS
683                                 {
684                                         struct osd_thread_info *oti =
685                                                 osd_oti_get(env);
686
687                                         LASSERT(oti->oti_txns == 0);
688                                         LASSERT(oti->oti_r_locks == 0);
689                                         LASSERT(oti->oti_w_locks == 0);
690                                         oti->oti_txns++;
691                                 }
692 #endif
693                         } else {
694                                 OBD_FREE_PTR(oh);
695                                 th = (void *)jh;
696                         }
697                 } else
698                         th = ERR_PTR(-ENOMEM);
699         } else {
700                 CERROR("Invalid transaction parameters\n");
701                 th = ERR_PTR(-EINVAL);
702         }
703
704         RETURN(th);
705 }
706
707 /*
708  * Concurrency: shouldn't matter.
709  */
710 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
711 {
712         int result;
713         struct osd_thandle *oh;
714
715         ENTRY;
716
717         oh = container_of0(th, struct osd_thandle, ot_super);
718         if (oh->ot_handle != NULL) {
719                 handle_t *hdl = oh->ot_handle;
720                 /*
721                  * XXX temporary stuff. Some abstraction layer should be used.
722                  */
723                 result = dt_txn_hook_stop(env, th);
724                 if (result != 0)
725                         CERROR("Failure in transaction hook: %d\n", result);
726
727                 /**/
728                 oh->ot_handle = NULL;
729                 result = journal_stop(hdl);
730                 if (result != 0)
731                         CERROR("Failure to stop transaction: %d\n", result);
732
733 #if OSD_COUNTERS
734                 {
735                         struct osd_thread_info *oti = osd_oti_get(env);
736
737                         LASSERT(oti->oti_txns == 1);
738                         LASSERT(oti->oti_r_locks == 0);
739                         LASSERT(oti->oti_w_locks == 0);
740                         oti->oti_txns--;
741                 }
742 #endif
743         }
744         EXIT;
745 }
746
747 /*
748  * Concurrency: shouldn't matter.
749  */
750 static int osd_sync(const struct lu_env *env, struct dt_device *d)
751 {
752         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
753         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
754 }
755
756 /*
757  * Concurrency: shouldn't matter.
758  */
759 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
760
761 static void osd_ro(const struct lu_env *env, struct dt_device *d)
762 {
763         ENTRY;
764
765         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
766
767         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
768                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
769         EXIT;
770 }
771
772 /*
773  * Concurrency: serialization provided by callers.
774  */
775 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
776                               int mode, unsigned long timeout, __u32 alg,
777                               struct lustre_capa_key *keys)
778 {
779         struct osd_device *dev = osd_dt_dev(d);
780         ENTRY;
781
782         dev->od_fl_capa = mode;
783         dev->od_capa_timeout = timeout;
784         dev->od_capa_alg = alg;
785         dev->od_capa_keys = keys;
786         RETURN(0);
787 }
788
789 /* Note: we did not count into QUOTA here, If we mount with --data_journal
790  * we may need more*/
791 static const int osd_dto_credits[DTO_NR] = {
792         /*
793          * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
794          * EXT3_SINGLEDATA_TRANS_BLOCKS 8 XXX Note: maybe iam need more,since
795          * iam have more level than Ext3 htree
796          */
797         [DTO_INDEX_INSERT]  = 16,
798         [DTO_INDEX_DELETE]  = 16,
799         [DTO_IDNEX_UPDATE]  = 16,
800         /*
801          * Create a object. Same as create object in Ext3 filesystem, but did
802          * not count QUOTA i EXT3_DATA_TRANS_BLOCKS(12) +
803          * INDEX_EXTRA_BLOCKS(8) + 3(inode bits,groups, GDT)
804          */
805         [DTO_OBJECT_CREATE] = 23,
806         [DTO_OBJECT_DELETE] = 23,
807         /*
808          * Attr set credits 3 inode, group, GDT
809          */
810         [DTO_ATTR_SET]      = 3,
811         /*
812          * XATTR_SET. SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS XXX Note:
813          * in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
814          * also counted in. Do not know why?
815          */
816         [DTO_XATTR_SET]     = 16,
817         [DTO_LOG_REC]       = 16,
818         /* creadits for inode change during write */
819         [DTO_WRITE_BASE]    = 3,
820         /* credits for single block write */
821         [DTO_WRITE_BLOCK]   = 12 
822 };
823
824 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
825                           enum dt_txn_op op)
826 {
827         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
828         return osd_dto_credits[op];
829 }
830
831 static struct dt_device_operations osd_dt_ops = {
832         .dt_root_get       = osd_root_get,
833         .dt_statfs         = osd_statfs,
834         .dt_trans_start    = osd_trans_start,
835         .dt_trans_stop     = osd_trans_stop,
836         .dt_conf_get       = osd_conf_get,
837         .dt_sync           = osd_sync,
838         .dt_ro             = osd_ro,
839         .dt_credit_get     = osd_credit_get,
840         .dt_init_capa_ctxt = osd_init_capa_ctxt,
841 };
842
843 static void osd_object_read_lock(const struct lu_env *env,
844                                  struct dt_object *dt)
845 {
846         struct osd_object *obj = osd_dt_obj(dt);
847
848         LASSERT(osd_invariant(obj));
849
850         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
851         down_read(&obj->oo_sem);
852 #if OSD_COUNTERS
853         {
854                 struct osd_thread_info *oti = osd_oti_get(env);
855
856                 LASSERT(obj->oo_owner == NULL);
857                 oti->oti_r_locks++;
858         }
859 #endif
860 }
861
862 static void osd_object_write_lock(const struct lu_env *env,
863                                   struct dt_object *dt)
864 {
865         struct osd_object *obj = osd_dt_obj(dt);
866
867         LASSERT(osd_invariant(obj));
868
869         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
870         down_write(&obj->oo_sem);
871 #if OSD_COUNTERS
872         {
873                 struct osd_thread_info *oti = osd_oti_get(env);
874
875                 LASSERT(obj->oo_owner == NULL);
876                 obj->oo_owner = env;
877                 oti->oti_w_locks++;
878         }
879 #endif
880 }
881
882 static void osd_object_read_unlock(const struct lu_env *env,
883                                    struct dt_object *dt)
884 {
885         struct osd_object *obj = osd_dt_obj(dt);
886
887         LASSERT(osd_invariant(obj));
888 #if OSD_COUNTERS
889         {
890                 struct osd_thread_info *oti = osd_oti_get(env);
891
892                 LASSERT(oti->oti_r_locks > 0);
893                 oti->oti_r_locks--;
894         }
895 #endif
896         up_read(&obj->oo_sem);
897 }
898
899 static void osd_object_write_unlock(const struct lu_env *env,
900                                     struct dt_object *dt)
901 {
902         struct osd_object *obj = osd_dt_obj(dt);
903
904         LASSERT(osd_invariant(obj));
905 #if OSD_COUNTERS
906         {
907                 struct osd_thread_info *oti = osd_oti_get(env);
908
909                 LASSERT(obj->oo_owner == env);
910                 LASSERT(oti->oti_w_locks > 0);
911                 oti->oti_w_locks--;
912                 obj->oo_owner = NULL;
913         }
914 #endif
915         up_write(&obj->oo_sem);
916 }
917
918 static int capa_is_sane(const struct lu_env *env,
919                         struct osd_device *dev,
920                         struct lustre_capa *capa,
921                         struct lustre_capa_key *keys)
922 {
923         struct osd_thread_info *oti = osd_oti_get(env);
924         struct obd_capa *oc;
925         int i, rc = 0;
926         ENTRY;
927
928         oc = capa_lookup(dev->od_capa_hash, capa, 0);
929         if (oc) {
930                 if (capa_is_expired(oc)) {
931                         DEBUG_CAPA(D_ERROR, capa, "expired");
932                         rc = -ESTALE;
933                 }
934                 capa_put(oc);
935                 RETURN(rc);
936         }
937
938         spin_lock(&capa_lock);
939         for (i = 0; i < 2; i++) {
940                 if (keys[i].lk_keyid == capa->lc_keyid) {
941                         oti->oti_capa_key = keys[i];
942                         break;
943                 }
944         }
945         spin_unlock(&capa_lock);
946
947         if (i == 2) {
948                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
949                 RETURN(-ESTALE);
950         }
951
952         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
953         if (rc)
954                 RETURN(rc);
955         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
956         {
957                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
958                 RETURN(-EACCES);
959         }
960
961         oc = capa_add(dev->od_capa_hash, capa);
962         capa_put(oc);
963
964         RETURN(0);
965 }
966
967 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
968                            struct lustre_capa *capa, __u64 opc)
969 {
970         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
971         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
972         int rc;
973
974         if (!dev->od_fl_capa)
975                 return 0;
976
977         if (capa == BYPASS_CAPA)
978                 return 0;
979
980         if (!capa) {
981                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
982                 return -EACCES;
983         }
984
985         if (!lu_fid_eq(fid, &capa->lc_fid)) {
986                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
987                            PFID(fid));
988                 return -EACCES;
989         }
990
991         if (!capa_opc_supported(capa, opc)) {
992                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
993                 return -EACCES;
994         }
995
996         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
997                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
998                 return -EACCES;
999         }
1000
1001         return 0;
1002 }
1003
1004 static int osd_attr_get(const struct lu_env *env,
1005                         struct dt_object *dt,
1006                         struct lu_attr *attr,
1007                         struct lustre_capa *capa)
1008 {
1009         struct osd_object *obj = osd_dt_obj(dt);
1010
1011         LASSERT(dt_object_exists(dt));
1012         LASSERT(osd_invariant(obj));
1013
1014         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1015                 return -EACCES;
1016
1017         spin_lock(&obj->oo_guard);
1018         osd_inode_getattr(env, obj->oo_inode, attr);
1019         spin_unlock(&obj->oo_guard);
1020         return 0;
1021 }
1022
1023 static int osd_attr_set(const struct lu_env *env,
1024                         struct dt_object *dt,
1025                         const struct lu_attr *attr,
1026                         struct thandle *handle,
1027                         struct lustre_capa *capa)
1028 {
1029         struct osd_object *obj = osd_dt_obj(dt);
1030
1031         LASSERT(handle != NULL);
1032         LASSERT(dt_object_exists(dt));
1033         LASSERT(osd_invariant(obj));
1034
1035         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1036                 return -EACCES;
1037
1038         spin_lock(&obj->oo_guard);
1039         osd_inode_setattr(env, obj->oo_inode, attr);
1040         spin_unlock(&obj->oo_guard);
1041
1042         mark_inode_dirty(obj->oo_inode);
1043         return 0;
1044 }
1045
1046 static struct timespec *osd_inode_time(const struct lu_env *env,
1047                                        struct inode *inode, __u64 seconds)
1048 {
1049         struct osd_thread_info *oti = osd_oti_get(env);
1050         struct timespec        *t   = &oti->oti_time;
1051
1052         t->tv_sec  = seconds;
1053         t->tv_nsec = 0;
1054         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1055         return t;
1056 }
1057
1058 static void osd_inode_setattr(const struct lu_env *env,
1059                               struct inode *inode, const struct lu_attr *attr)
1060 {
1061         __u64 bits;
1062
1063         bits = attr->la_valid;
1064
1065         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1066
1067         if (bits & LA_ATIME)
1068                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1069         if (bits & LA_CTIME)
1070                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1071         if (bits & LA_MTIME)
1072                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1073         if (bits & LA_SIZE) {
1074                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1075                 i_size_write(inode, attr->la_size);
1076         }
1077         if (bits & LA_BLOCKS)
1078                 inode->i_blocks = attr->la_blocks;
1079         if (bits & LA_MODE)
1080                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1081                         (attr->la_mode & ~S_IFMT);
1082         if (bits & LA_UID)
1083                 inode->i_uid    = attr->la_uid;
1084         if (bits & LA_GID)
1085                 inode->i_gid    = attr->la_gid;
1086         if (bits & LA_NLINK)
1087                 inode->i_nlink  = attr->la_nlink;
1088         if (bits & LA_RDEV)
1089                 inode->i_rdev   = attr->la_rdev;
1090
1091         if (bits & LA_FLAGS) {
1092                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1093
1094                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1095                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1096         }
1097 }
1098
1099 /*
1100  * Object creation.
1101  *
1102  * XXX temporary solution.
1103  */
1104
/*
 * Hook invoked by osd_object_create() before the type-specific creation
 * function. Currently does nothing and reports success.
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        return 0;
}
1110
1111 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1112                            struct lu_attr *attr, struct thandle *th)
1113 {
1114         LASSERT(obj->oo_inode != NULL);
1115
1116         osd_object_init0(obj);
1117         return 0;
1118 }
1119
1120 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1121                                           struct inode * dir, int mode);
1122
1123 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1124                       umode_t mode,
1125                       struct dt_allocation_hint *hint,
1126                       struct thandle *th)
1127 {
1128         int result;
1129         struct osd_device  *osd = osd_obj2dev(obj);
1130         struct osd_thandle *oth;
1131         struct inode       *parent;
1132         struct inode       *inode;
1133
1134         LASSERT(osd_invariant(obj));
1135         LASSERT(obj->oo_inode == NULL);
1136         LASSERT(osd->od_obj_area != NULL);
1137
1138         oth = container_of(th, struct osd_thandle, ot_super);
1139         LASSERT(oth->ot_handle->h_transaction != NULL);
1140
1141         if (hint && hint->dah_parent)
1142                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1143         else
1144                 parent = osd->od_obj_area->d_inode;
1145         LASSERT(parent->i_op != NULL);
1146
1147         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1148         if (!IS_ERR(inode)) {
1149                 obj->oo_inode = inode;
1150                 result = 0;
1151         } else
1152                 result = PTR_ERR(inode);
1153         LASSERT(osd_invariant(obj));
1154         return result;
1155 }
1156
1157
1158 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1159                            int recsize, handle_t *handle);
1160
1161 enum {
1162         OSD_NAME_LEN = 255
1163 };
1164
1165 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1166                      struct lu_attr *attr,
1167                      struct dt_allocation_hint *hint,
1168                      struct thandle *th)
1169 {
1170         int result;
1171         struct osd_thandle *oth;
1172
1173         LASSERT(S_ISDIR(attr->la_mode));
1174
1175         oth = container_of(th, struct osd_thandle, ot_super);
1176         LASSERT(oth->ot_handle->h_transaction != NULL);
1177         result = osd_mkfile(info, obj, (attr->la_mode &
1178                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1179         if (result == 0) {
1180                 LASSERT(obj->oo_inode != NULL);
1181                 /*
1182                  * XXX uh-oh... call low-level iam function directly.
1183                  */
1184                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1185                                          sizeof (struct lu_fid_pack),
1186                                          oth->ot_handle);
1187         }
1188         return result;
1189 }
1190
1191 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1192                      struct lu_attr *attr,
1193                      struct dt_allocation_hint *hint,
1194                      struct thandle *th)
1195 {
1196         LASSERT(S_ISREG(attr->la_mode));
1197         return osd_mkfile(info, obj, (attr->la_mode &
1198                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1199 }
1200
1201 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1202                      struct lu_attr *attr,
1203                      struct dt_allocation_hint *hint,
1204                      struct thandle *th)
1205 {
1206         LASSERT(S_ISLNK(attr->la_mode));
1207         return osd_mkfile(info, obj, (attr->la_mode &
1208                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1209 }
1210
1211 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1212                      struct lu_attr *attr,
1213                      struct dt_allocation_hint *hint,
1214                      struct thandle *th)
1215 {
1216         int result;
1217         struct osd_device *osd = osd_obj2dev(obj);
1218         struct inode      *dir;
1219         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1220
1221         LASSERT(osd_invariant(obj));
1222         LASSERT(obj->oo_inode == NULL);
1223         LASSERT(osd->od_obj_area != NULL);
1224         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1225                 S_ISFIFO(mode) || S_ISSOCK(mode));
1226
1227         dir = osd->od_obj_area->d_inode;
1228         LASSERT(dir->i_op != NULL);
1229
1230         result = osd_mkfile(info, obj, mode, hint, th);
1231         if (result == 0) {
1232                 LASSERT(obj->oo_inode != NULL);
1233                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1234         }
1235         LASSERT(osd_invariant(obj));
1236         return result;
1237 }
1238
1239 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1240                               struct lu_attr *,
1241                               struct dt_allocation_hint *hint,
1242                               struct thandle *);
1243
1244 static osd_obj_type_f osd_create_type_f(__u32 mode)
1245 {
1246         osd_obj_type_f result;
1247
1248         switch (mode) {
1249         case S_IFDIR:
1250                 result = osd_mkdir;
1251                 break;
1252         case S_IFREG:
1253                 result = osd_mkreg;
1254                 break;
1255         case S_IFLNK:
1256                 result = osd_mksym;
1257                 break;
1258         case S_IFCHR:
1259         case S_IFBLK:
1260         case S_IFIFO:
1261         case S_IFSOCK:
1262                 result = osd_mknod;
1263                 break;
1264         default:
1265                 LBUG();
1266                 break;
1267         }
1268         return result;
1269 }
1270
1271
1272 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1273                         struct dt_object *parent, umode_t child_mode)
1274 {
1275         LASSERT(ah);
1276
1277         memset(ah, 0, sizeof(*ah));
1278         ah->dah_parent = parent;
1279         ah->dah_mode = child_mode;
1280 }
1281
1282
1283 /*
1284  * Concurrency: @dt is write locked.
1285  */
1286 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1287                              struct lu_attr *attr, 
1288                              struct dt_allocation_hint *hint,
1289                              struct thandle *th)
1290 {
1291         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1292         struct osd_object      *obj  = osd_dt_obj(dt);
1293         struct osd_device      *osd  = osd_obj2dev(obj);
1294         struct osd_thread_info *info = osd_oti_get(env);
1295         int result;
1296
1297         ENTRY;
1298
1299         LASSERT(osd_invariant(obj));
1300         LASSERT(!dt_object_exists(dt));
1301         LASSERT(osd_write_locked(env, obj));
1302         LASSERT(th != NULL);
1303
1304         /*
1305          * XXX missing: Quote handling.
1306          */
1307
1308         result = osd_create_pre(info, obj, attr, th);
1309         if (result == 0) {
1310                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1311                                                                 attr, hint, th);
1312                 if (result == 0)
1313                         result = osd_create_post(info, obj, attr, th);
1314         }
1315         if (result == 0) {
1316                 struct osd_inode_id *id = &info->oti_id;
1317
1318                 LASSERT(obj->oo_inode != NULL);
1319
1320                 id->oii_ino = obj->oo_inode->i_ino;
1321                 id->oii_gen = obj->oo_inode->i_generation;
1322
1323                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1324         }
1325
1326         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1327         LASSERT(osd_invariant(obj));
1328         RETURN(result);
1329 }
1330
1331 /*
1332  * Concurrency: @dt is write locked.
1333  */
1334 static void osd_object_ref_add(const struct lu_env *env,
1335                                struct dt_object *dt,
1336                                struct thandle *th)
1337 {
1338         struct osd_object *obj = osd_dt_obj(dt);
1339         struct inode *inode = obj->oo_inode;
1340
1341         LASSERT(osd_invariant(obj));
1342         LASSERT(dt_object_exists(dt));
1343         LASSERT(osd_write_locked(env, obj));
1344         LASSERT(th != NULL);
1345
1346         spin_lock(&obj->oo_guard);
1347         if (inode->i_nlink < LDISKFS_LINK_MAX) {
1348                 inode->i_nlink ++;
1349                 spin_unlock(&obj->oo_guard);
1350                 mark_inode_dirty(inode);
1351         } else {
1352                 spin_unlock(&obj->oo_guard);
1353                 LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
1354                                 "Overflowed nlink\n");
1355         }
1356         LASSERT(osd_invariant(obj));
1357 }
1358
1359 /*
1360  * Concurrency: @dt is write locked.
1361  */
1362 static void osd_object_ref_del(const struct lu_env *env,
1363                                struct dt_object *dt,
1364                                struct thandle *th)
1365 {
1366         struct osd_object *obj = osd_dt_obj(dt);
1367         struct inode *inode = obj->oo_inode;
1368
1369         LASSERT(osd_invariant(obj));
1370         LASSERT(dt_object_exists(dt));
1371         LASSERT(osd_write_locked(env, obj));
1372         LASSERT(th != NULL);
1373
1374         spin_lock(&obj->oo_guard);
1375         if (inode->i_nlink > 0) {
1376                 inode->i_nlink --;
1377                 spin_unlock(&obj->oo_guard);
1378                 mark_inode_dirty(inode);
1379         } else {
1380                 spin_unlock(&obj->oo_guard);
1381                 LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
1382                                 "Underflowed nlink\n");
1383         }
1384         LASSERT(osd_invariant(obj));
1385 }
1386
1387 /*
1388  * Concurrency: @dt is read locked.
1389  */
1390 static int osd_xattr_get(const struct lu_env *env,
1391                          struct dt_object *dt,
1392                          struct lu_buf *buf,
1393                          const char *name,
1394                          struct lustre_capa *capa)
1395 {
1396         struct osd_object      *obj    = osd_dt_obj(dt);
1397         struct inode           *inode  = obj->oo_inode;
1398         struct osd_thread_info *info   = osd_oti_get(env);
1399         struct dentry          *dentry = &info->oti_dentry;
1400
1401         LASSERT(dt_object_exists(dt));
1402         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1403         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1404
1405         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1406                 return -EACCES;
1407
1408         dentry->d_inode = inode;
1409         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1410 }
1411
1412 /*
1413  * Concurrency: @dt is write locked.
1414  */
1415 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1416                          const struct lu_buf *buf, const char *name, int fl,
1417                          struct thandle *handle, struct lustre_capa *capa)
1418 {
1419         int fs_flags;
1420
1421         struct osd_object      *obj    = osd_dt_obj(dt);
1422         struct inode           *inode  = obj->oo_inode;
1423         struct osd_thread_info *info   = osd_oti_get(env);
1424         struct dentry          *dentry = &info->oti_dentry;
1425
1426         LASSERT(dt_object_exists(dt));
1427         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1428         LASSERT(osd_write_locked(env, obj));
1429         LASSERT(handle != NULL);
1430
1431         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1432                 return -EACCES;
1433
1434         dentry->d_inode = inode;
1435
1436         fs_flags = 0;
1437         if (fl & LU_XATTR_REPLACE)
1438                 fs_flags |= XATTR_REPLACE;
1439
1440         if (fl & LU_XATTR_CREATE)
1441                 fs_flags |= XATTR_CREATE;
1442
1443         return inode->i_op->setxattr(dentry, name,
1444                                      buf->lb_buf, buf->lb_len, fs_flags);
1445 }
1446
1447 /*
1448  * Concurrency: @dt is read locked.
1449  */
1450 static int osd_xattr_list(const struct lu_env *env,
1451                           struct dt_object *dt,
1452                           struct lu_buf *buf,
1453                           struct lustre_capa *capa)
1454 {
1455         struct osd_object      *obj    = osd_dt_obj(dt);
1456         struct inode           *inode  = obj->oo_inode;
1457         struct osd_thread_info *info   = osd_oti_get(env);
1458         struct dentry          *dentry = &info->oti_dentry;
1459
1460         LASSERT(dt_object_exists(dt));
1461         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1462         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1463
1464         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1465                 return -EACCES;
1466
1467         dentry->d_inode = inode;
1468         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1469 }
1470
1471 /*
1472  * Concurrency: @dt is write locked.
1473  */
1474 static int osd_xattr_del(const struct lu_env *env,
1475                          struct dt_object *dt,
1476                          const char *name,
1477                          struct thandle *handle,
1478                          struct lustre_capa *capa)
1479 {
1480         struct osd_object      *obj    = osd_dt_obj(dt);
1481         struct inode           *inode  = obj->oo_inode;
1482         struct osd_thread_info *info   = osd_oti_get(env);
1483         struct dentry          *dentry = &info->oti_dentry;
1484
1485         LASSERT(dt_object_exists(dt));
1486         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1487         LASSERT(osd_write_locked(env, obj));
1488         LASSERT(handle != NULL);
1489
1490         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1491                 return -EACCES;
1492
1493         dentry->d_inode = inode;
1494         return inode->i_op->removexattr(dentry, name);
1495 }
1496
1497 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1498                                      struct dt_object *dt,
1499                                      struct lustre_capa *old,
1500                                      __u64 opc)
1501 {
1502         struct osd_thread_info *info = osd_oti_get(env);
1503         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1504         struct osd_object *obj = osd_dt_obj(dt);
1505         struct osd_device *dev = osd_obj2dev(obj);
1506         struct lustre_capa_key *key = &info->oti_capa_key;
1507         struct lustre_capa *capa = &info->oti_capa;
1508         struct obd_capa *oc;
1509         int rc;
1510         ENTRY;
1511
1512         if (!dev->od_fl_capa)
1513                 RETURN(ERR_PTR(-ENOENT));
1514
1515         LASSERT(dt_object_exists(dt));
1516         LASSERT(osd_invariant(obj));
1517
1518         /* renewal sanity check */
1519         if (old && osd_object_auth(env, dt, old, opc))
1520                 RETURN(ERR_PTR(-EACCES));
1521
1522         capa->lc_fid = *fid;
1523         capa->lc_opc = opc;
1524         capa->lc_uid = 0;
1525         capa->lc_flags = dev->od_capa_alg << 24;
1526         capa->lc_timeout = dev->od_capa_timeout;
1527         capa->lc_expiry = 0;
1528
1529         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1530         if (oc) {
1531                 LASSERT(!capa_is_expired(oc));
1532                 RETURN(oc);
1533         }
1534
1535         spin_lock(&capa_lock);
1536         *key = dev->od_capa_keys[1];
1537         spin_unlock(&capa_lock);
1538
1539         capa->lc_keyid = key->lk_keyid;
1540         capa->lc_expiry = CURRENT_SECONDS + dev->od_capa_timeout;
1541
1542         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1543         if (rc) {
1544                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1545                 RETURN(ERR_PTR(rc));
1546         }
1547
1548         oc = capa_add(dev->od_capa_hash, capa);
1549         RETURN(oc);
1550 }
1551
1552 static struct dt_object_operations osd_obj_ops = {
1553         .do_read_lock    = osd_object_read_lock,
1554         .do_write_lock   = osd_object_write_lock,
1555         .do_read_unlock  = osd_object_read_unlock,
1556         .do_write_unlock = osd_object_write_unlock,
1557         .do_attr_get     = osd_attr_get,
1558         .do_attr_set     = osd_attr_set,
1559         .do_ah_init      = osd_ah_init,
1560         .do_create       = osd_object_create,
1561         .do_index_try    = osd_index_try,
1562         .do_ref_add      = osd_object_ref_add,
1563         .do_ref_del      = osd_object_ref_del,
1564         .do_xattr_get    = osd_xattr_get,
1565         .do_xattr_set    = osd_xattr_set,
1566         .do_xattr_del    = osd_xattr_del,
1567         .do_xattr_list   = osd_xattr_list,
1568         .do_capa_get     = osd_capa_get,
1569 };
1570
1571 /*
1572  * Body operations.
1573  */
1574
1575 /*
1576  * XXX: Another layering violation for now.
1577  *
1578  * We don't want to use ->f_op->read methods, because generic file write
1579  *
1580  *         - serializes on ->i_sem, and
1581  *
1582  *         - does a lot of extra work like balance_dirty_pages(),
1583  *
1584  * which doesn't work for globally shared files like /last-received.
1585  */
1586 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
1587 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
1588                                 loff_t *offs, handle_t *handle);
1589
1590 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1591                         struct lu_buf *buf, loff_t *pos,
1592                         struct lustre_capa *capa)
1593 {
1594         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1595
1596         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1597                 RETURN(-EACCES);
1598
1599         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1600 }
1601
1602 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1603                          const struct lu_buf *buf, loff_t *pos,
1604                          struct thandle *handle, struct lustre_capa *capa)
1605 {
1606         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1607         struct osd_thandle *oh;
1608         ssize_t             result;
1609
1610         LASSERT(handle != NULL);
1611
1612         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1613                 RETURN(-EACCES);
1614
1615         oh = container_of(handle, struct osd_thandle, ot_super);
1616         LASSERT(oh->ot_handle->h_transaction != NULL);
1617         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1618                                              pos, oh->ot_handle);
1619         if (result == 0)
1620                 result = buf->lb_len;
1621         return result;
1622 }
1623
1624 static struct dt_body_operations osd_body_ops = {
1625         .dbo_read  = osd_read,
1626         .dbo_write = osd_write
1627 };
1628
1629 /*
1630  * Index operations.
1631  */
1632
1633 static int osd_object_is_root(const struct osd_object *obj)
1634 {
1635         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1636 }
1637
1638 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1639                            const struct dt_index_features *feat)
1640 {
1641         struct iam_descr *descr;
1642
1643         if (osd_object_is_root(o))
1644                 return feat == &dt_directory_features;
1645
1646         LASSERT(o->oo_dir != NULL);
1647
1648         descr = o->oo_dir->od_container.ic_descr;
1649         if (feat == &dt_directory_features)
1650                 return descr == &iam_htree_compat_param ||
1651                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1652                          1 /*
1653                             * XXX check that index looks like directory.
1654                             */
1655                                 );
1656         else
1657                 return
1658                         feat->dif_keysize_min <= descr->id_key_size &&
1659                         descr->id_key_size <= feat->dif_keysize_max &&
1660                         feat->dif_recsize_min <= descr->id_rec_size &&
1661                         descr->id_rec_size <= feat->dif_recsize_max &&
1662                         !(feat->dif_flags & (DT_IND_VARKEY |
1663                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1664                         ergo(feat->dif_flags & DT_IND_UPDATE,
1665                              1 /* XXX check that object (and file system) is
1666                                 * writable */);
1667 }
1668
1669 static int osd_container_init(const struct lu_env *env,
1670                               struct osd_object *obj,
1671                               struct osd_directory *dir)
1672 {
1673         int result;
1674         struct iam_container *bag;
1675
1676         bag    = &dir->od_container;
1677         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1678         if (result == 0) {
1679                 result = iam_container_setup(bag);
1680                 if (result == 0)
1681                         obj->oo_dt.do_index_ops = &osd_index_ops;
1682                 else
1683                         iam_container_fini(bag);
1684         }
1685         return result;
1686 }
1687
1688 /*
1689  * Concurrency: no external locking is necessary.
1690  */
1691 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1692                          const struct dt_index_features *feat)
1693 {
1694         int result;
1695         struct osd_object *obj = osd_dt_obj(dt);
1696
1697         LASSERT(osd_invariant(obj));
1698         LASSERT(dt_object_exists(dt));
1699
1700         if (osd_object_is_root(obj)) {
1701                 dt->do_index_ops = &osd_index_compat_ops;
1702                 result = 0;
1703         } else if (!osd_has_index(obj)) {
1704                 struct osd_directory *dir;
1705
1706                 OBD_ALLOC_PTR(dir);
1707                 if (dir != NULL) {
1708                         sema_init(&dir->od_sem, 1);
1709
1710                         spin_lock(&obj->oo_guard);
1711                         if (obj->oo_dir == NULL)
1712                                 obj->oo_dir = dir;
1713                         else
1714                                 /*
1715                                  * Concurrent thread allocated container data.
1716                                  */
1717                                 OBD_FREE_PTR(dir);
1718                         spin_unlock(&obj->oo_guard);
1719                         /*
1720                          * Now, that we have container data, serialize its
1721                          * initialization.
1722                          */
1723                         down(&obj->oo_dir->od_sem);
1724                         /*
1725                          * recheck under lock.
1726                          */
1727                         if (!osd_has_index(obj))
1728                                 result = osd_container_init(env, obj, dir);
1729                         else
1730                                 result = 0;
1731                         up(&obj->oo_dir->od_sem);
1732                 } else
1733                         result = -ENOMEM;
1734         } else
1735                 result = 0;
1736
1737         if (result == 0) {
1738                 if (!osd_index_probe(env, obj, feat))
1739                         result = -ENOTDIR;
1740         }
1741         LASSERT(osd_invariant(obj));
1742
1743         return result;
1744 }
1745
1746 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1747                             const struct dt_key *key, struct thandle *handle,
1748                             struct lustre_capa *capa)
1749 {
1750         struct osd_object     *obj = osd_dt_obj(dt);
1751         struct osd_thandle    *oh;
1752         struct iam_path_descr *ipd;
1753         struct iam_container  *bag = &obj->oo_dir->od_container;
1754         int rc;
1755
1756         ENTRY;
1757
1758         LASSERT(osd_invariant(obj));
1759         LASSERT(dt_object_exists(dt));
1760         LASSERT(bag->ic_object == obj->oo_inode);
1761         LASSERT(handle != NULL);
1762
1763         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1764                 RETURN(-EACCES);
1765
1766         ipd = osd_ipd_get(env, bag);
1767         if (unlikely(ipd == NULL))
1768                 RETURN(-ENOMEM);
1769
1770         oh = container_of0(handle, struct osd_thandle, ot_super);
1771         LASSERT(oh->ot_handle != NULL);
1772         LASSERT(oh->ot_handle->h_transaction != NULL);
1773
1774         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1775         osd_ipd_put(env, bag, ipd);
1776         LASSERT(osd_invariant(obj));
1777         RETURN(rc);
1778 }
1779
1780 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1781                             struct dt_rec *rec, const struct dt_key *key,
1782                             struct lustre_capa *capa)
1783 {
1784         struct osd_object     *obj = osd_dt_obj(dt);
1785         struct iam_path_descr *ipd;
1786         struct iam_container  *bag = &obj->oo_dir->od_container;
1787         int rc;
1788
1789         ENTRY;
1790
1791         LASSERT(osd_invariant(obj));
1792         LASSERT(dt_object_exists(dt));
1793         LASSERT(bag->ic_object == obj->oo_inode);
1794
1795         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1796                 return -EACCES;
1797
1798         ipd = osd_ipd_get(env, bag);
1799         if (unlikely(ipd == NULL))
1800                 RETURN(-ENOMEM);
1801
1802         rc = iam_lookup(bag, (const struct iam_key *)key,
1803                         (struct iam_rec *)rec, ipd);
1804         osd_ipd_put(env, bag, ipd);
1805         LASSERT(osd_invariant(obj));
1806
1807         RETURN(rc);
1808 }
1809
1810 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1811                             const struct dt_rec *rec, const struct dt_key *key,
1812                             struct thandle *th, struct lustre_capa *capa)
1813 {
1814         struct osd_object     *obj = osd_dt_obj(dt);
1815         struct iam_path_descr *ipd;
1816         struct osd_thandle    *oh;
1817         struct iam_container  *bag = &obj->oo_dir->od_container;
1818         int rc;
1819
1820         ENTRY;
1821
1822         LASSERT(osd_invariant(obj));
1823         LASSERT(dt_object_exists(dt));
1824         LASSERT(bag->ic_object == obj->oo_inode);
1825         LASSERT(th != NULL);
1826
1827         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1828                 return -EACCES;
1829
1830         ipd = osd_ipd_get(env, bag);
1831         if (unlikely(ipd == NULL))
1832                 RETURN(-ENOMEM);
1833
1834         oh = container_of0(th, struct osd_thandle, ot_super);
1835         LASSERT(oh->ot_handle != NULL);
1836         LASSERT(oh->ot_handle->h_transaction != NULL);
1837         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1838                         (struct iam_rec *)rec, ipd);
1839         osd_ipd_put(env, bag, ipd);
1840         LASSERT(osd_invariant(obj));
1841         RETURN(rc);
1842 }
1843
1844 /*
1845  * Iterator operations.
1846  */
1847 struct osd_it {
1848         struct osd_object     *oi_obj;
1849         struct iam_path_descr *oi_ipd;
1850         struct iam_iterator    oi_it;
1851 };
1852
1853 static struct dt_it *osd_it_init(const struct lu_env *env,
1854                                  struct dt_object *dt, int writable,
1855                                  struct lustre_capa *capa)
1856 {
1857         struct osd_it         *it;
1858         struct osd_object     *obj = osd_dt_obj(dt);
1859         struct lu_object      *lo  = &dt->do_lu;
1860         struct iam_path_descr *ipd;
1861         struct iam_container  *bag = &obj->oo_dir->od_container;
1862         __u32                  flags;
1863
1864         LASSERT(lu_object_exists(lo));
1865
1866         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1867                             CAPA_OPC_BODY_READ))
1868                 return ERR_PTR(-EACCES);
1869
1870         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1871         OBD_ALLOC_PTR(it);
1872         if (it != NULL) {
1873                 /*
1874                  * XXX: as ipd is allocated within osd_thread_info, assignment
1875                  * below implies that iterator usage is confined within single
1876                  * environment.
1877                  */
1878                 ipd = osd_ipd_get(env, bag);
1879                 if (likely(ipd != NULL)) {
1880                         it->oi_obj = obj;
1881                         it->oi_ipd = ipd;
1882                         lu_object_get(lo);
1883                         iam_it_init(&it->oi_it, bag, flags, ipd);
1884                         return (struct dt_it *)it;
1885                 } else
1886                         OBD_FREE_PTR(it);
1887         }
1888         return ERR_PTR(-ENOMEM);
1889 }
1890
1891 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1892 {
1893         struct osd_it     *it = (struct osd_it *)di;
1894         struct osd_object *obj = it->oi_obj;
1895
1896         iam_it_fini(&it->oi_it);
1897         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1898         lu_object_put(env, &obj->oo_dt.do_lu);
1899         OBD_FREE_PTR(it);
1900 }
1901
1902 static int osd_it_get(const struct lu_env *env,
1903                       struct dt_it *di, const struct dt_key *key)
1904 {
1905         struct osd_it *it = (struct osd_it *)di;
1906
1907         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1908 }
1909
1910 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1911 {
1912         struct osd_it *it = (struct osd_it *)di;
1913
1914         iam_it_put(&it->oi_it);
1915 }
1916
1917 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1918 {
1919         struct osd_it *it = (struct osd_it *)di;
1920
1921         return iam_it_next(&it->oi_it);
1922 }
1923
1924 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1925                       struct thandle *th)
1926 {
1927         struct osd_it      *it = (struct osd_it *)di;
1928         struct osd_thandle *oh;
1929
1930         LASSERT(th != NULL);
1931
1932         oh = container_of0(th, struct osd_thandle, ot_super);
1933         LASSERT(oh->ot_handle != NULL);
1934         LASSERT(oh->ot_handle->h_transaction != NULL);
1935
1936         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1937 }
1938
1939 static struct dt_key *osd_it_key(const struct lu_env *env,
1940                                  const struct dt_it *di)
1941 {
1942         struct osd_it *it = (struct osd_it *)di;
1943
1944         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1945 }
1946
1947 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1948 {
1949         struct osd_it *it = (struct osd_it *)di;
1950
1951         return iam_it_key_size(&it->oi_it);
1952 }
1953
1954 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1955                                  const struct dt_it *di)
1956 {
1957         struct osd_it *it = (struct osd_it *)di;
1958
1959         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1960 }
1961
1962 static __u32 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1963 {
1964         struct osd_it *it = (struct osd_it *)di;
1965
1966         return iam_it_store(&it->oi_it);
1967 }
1968
1969 static int osd_it_load(const struct lu_env *env,
1970                        const struct dt_it *di, __u32 hash)
1971 {
1972         struct osd_it *it = (struct osd_it *)di;
1973
1974         return iam_it_load(&it->oi_it, hash);
1975 }
1976
1977 static struct dt_index_operations osd_index_ops = {
1978         .dio_lookup = osd_index_lookup,
1979         .dio_insert = osd_index_insert,
1980         .dio_delete = osd_index_delete,
1981         .dio_it     = {
1982                 .init     = osd_it_init,
1983                 .fini     = osd_it_fini,
1984                 .get      = osd_it_get,
1985                 .put      = osd_it_put,
1986                 .del      = osd_it_del,
1987                 .next     = osd_it_next,
1988                 .key      = osd_it_key,
1989                 .key_size = osd_it_key_size,
1990                 .rec      = osd_it_rec,
1991                 .store    = osd_it_store,
1992                 .load     = osd_it_load
1993         }
1994 };
1995
1996 static int osd_index_compat_delete(const struct lu_env *env,
1997                                    struct dt_object *dt,
1998                                    const struct dt_key *key,
1999                                    struct thandle *handle,
2000                                    struct lustre_capa *capa)
2001 {
2002         struct osd_object *obj = osd_dt_obj(dt);
2003
2004         LASSERT(handle != NULL);
2005         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2006         ENTRY;
2007
2008 #if 0
2009         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2010                 RETURN(-EACCES);
2011 #endif
2012
2013         RETURN(-EOPNOTSUPP);
2014 }
2015
2016 /*
2017  * Compatibility index operations.
2018  */
2019
2020
2021 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
2022                            struct dentry *dentry, struct lu_fid_pack *pack)
2023 {
2024         struct inode  *inode = dentry->d_inode;
2025         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
2026
2027         lu_igif_build(fid, inode->i_ino, inode->i_generation);
2028         fid_cpu_to_be(fid, fid);
2029         pack->fp_len = sizeof *fid + 1;
2030         memcpy(pack->fp_area, fid, sizeof *fid);
2031 }
2032
2033 static int osd_index_compat_lookup(const struct lu_env *env,
2034                                    struct dt_object *dt,
2035                                    struct dt_rec *rec, const struct dt_key *key,
2036                                    struct lustre_capa *capa)
2037 {
2038         struct osd_object *obj = osd_dt_obj(dt);
2039
2040         struct osd_device      *osd  = osd_obj2dev(obj);
2041         struct osd_thread_info *info = osd_oti_get(env);
2042         struct inode           *dir;
2043
2044         int result;
2045
2046         /*
2047          * XXX temporary solution.
2048          */
2049         struct dentry *dentry;
2050         struct dentry *parent;
2051
2052         LASSERT(osd_invariant(obj));
2053         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2054         LASSERT(osd_has_index(obj));
2055
2056         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2057                 return -EACCES;
2058
2059         info->oti_str.name = (const char *)key;
2060         info->oti_str.len  = strlen((const char *)key);
2061
2062         dir = obj->oo_inode;
2063         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2064
2065         parent = d_alloc_root(dir);
2066         if (parent == NULL)
2067                 return -ENOMEM;
2068         igrab(dir);
2069         dentry = d_alloc(parent, &info->oti_str);
2070         if (dentry != NULL) {
2071                 struct dentry *d;
2072
2073                 /*
2074                  * XXX passing NULL for nameidata should work for
2075                  * ext3/ldiskfs.
2076                  */
2077                 d = dir->i_op->lookup(dir, dentry, NULL);
2078                 if (d == NULL) {
2079                         /*
2080                          * normal case, result is in @dentry.
2081                          */
2082                         if (dentry->d_inode != NULL) {
2083                                 osd_build_pack(env, osd, dentry,
2084                                                (struct lu_fid_pack *)rec);
2085                                 result = 0;
2086                         } else
2087                                 result = -ENOENT;
2088                  } else {
2089                         /* What? Disconnected alias? Ppheeeww... */
2090                         CERROR("Aliasing where not expected\n");
2091                         result = -EIO;
2092                         dput(d);
2093                 }
2094                 dput(dentry);
2095         } else
2096                 result = -ENOMEM;
2097         dput(parent);
2098         LASSERT(osd_invariant(obj));
2099         return result;
2100 }
2101
2102 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
2103                        struct inode *dir, struct inode *inode, const char *name)
2104 {
2105         struct dentry *old;
2106         struct dentry *new;
2107         struct dentry *parent;
2108
2109         int result;
2110
2111         info->oti_str.name = name;
2112         info->oti_str.len  = strlen(name);
2113
2114         LASSERT(atomic_read(&dir->i_count) > 0);
2115         result = -ENOMEM;
2116         old = d_alloc(dev->od_obj_area, &info->oti_str);
2117         if (old != NULL) {
2118                 d_instantiate(old, inode);
2119                 igrab(inode);
2120                 LASSERT(atomic_read(&dir->i_count) > 0);
2121                 parent = d_alloc_root(dir);
2122                 if (parent != NULL) {
2123                         igrab(dir);
2124                         LASSERT(atomic_read(&dir->i_count) > 1);
2125                         new = d_alloc(parent, &info->oti_str);
2126                         LASSERT(atomic_read(&dir->i_count) > 1);
2127                         if (new != NULL) {
2128                                 LASSERT(atomic_read(&dir->i_count) > 1);
2129                                 result = dir->i_op->link(old, dir, new);
2130                                 LASSERT(atomic_read(&dir->i_count) > 1);
2131                                 dput(new);
2132                                 LASSERT(atomic_read(&dir->i_count) > 1);
2133                         }
2134                         LASSERT(atomic_read(&dir->i_count) > 1);
2135                         dput(parent);
2136                         LASSERT(atomic_read(&dir->i_count) > 0);
2137                 }
2138                 dput(old);
2139         }
2140         LASSERT(atomic_read(&dir->i_count) > 0);
2141         return result;
2142 }
2143
2144
2145 /*
2146  * XXX Temporary stuff.
2147  */
2148 static int osd_index_compat_insert(const struct lu_env *env,
2149                                    struct dt_object *dt,
2150                                    const struct dt_rec *rec,
2151                                    const struct dt_key *key, struct thandle *th,
2152                                    struct lustre_capa *capa)
2153 {
2154         struct osd_object     *obj = osd_dt_obj(dt);
2155
2156         const char          *name = (const char *)key;
2157
2158         struct lu_device    *ludev = dt->do_lu.lo_dev;
2159         struct lu_object    *luch;
2160
2161         struct osd_thread_info   *info = osd_oti_get(env);
2162         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2163         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2164
2165         int result;
2166
2167         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2168         LASSERT(osd_invariant(obj));
2169         LASSERT(th != NULL);
2170
2171         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2172                 return -EACCES;
2173
2174         result = fid_unpack(pack, fid);
2175         if (result != 0)
2176                 return result;
2177
2178         luch = lu_object_find(env, ludev->ld_site, fid);
2179         if (!IS_ERR(luch)) {
2180                 if (lu_object_exists(luch)) {
2181                         struct osd_object *child;
2182
2183                         child = osd_obj(lu_object_locate(luch->lo_header,
2184                                                          ludev->ld_type));
2185                         if (child != NULL)
2186                                 result = osd_add_rec(info, osd_obj2dev(obj),
2187                                                      obj->oo_inode,
2188                                                      child->oo_inode, name);
2189                         else {
2190                                 CERROR("No osd slice.\n");
2191                                 result = -ENOENT;
2192                         }
2193                         LASSERT(osd_invariant(obj));
2194                         LASSERT(osd_invariant(child));
2195                 } else {
2196                         CERROR("Sorry.\n");
2197                         result = -ENOENT;
2198                 }
2199                 lu_object_put(env, luch);
2200         } else
2201                 result = PTR_ERR(luch);
2202         LASSERT(osd_invariant(obj));
2203         return result;
2204 }
2205
2206 static struct dt_index_operations osd_index_compat_ops = {
2207         .dio_lookup = osd_index_compat_lookup,
2208         .dio_insert = osd_index_compat_insert,
2209         .dio_delete = osd_index_compat_delete
2210 };
2211
2212 /* type constructor/destructor: osd_type_init, osd_type_fini */
2213 LU_TYPE_INIT_FINI(osd, &osd_key);
2214
2215 static struct lu_context_key osd_key = {
2216         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
2217         .lct_init = osd_key_init,
2218         .lct_fini = osd_key_fini,
2219         .lct_exit = osd_key_exit
2220 };
2221
2222 static void *osd_key_init(const struct lu_context *ctx,
2223                           struct lu_context_key *key)
2224 {
2225         struct osd_thread_info *info;
2226
2227         OBD_ALLOC_PTR(info);
2228         if (info != NULL)
2229                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2230         else
2231                 info = ERR_PTR(-ENOMEM);
2232         return info;
2233 }
2234
2235 /* context key destructor: osd_key_fini */
2236 LU_KEY_FINI(osd, struct osd_thread_info);
2237
2238 static void osd_key_exit(const struct lu_context *ctx,
2239                          struct lu_context_key *key, void *data)
2240 {
2241 #if OSD_COUNTERS
2242         struct osd_thread_info *info = data;
2243
2244         LASSERT(info->oti_r_locks == 0);
2245         LASSERT(info->oti_w_locks == 0);
2246         LASSERT(info->oti_txns    == 0);
2247 #endif
2248 }
2249
2250 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2251                            const char *name, struct lu_device *next)
2252 {
2253         return lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
2254                                LCT_MD_THREAD);
2255 }
2256
2257 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2258 {
2259         struct osd_thread_info *info = osd_oti_get(env);
2260         ENTRY;
2261         if (o->od_obj_area != NULL) {
2262                 dput(o->od_obj_area);
2263                 o->od_obj_area = NULL;
2264         }
2265         osd_oi_fini(info, &o->od_oi);
2266
2267         RETURN(0);
2268 }
2269
2270 static int osd_mount(const struct lu_env *env,
2271                      struct osd_device *o, struct lustre_cfg *cfg)
2272 {
2273         struct lustre_mount_info *lmi;
2274         const char               *dev  = lustre_cfg_string(cfg, 0);
2275         struct osd_thread_info   *info = osd_oti_get(env);
2276         int result;
2277
2278         ENTRY;
2279
2280         if (o->od_mount != NULL) {
2281                 CERROR("Already mounted (%s)\n", dev);
2282                 RETURN(-EEXIST);
2283         }
2284
2285         /* get mount */
2286         lmi = server_get_mount(dev);
2287         if (lmi == NULL) {
2288                 CERROR("Cannot get mount info for %s!\n", dev);
2289                 RETURN(-EFAULT);
2290         }
2291
2292         LASSERT(lmi != NULL);
2293         /* save lustre_mount_info in dt_device */
2294         o->od_mount = lmi;
2295
2296         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2297         if (result == 0) {
2298                 struct dentry *d;
2299
2300                 d = simple_mkdir(osd_sb(o)->s_root, "*OBJ-TEMP*", 0777, 1);
2301                 if (!IS_ERR(d)) {
2302                         o->od_obj_area = d;
2303                 } else
2304                         result = PTR_ERR(d);
2305         }
2306         if (result != 0)
2307                 osd_shutdown(env, o);
2308         RETURN(result);
2309 }
2310
2311 static struct lu_device *osd_device_fini(const struct lu_env *env,
2312                                          struct lu_device *d)
2313 {
2314         ENTRY;
2315
2316         shrink_dcache_sb(osd_sb(osd_dev(d)));
2317         osd_sync(env, lu2dt_dev(d));
2318
2319         if (osd_dev(d)->od_mount)
2320                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2321                                  osd_dev(d)->od_mount->lmi_mnt);
2322         osd_dev(d)->od_mount = NULL;
2323
2324         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2325         RETURN(NULL);
2326 }
2327
2328 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2329                                           struct lu_device_type *t,
2330                                           struct lustre_cfg *cfg)
2331 {
2332         struct lu_device  *l;
2333         struct osd_device *o;
2334
2335         OBD_ALLOC_PTR(o);
2336         if (o != NULL) {
2337                 int result;
2338
2339                 result = dt_device_init(&o->od_dt_dev, t);
2340                 if (result == 0) {
2341                         l = osd2lu_dev(o);
2342                         l->ld_ops = &osd_lu_ops;
2343                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2344                         spin_lock_init(&o->od_osfs_lock);
2345                         o->od_osfs_age = cfs_time_shift_64(-1000);
2346                         o->od_capa_hash = init_capa_hash();
2347                         if (o->od_capa_hash == NULL)
2348                                 l = ERR_PTR(-ENOMEM);
2349                 } else
2350                         l = ERR_PTR(result);
2351         } else
2352                 l = ERR_PTR(-ENOMEM);
2353         return l;
2354 }
2355
2356 static void osd_device_free(const struct lu_env *env, struct lu_device *d)
2357 {
2358         struct osd_device *o = osd_dev(d);
2359
2360         cleanup_capa_hash(o->od_capa_hash);
2361         dt_device_fini(&o->od_dt_dev);
2362         OBD_FREE_PTR(o);
2363 }
2364
2365 static int osd_process_config(const struct lu_env *env,
2366                               struct lu_device *d, struct lustre_cfg *cfg)
2367 {
2368         struct osd_device *o = osd_dev(d);
2369         int err;
2370         ENTRY;
2371
2372         switch(cfg->lcfg_command) {
2373         case LCFG_SETUP:
2374                 err = osd_mount(env, o, cfg);
2375                 break;
2376         case LCFG_CLEANUP:
2377                 err = osd_shutdown(env, o);
2378                 break;
2379         default:
2380                 err = -ENOTTY;
2381         }
2382
2383         RETURN(err);
2384 }
2385 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2386                                     struct ldiskfs_super_block * es);
2387
2388 static int osd_recovery_complete(const struct lu_env *env,
2389                                  struct lu_device *d)
2390 {
2391         struct osd_device *o = osd_dev(d);
2392         ENTRY;
2393         /* TODO: orphans handling */
2394         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2395         RETURN(0);
2396 }
2397
2398 static struct inode *osd_iget(struct osd_thread_info *info,
2399                               struct osd_device *dev,
2400                               const struct osd_inode_id *id)
2401 {
2402         struct inode *inode;
2403
2404         inode = iget(osd_sb(dev), id->oii_ino);
2405         if (inode == NULL) {
2406                 CERROR("no inode\n");
2407                 inode = ERR_PTR(-EACCES);
2408         } else if (is_bad_inode(inode)) {
2409                 CERROR("bad inode\n");
2410                 iput(inode);
2411                 inode = ERR_PTR(-ENOENT);
2412         } else if (inode->i_generation != id->oii_gen) {
2413                 CERROR("stale inode\n");
2414                 iput(inode);
2415                 inode = ERR_PTR(-ESTALE);
2416         }
2417
2418         return inode;
2419
2420 }
2421
2422 static int osd_fid_lookup(const struct lu_env *env,
2423                           struct osd_object *obj, const struct lu_fid *fid)
2424 {
2425         struct osd_thread_info *info;
2426         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2427         struct osd_device      *dev;
2428         struct osd_inode_id    *id;
2429         struct osd_oi          *oi;
2430         struct inode           *inode;
2431         int                     result;
2432
2433         LASSERT(osd_invariant(obj));
2434         LASSERT(obj->oo_inode == NULL);
2435         LASSERT(fid_is_sane(fid));
2436         /*
2437          * This assertion checks that osd layer sees only local
2438          * fids. Unfortunately it is somewhat expensive (does a
2439          * cache-lookup). Disabling it for production/acceptance-testing.
2440          */
2441         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2442
2443         ENTRY;
2444
2445         info = osd_oti_get(env);
2446         dev  = osd_dev(ldev);
2447         id   = &info->oti_id;
2448         oi   = &dev->od_oi;
2449
2450         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2451                 RETURN(-ENOENT);
2452
2453         result = osd_oi_lookup(info, oi, fid, id);
2454         if (result == 0) {
2455                 inode = osd_iget(info, dev, id);
2456                 if (!IS_ERR(inode)) {
2457                         obj->oo_inode = inode;
2458                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2459                         result = 0;
2460                 } else
2461                         /*
2462                          * If fid wasn't found in oi, inode-less object is
2463                          * created, for which lu_object_exists() returns
2464                          * false. This is used in a (frequent) case when
2465                          * objects are created as locking anchors or
2466                          * place holders for objects yet to be created.
2467                          */
2468                         result = PTR_ERR(inode);
2469         } else if (result == -ENOENT)
2470                 result = 0;
2471         LASSERT(osd_invariant(obj));
2472         RETURN(result);
2473 }
2474
2475 static void osd_inode_getattr(const struct lu_env *env,
2476                               struct inode *inode, struct lu_attr *attr)
2477 {
2478         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2479                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2480                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2481
2482         attr->la_atime      = LTIME_S(inode->i_atime);
2483         attr->la_mtime      = LTIME_S(inode->i_mtime);
2484         attr->la_ctime      = LTIME_S(inode->i_ctime);
2485         attr->la_mode       = inode->i_mode;
2486         attr->la_size       = i_size_read(inode);
2487         attr->la_blocks     = inode->i_blocks;
2488         attr->la_uid        = inode->i_uid;
2489         attr->la_gid        = inode->i_gid;
2490         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2491         attr->la_nlink      = inode->i_nlink;
2492         attr->la_rdev       = inode->i_rdev;
2493         attr->la_blksize    = ll_inode_blksize(inode);
2494         attr->la_blkbits    = inode->i_blkbits;
2495 }
2496
2497 /*
2498  * Helpers.
2499  */
2500
2501 static int lu_device_is_osd(const struct lu_device *d)
2502 {
2503         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2504 }
2505
2506 static struct osd_object *osd_obj(const struct lu_object *o)
2507 {
2508         LASSERT(lu_device_is_osd(o->lo_dev));
2509         return container_of0(o, struct osd_object, oo_dt.do_lu);
2510 }
2511
2512 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2513 {
2514         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2515         return container_of0(d, struct osd_device, od_dt_dev);
2516 }
2517
2518 static struct osd_device *osd_dev(const struct lu_device *d)
2519 {
2520         LASSERT(lu_device_is_osd(d));
2521         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2522 }
2523
2524 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2525 {
2526         return osd_obj(&d->do_lu);
2527 }
2528
2529 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2530 {
2531         return osd_dev(o->oo_dt.do_lu.lo_dev);
2532 }
2533
2534 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2535 {
2536         return &osd->od_dt_dev.dd_lu_dev;
2537 }
2538
2539 static struct super_block *osd_sb(const struct osd_device *dev)
2540 {
2541         return dev->od_mount->lmi_mnt->mnt_sb;
2542 }
2543
2544 static journal_t *osd_journal(const struct osd_device *dev)
2545 {
2546         return LDISKFS_SB(osd_sb(dev))->s_journal;
2547 }
2548
2549 static int osd_has_index(const struct osd_object *obj)
2550 {
2551         return obj->oo_dt.do_index_ops != NULL;
2552 }
2553
/*
 * lu_object_operations::loo_object_invariant() method: evaluate the osd
 * invariant on the osd_object behind lu_object @l and return the result.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int holds;

        holds = osd_invariant(osd_obj(l));
        return holds;
}
2558
2559 static struct lu_object_operations osd_lu_obj_ops = {
2560         .loo_object_init      = osd_object_init,
2561         .loo_object_delete    = osd_object_delete,
2562         .loo_object_release   = osd_object_release,
2563         .loo_object_free      = osd_object_free,
2564         .loo_object_print     = osd_object_print,
2565         .loo_object_invariant = osd_object_invariant
2566 };
2567
2568 static struct lu_device_operations osd_lu_ops = {
2569         .ldo_object_alloc      = osd_object_alloc,
2570         .ldo_process_config    = osd_process_config,
2571         .ldo_recovery_complete = osd_recovery_complete
2572 };
2573
2574 static struct lu_device_type_operations osd_device_type_ops = {
2575         .ldto_init = osd_type_init,
2576         .ldto_fini = osd_type_fini,
2577
2578         .ldto_device_alloc = osd_device_alloc,
2579         .ldto_device_free  = osd_device_free,
2580
2581         .ldto_device_init    = osd_device_init,
2582         .ldto_device_fini    = osd_device_fini
2583 };
2584
2585 static struct lu_device_type osd_device_type = {
2586         .ldt_tags     = LU_DEVICE_DT,
2587         .ldt_name     = LUSTRE_OSD_NAME,
2588         .ldt_ops      = &osd_device_type_ops,
2589         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2590 };
2591
2592 /*
2593  * lprocfs legacy support.
2594  */
2595 static struct lprocfs_vars lprocfs_osd_obd_vars[] = {
2596         { 0 }
2597 };
2598
2599 static struct lprocfs_vars lprocfs_osd_module_vars[] = {
2600         { 0 }
2601 };
2602
2603 static struct obd_ops osd_obd_device_ops = {
2604         .o_owner = THIS_MODULE
2605 };
2606
2607 static void lprocfs_osd_init_vars(struct lprocfs_static_vars *lvars)
2608 {
2609     lvars->module_vars  = lprocfs_osd_module_vars;
2610     lvars->obd_vars     = lprocfs_osd_obd_vars;
2611 }
2612
2613
2614 static int __init osd_mod_init(void)
2615 {
2616         struct lprocfs_static_vars lvars;
2617
2618         lprocfs_osd_init_vars(&lvars);
2619         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2620                                    LUSTRE_OSD_NAME, &osd_device_type);
2621 }
2622
2623 static void __exit osd_mod_exit(void)
2624 {
2625         class_unregister_type(LUSTRE_OSD_NAME);
2626 }
2627
2628 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2629 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
2630 MODULE_LICENSE("GPL");
2631
2632 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);