Whamcloud - gitweb
ea283e220d27f7fadb172e312d4a0cdc6671c5b1
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/osd/osd_handler.c
5  *  Top-level entry points into osd module
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Nikita Danilov <nikita@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35
36 /* LUSTRE_VERSION_CODE */
37 #include <lustre_ver.h>
38 /* prerequisite for linux/xattr.h */
39 #include <linux/types.h>
40 /* prerequisite for linux/xattr.h */
41 #include <linux/fs.h>
42 /* XATTR_{REPLACE,CREATE} */
43 #include <linux/xattr.h>
44 /*
45  * XXX temporary stuff: direct access to ldiskfs/jbd. Interface between osd
46  * and file system is not yet specified.
47  */
48 /* handle_t, journal_start(), journal_stop() */
49 #include <linux/jbd.h>
50 /* LDISKFS_SB() */
51 #include <linux/ldiskfs_fs.h>
52 #include <linux/ldiskfs_jbd.h>
53 /* simple_mkdir() */
54 #include <lvfs.h>
55
56 /*
57  * struct OBD_{ALLOC,FREE}*()
58  * OBD_FAIL_CHECK
59  */
60 #include <obd_support.h>
61 /* struct ptlrpc_thread */
62 #include <lustre_net.h>
63 /* LUSTRE_OSD_NAME */
64 #include <obd.h>
65 /* class_register_type(), class_unregister_type(), class_get_type() */
66 #include <obd_class.h>
67 #include <lustre_disk.h>
68
69 /* fid_is_local() */
70 #include <lustre_fid.h>
71 #include <linux/lustre_iam.h>
72
73 #include "osd_internal.h"
74 #include "osd_igif.h"
75
76 struct osd_directory {
77         struct iam_container od_container;
78         struct iam_descr     od_descr;
79         struct semaphore     od_sem;
80 };
81
82 struct osd_object {
83         struct dt_object       oo_dt;
84         /*
85          * Inode for file system object represented by this osd_object. This
86          * inode is pinned for the whole duration of lu_object life.
87          *
88          * Not modified concurrently (either setup early during object
89          * creation, or assigned by osd_object_create() under write lock).
90          */
91         struct inode          *oo_inode;
92         struct rw_semaphore    oo_sem;
93         struct osd_directory  *oo_dir;
94         /* protects inode attributes. */
95         spinlock_t             oo_guard;
96 #if OSD_COUNTERS
97         const struct lu_env   *oo_owner;
98 #endif
99 };
100
101 /*
102  * osd device.
103  */
104 struct osd_device {
105         /* super-class */
106         struct dt_device          od_dt_dev;
107         /* information about underlying file system */
108         struct lustre_mount_info *od_mount;
109         /* object index */
110         struct osd_oi             od_oi;
111         /*
112          * XXX temporary stuff for object index: directory where every object
113          * is named by its fid.
114          */
115         struct dentry            *od_obj_area;
116
117         /* Environment for transaction commit callback.
118          * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
119          * is serialized, that is there is no more than one transaction commit
120          * at a time (JBD journal_commit_transaction() is serialized).
121          * This means that it's enough to have _one_ lu_context.
122          */
123         struct lu_env             od_env_for_commit;
124
125         /*
126          * Fid Capability
127          */
128         unsigned int              od_fl_capa:1;
129         unsigned long             od_capa_timeout;
130         __u32                     od_capa_alg;
131         struct lustre_capa_key   *od_capa_keys;
132         struct hlist_head        *od_capa_hash;
133         
134         /*
135          * statfs optimization: we cache a bit.
136          */
137         cfs_time_t                od_osfs_age;
138         struct kstatfs            od_kstatfs;
139         spinlock_t                od_osfs_lock;
140 };
141
142 static int   osd_root_get      (const struct lu_env *env,
143                                 struct dt_device *dev, struct lu_fid *f);
144 static int   osd_statfs        (const struct lu_env *env,
145                                 struct dt_device *dev, struct kstatfs *sfs);
146
147 static int   lu_device_is_osd  (const struct lu_device *d);
148 static void  osd_mod_exit      (void) __exit;
149 static int   osd_mod_init      (void) __init;
150 static int   osd_type_init     (struct lu_device_type *t);
151 static void  osd_type_fini     (struct lu_device_type *t);
152 static int   osd_object_init   (const struct lu_env *env,
153                                 struct lu_object *l);
154 static void  osd_object_release(const struct lu_env *env,
155                                 struct lu_object *l);
156 static int   osd_object_print  (const struct lu_env *env, void *cookie,
157                                 lu_printer_t p, const struct lu_object *o);
158 static void  osd_device_free   (const struct lu_env *env,
159                                 struct lu_device *m);
160 static void *osd_key_init      (const struct lu_context *ctx,
161                                 struct lu_context_key *key);
162 static void  osd_key_fini      (const struct lu_context *ctx,
163                                 struct lu_context_key *key, void *data);
164 static void  osd_key_exit      (const struct lu_context *ctx,
165                                 struct lu_context_key *key, void *data);
166 static int   osd_has_index     (const struct osd_object *obj);
167 static void  osd_object_init0  (struct osd_object *obj);
168 static int   osd_device_init   (const struct lu_env *env,
169                                 struct lu_device *d, const char *,
170                                 struct lu_device *);
171 static int   osd_fid_lookup    (const struct lu_env *env,
172                                 struct osd_object *obj,
173                                 const struct lu_fid *fid);
174 static void  osd_inode_getattr (const struct lu_env *env,
175                                 struct inode *inode, struct lu_attr *attr);
176 static void  osd_inode_setattr (const struct lu_env *env,
177                                 struct inode *inode, const struct lu_attr *attr);
178 static int   osd_param_is_sane (const struct osd_device *dev,
179                                 const struct txn_param *param);
180 static int   osd_index_lookup  (const struct lu_env *env,
181                                 struct dt_object *dt,
182                                 struct dt_rec *rec, const struct dt_key *key,
183                                 struct lustre_capa *capa);
184 static int   osd_index_insert  (const struct lu_env *env,
185                                 struct dt_object *dt,
186                                 const struct dt_rec *rec,
187                                 const struct dt_key *key,
188                                 struct thandle *handle,
189                                 struct lustre_capa *capa);
190 static int   osd_index_delete  (const struct lu_env *env,
191                                 struct dt_object *dt, const struct dt_key *key,
192                                 struct thandle *handle,
193                                 struct lustre_capa *capa);
194 static int   osd_index_probe   (const struct lu_env *env,
195                                 struct osd_object *o,
196                                 const struct dt_index_features *feat);
197 static int   osd_index_try     (const struct lu_env *env,
198                                 struct dt_object *dt,
199                                 const struct dt_index_features *feat);
200 static void  osd_index_fini    (struct osd_object *o);
201
202 static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
203 static int   osd_it_get        (const struct lu_env *env,
204                                 struct dt_it *di, const struct dt_key *key);
205 static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
206 static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
207 static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
208                                 struct thandle *th);
209 static int   osd_it_key_size   (const struct lu_env *env,
210                                 const struct dt_it *di);
211 static void  osd_conf_get      (const struct lu_env *env,
212                                 const struct dt_device *dev,
213                                 struct dt_device_param *param);
214 static void  osd_trans_stop    (const struct lu_env *env,
215                                 struct thandle *th);
216 static int   osd_object_is_root(const struct osd_object *obj);
217
218 static struct osd_object  *osd_obj          (const struct lu_object *o);
219 static struct osd_device  *osd_dev          (const struct lu_device *d);
220 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
221 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
222 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
223 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
224 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
225                                              struct lu_device *d);
226 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
227                                              struct lu_device_type *t,
228                                              struct lustre_cfg *cfg);
229 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
230                                              const struct lu_object_header *hdr,
231                                              struct lu_device *d);
232 static struct inode       *osd_iget         (struct osd_thread_info *info,
233                                              struct osd_device *dev,
234                                              const struct osd_inode_id *id);
235 static struct super_block *osd_sb           (const struct osd_device *dev);
236 static struct dt_it       *osd_it_init      (const struct lu_env *env,
237                                              struct dt_object *dt, int wable,
238                                              struct lustre_capa *capa);
239 static struct dt_key      *osd_it_key       (const struct lu_env *env,
240                                              const struct dt_it *di);
241 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
242                                              const struct dt_it *di);
243 static struct timespec    *osd_inode_time   (const struct lu_env *env,
244                                              struct inode *inode,
245                                              __u64 seconds);
246 static struct thandle     *osd_trans_start  (const struct lu_env *env,
247                                              struct dt_device *d,
248                                              struct txn_param *p);
249 static journal_t          *osd_journal      (const struct osd_device *dev);
250
251 static struct lu_device_type_operations osd_device_type_ops;
252 static struct lu_device_type            osd_device_type;
253 static struct lu_object_operations      osd_lu_obj_ops;
254 static struct obd_ops                   osd_obd_device_ops;
255 static struct lprocfs_vars              lprocfs_osd_module_vars[];
256 static struct lprocfs_vars              lprocfs_osd_obd_vars[];
257 static struct lu_device_operations      osd_lu_ops;
258 static struct lu_context_key            osd_key;
259 static struct dt_object_operations      osd_obj_ops;
260 static struct dt_body_operations        osd_body_ops;
261 static struct dt_index_operations       osd_index_ops;
262 static struct dt_index_operations       osd_index_compat_ops;
263
264 struct osd_thandle {
265         struct thandle          ot_super;
266         handle_t               *ot_handle;
267         struct journal_callback ot_jcb;
268 };
269
270 /*
271  * Invariants, assertions.
272  */
273
/*
 * XXX: do not enable this, until invariant checking code is made thread safe
 * in the face of pdirops locking.
 */
#define OSD_INVARIANT_CHECKS (0)

#if OSD_INVARIANT_CHECKS
/*
 * Consistency check for an osd_object. Returns non-zero when all of the
 * following hold:
 *
 *  - obj is not NULL;
 *  - if an inode is attached, it belongs to the super block of the object's
 *    device and has a positive reference count;
 *  - if index state is attached and its container is bound to an inode, that
 *    inode is the object's own inode.
 *
 * When OSD_INVARIANT_CHECKS is 0 this collapses to the constant 1.
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_dir != NULL &&
                     obj->oo_dir->od_container.ic_object != NULL,
                     obj->oo_dir->od_container.ic_object == obj->oo_inode);
}
#else
#define osd_invariant(obj) (1)
#endif
295
296 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
297 {
298         return lu_context_key_get(&env->le_ctx, &osd_key);
299 }
300
#if OSD_COUNTERS
/*
 * True when the calling thread's read lock counter is positive. Note that
 * the object itself is not consulted.
 *
 * Concurrency: doesn't matter
 */
static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
{
        struct osd_thread_info *info = osd_oti_get(env);

        return info->oti_r_locks > 0;
}

/*
 * True when the calling thread's write lock counter is positive and @env is
 * recorded as the owner of @o.
 *
 * Concurrency: doesn't matter
 */
static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
{
        if (!(osd_oti_get(env)->oti_w_locks > 0))
                return 0;
        return o->oo_owner == env;
}

/* Evaluate @exp only when lock/transaction counters are compiled in. */
#define OSD_COUNTERS_DO(exp) exp
#else

/* Without counters lock state is unknown: report "locked" unconditionally. */
#define osd_read_locked(env, o) (1)
#define osd_write_locked(env, o) (1)
/* Counters are compiled out: @exp is dropped without being evaluated. */
#define OSD_COUNTERS_DO(exp) ((void)0)
#endif
327
328 /*
329  * Concurrency: doesn't access mutable data
330  */
331 static int osd_root_get(const struct lu_env *env,
332                         struct dt_device *dev, struct lu_fid *f)
333 {
334         struct inode *inode;
335
336         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
337         lu_igif_build(f, inode->i_ino, inode->i_generation);
338         return 0;
339 }
340
341 /*
342  * OSD object methods.
343  */
344
345 /*
346  * Concurrency: no concurrent access is possible that early in object
347  * life-cycle.
348  */
349 static struct lu_object *osd_object_alloc(const struct lu_env *env,
350                                           const struct lu_object_header *hdr,
351                                           struct lu_device *d)
352 {
353         struct osd_object *mo;
354
355         OBD_ALLOC_PTR(mo);
356         if (mo != NULL) {
357                 struct lu_object *l;
358
359                 l = &mo->oo_dt.do_lu;
360                 dt_object_init(&mo->oo_dt, NULL, d);
361                 mo->oo_dt.do_ops = &osd_obj_ops;
362                 l->lo_ops = &osd_lu_obj_ops;
363                 init_rwsem(&mo->oo_sem);
364                 spin_lock_init(&mo->oo_guard);
365                 return l;
366         } else
367                 return NULL;
368 }
369
370 /*
371  * Concurrency: shouldn't matter.
372  */
373 static void osd_object_init0(struct osd_object *obj)
374 {
375         LASSERT(obj->oo_inode != NULL);
376         obj->oo_dt.do_body_ops = &osd_body_ops;
377         obj->oo_dt.do_lu.lo_header->loh_attr |=
378                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
379 }
380
381 /*
382  * Concurrency: no concurrent access is possible that early in object
383  * life-cycle.
384  */
385 static int osd_object_init(const struct lu_env *env, struct lu_object *l)
386 {
387         struct osd_object *obj = osd_obj(l);
388         int result;
389
390         LASSERT(osd_invariant(obj));
391
392         result = osd_fid_lookup(env, obj, lu_object_fid(l));
393         if (result == 0) {
394                 if (obj->oo_inode != NULL)
395                         osd_object_init0(obj);
396         }
397         LASSERT(osd_invariant(obj));
398         return result;
399 }
400
401 /*
402  * Concurrency: no concurrent access is possible that late in object
403  * life-cycle.
404  */
405 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
406 {
407         struct osd_object *obj = osd_obj(l);
408
409         LASSERT(osd_invariant(obj));
410
411         dt_object_fini(&obj->oo_dt);
412         OBD_FREE_PTR(obj);
413 }
414
415 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
416                                           const struct iam_container *bag)
417 {
418         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
419                                                    osd_oti_get(env)->oti_ipd);
420 }
421
422 static void osd_ipd_put(const struct lu_env *env,
423                         const struct iam_container *bag,
424                         struct iam_path_descr *ipd)
425 {
426         bag->ic_descr->id_ops->id_ipd_free(ipd);
427 }
428
429 /*
430  * Concurrency: no concurrent access is possible that late in object
431  * life-cycle.
432  */
433 static void osd_index_fini(struct osd_object *o)
434 {
435         struct iam_container *bag;
436
437         if (o->oo_dir != NULL) {
438                 bag = &o->oo_dir->od_container;
439                 if (o->oo_inode != NULL) {
440                         if (bag->ic_object == o->oo_inode)
441                                 iam_container_fini(bag);
442                 }
443                 OBD_FREE_PTR(o->oo_dir);
444                 o->oo_dir = NULL;
445         }
446 }
447
448 /*
449  * Concurrency: no concurrent access is possible that late in object
450  * life-cycle (for all existing callers, that is. New callers have to provide
451  * their own locking.)
452  */
453 static int osd_inode_unlinked(const struct inode *inode)
454 {
455         return inode->i_nlink == 0;
456 }
457
/*
 * Journal credits; osd_inode_remove() reserves the sum of the two.
 */
enum {
        /* credits for the object index delete */
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        /* credits for the inode delete */
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
462
463 /*
464  * Concurrency: no concurrent access is possible that late in object
465  * life-cycle.
466  */
467 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
468 {
469         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
470         struct osd_device      *osd = osd_obj2dev(obj);
471         struct osd_thread_info *oti = osd_oti_get(env);
472         struct txn_param       *prm = &oti->oti_txn;
473         struct thandle         *th;
474         int result;
475
476         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
477                             OSD_TXN_INODE_DELETE_CREDITS);
478         th = osd_trans_start(env, &osd->od_dt_dev, prm);
479         if (!IS_ERR(th)) {
480                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
481                 osd_trans_stop(env, th);
482         } else
483                 result = PTR_ERR(th);
484         return result;
485 }
486
487 /*
488  * Called just before object is freed. Releases all resources except for
489  * object itself (that is released by osd_object_free()).
490  *
491  * Concurrency: no concurrent access is possible that late in object
492  * life-cycle.
493  */
494 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
495 {
496         struct osd_object *obj   = osd_obj(l);
497         struct inode      *inode = obj->oo_inode;
498
499         LASSERT(osd_invariant(obj));
500
501         /*
502          * If object is unlinked remove fid->ino mapping from object index.
503          *
504          * File body will be deleted by iput().
505          */
506
507         osd_index_fini(obj);
508         if (inode != NULL) {
509                 int result;
510
511                 if (osd_inode_unlinked(inode)) {
512                         result = osd_inode_remove(env, obj);
513                         if (result != 0)
514                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
515                                                 "Failed to cleanup: %d\n",
516                                                 result);
517                 }
518                 iput(inode);
519                 obj->oo_inode = NULL;
520         }
521 }
522
523 /*
524  * Concurrency: ->loo_object_release() is called under site spin-lock.
525  */
526 static void osd_object_release(const struct lu_env *env,
527                                struct lu_object *l)
528 {
529         struct osd_object *o = osd_obj(l);
530
531         LASSERT(!lu_object_is_dying(l->lo_header));
532         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
533                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
534 }
535
536 /*
537  * Concurrency: shouldn't matter.
538  */
539 static int osd_object_print(const struct lu_env *env, void *cookie,
540                             lu_printer_t p, const struct lu_object *l)
541 {
542         struct osd_object *o = osd_obj(l);
543         struct iam_descr  *d;
544
545         if (o->oo_dir != NULL)
546                 d = o->oo_dir->od_container.ic_descr;
547         else
548                 d = NULL;
549         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
550                     o, o->oo_inode,
551                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
552                     o->oo_inode ? o->oo_inode->i_generation : 0,
553                     d ? d->id_ops->id_name : "plain");
554 }
555
556 /*
557  * Concurrency: shouldn't matter.
558  */
559 static int osd_statfs(const struct lu_env *env,
560                       struct dt_device *d, struct kstatfs *sfs)
561 {
562         struct osd_device *osd = osd_dt_dev(d);
563         struct super_block *sb = osd_sb(osd);
564         int result = 0;
565
566         spin_lock(&osd->od_osfs_lock);
567         /* cache 1 second */
568         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
569                 result = sb->s_op->statfs(sb, &osd->od_kstatfs);
570                 if (likely(result == 0)) /* N.B. statfs can't really fail */
571                         osd->od_osfs_age = cfs_time_current_64();
572         }
573
574         if (likely(result == 0))
575                 *sfs = osd->od_kstatfs; 
576         spin_unlock(&osd->od_osfs_lock);
577
578         return result;
579 }
580
581 /*
582  * Concurrency: doesn't access mutable data.
583  */
584 static void osd_conf_get(const struct lu_env *env,
585                          const struct dt_device *dev,
586                          struct dt_device_param *param)
587 {
588         /*
589          * XXX should be taken from not-yet-existing fs abstraction layer.
590          */
591         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
592         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
593         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
594 }
595
596 /*
597  * Journal
598  */
599
600 /*
601  * Concurrency: doesn't access mutable data.
602  */
603 static int osd_param_is_sane(const struct osd_device *dev,
604                              const struct txn_param *param)
605 {
606         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
607 }
608
609 /*
610  * Concurrency: shouldn't matter.
611  */
612 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
613 {
614         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
615         struct thandle     *th = &oh->ot_super;
616         struct dt_device   *dev = th->th_dev;
617
618         LASSERT(dev != NULL);
619         LASSERT(oh->ot_handle == NULL);
620
621         if (error) {
622                 CERROR("transaction @0x%p commit error: %d\n", th, error);
623         } else {
624                 /*
625                  * This od_env_for_commit is only for commit usage.  see
626                  * "struct dt_device"
627                  */
628                 dt_txn_hook_commit(&osd_dt_dev(dev)->od_env_for_commit, th);
629         }
630
631         lu_device_put(&dev->dd_lu_dev);
632         th->th_dev = NULL;
633
634         lu_context_exit(&th->th_ctx);
635         lu_context_fini(&th->th_ctx);
636         OBD_FREE_PTR(oh);
637 }
638
639 /*
640  * Concurrency: shouldn't matter.
641  */
642 static struct thandle *osd_trans_start(const struct lu_env *env,
643                                        struct dt_device *d,
644                                        struct txn_param *p)
645 {
646         struct osd_device  *dev = osd_dt_dev(d);
647         handle_t           *jh;
648         struct osd_thandle *oh;
649         struct thandle     *th;
650         int hook_res;
651
652         ENTRY;
653
654         hook_res = dt_txn_hook_start(env, d, p);
655         if (hook_res != 0)
656                 RETURN(ERR_PTR(hook_res));
657
658         if (osd_param_is_sane(dev, p)) {
659                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
660                 if (oh != NULL) {
661                         /*
662                          * XXX temporary stuff. Some abstraction layer should
663                          * be used.
664                          */
665
666                         jh = journal_start(osd_journal(dev), p->tp_credits);
667                         if (!IS_ERR(jh)) {
668                                 oh->ot_handle = jh;
669                                 th = &oh->ot_super;
670                                 th->th_dev = d;
671                                 th->th_result = 0;
672                                 jh->h_sync = p->tp_sync;
673                                 lu_device_get(&d->dd_lu_dev);
674                                 /* add commit callback */
675                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
676                                 lu_context_enter(&th->th_ctx);
677                                 journal_callback_set(jh, osd_trans_commit_cb,
678                                                      (struct journal_callback *)&oh->ot_jcb);
679 #if OSD_COUNTERS
680                                 {
681                                         struct osd_thread_info *oti =
682                                                 osd_oti_get(env);
683
684                                         LASSERT(oti->oti_txns == 0);
685                                         LASSERT(oti->oti_r_locks == 0);
686                                         LASSERT(oti->oti_w_locks == 0);
687                                         oti->oti_txns++;
688                                 }
689 #endif
690                         } else {
691                                 OBD_FREE_PTR(oh);
692                                 th = (void *)jh;
693                         }
694                 } else
695                         th = ERR_PTR(-ENOMEM);
696         } else {
697                 CERROR("Invalid transaction parameters\n");
698                 th = ERR_PTR(-EINVAL);
699         }
700
701         RETURN(th);
702 }
703
704 /*
705  * Concurrency: shouldn't matter.
706  */
707 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
708 {
709         int result;
710         struct osd_thandle *oh;
711
712         ENTRY;
713
714         oh = container_of0(th, struct osd_thandle, ot_super);
715         if (oh->ot_handle != NULL) {
716                 handle_t *hdl = oh->ot_handle;
717                 /*
718                  * XXX temporary stuff. Some abstraction layer should be used.
719                  */
720                 result = dt_txn_hook_stop(env, th);
721                 if (result != 0)
722                         CERROR("Failure in transaction hook: %d\n", result);
723
724                 /**/
725                 oh->ot_handle = NULL;
726                 result = journal_stop(hdl);
727                 if (result != 0)
728                         CERROR("Failure to stop transaction: %d\n", result);
729
730 #if OSD_COUNTERS
731                 {
732                         struct osd_thread_info *oti = osd_oti_get(env);
733
734                         LASSERT(oti->oti_txns == 1);
735                         LASSERT(oti->oti_r_locks == 0);
736                         LASSERT(oti->oti_w_locks == 0);
737                         oti->oti_txns--;
738                 }
739 #endif
740         }
741         EXIT;
742 }
743
744 /*
745  * Concurrency: shouldn't matter.
746  */
747 static int osd_sync(const struct lu_env *env, struct dt_device *d)
748 {
749         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
750         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
751 }
752
753 /*
754  * Concurrency: shouldn't matter.
755  */
756 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
757
758 static void osd_ro(const struct lu_env *env, struct dt_device *d)
759 {
760         ENTRY;
761
762         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
763
764         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
765                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
766         EXIT;
767 }
768
769 /*
770  * Concurrency: serialization provided by callers.
771  */
772 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
773                               int mode, unsigned long timeout, __u32 alg,
774                               struct lustre_capa_key *keys)
775 {
776         struct osd_device *dev = osd_dt_dev(d);
777         ENTRY;
778
779         dev->od_fl_capa = mode;
780         dev->od_capa_timeout = timeout;
781         dev->od_capa_alg = alg;
782         dev->od_capa_keys = keys;
783         RETURN(0);
784 }
785
786 /* Note: we did not count into QUOTA here, If we mount with --data_journal
787  * we may need more*/
788 static const int osd_dto_credits[DTO_NR] = {
789         /*
790          * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
791          * EXT3_SINGLEDATA_TRANS_BLOCKS 8 XXX Note: maybe iam need more,since
792          * iam have more level than Ext3 htree
793          */
794         [DTO_INDEX_INSERT]  = 16,
795         [DTO_INDEX_DELETE]  = 16,
796         [DTO_IDNEX_UPDATE]  = 16,
797         /*
798          * Create a object. Same as create object in Ext3 filesystem, but did
799          * not count QUOTA i EXT3_DATA_TRANS_BLOCKS(12) +
800          * INDEX_EXTRA_BLOCKS(8) + 3(inode bits,groups, GDT)
801          */
802         [DTO_OBJECT_CREATE] = 23,
803         [DTO_OBJECT_DELETE] = 23,
804         /*
805          * Attr set credits 3 inode, group, GDT
806          */
807         [DTO_ATTR_SET]      = 3,
808         /*
809          * XATTR_SET. SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS XXX Note:
810          * in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
811          * also counted in. Do not know why?
812          */
813         [DTO_XATTR_SET]     = 16,
814         [DTO_LOG_REC]       = 16
815 };
816
817 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
818                           enum dt_txn_op op)
819 {
820         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
821         return osd_dto_credits[op];
822 }
823
824 static struct dt_device_operations osd_dt_ops = {
825         .dt_root_get       = osd_root_get,
826         .dt_statfs         = osd_statfs,
827         .dt_trans_start    = osd_trans_start,
828         .dt_trans_stop     = osd_trans_stop,
829         .dt_conf_get       = osd_conf_get,
830         .dt_sync           = osd_sync,
831         .dt_ro             = osd_ro,
832         .dt_credit_get     = osd_credit_get,
833         .dt_init_capa_ctxt = osd_init_capa_ctxt,
834 };
835
836 static void osd_object_read_lock(const struct lu_env *env,
837                                  struct dt_object *dt)
838 {
839         struct osd_object *obj = osd_dt_obj(dt);
840
841         LASSERT(osd_invariant(obj));
842
843         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
844         down_read(&obj->oo_sem);
845 #if OSD_COUNTERS
846         {
847                 struct osd_thread_info *oti = osd_oti_get(env);
848
849                 LASSERT(obj->oo_owner == NULL);
850                 oti->oti_r_locks++;
851         }
852 #endif
853 }
854
855 static void osd_object_write_lock(const struct lu_env *env,
856                                   struct dt_object *dt)
857 {
858         struct osd_object *obj = osd_dt_obj(dt);
859
860         LASSERT(osd_invariant(obj));
861
862         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
863         down_write(&obj->oo_sem);
864 #if OSD_COUNTERS
865         {
866                 struct osd_thread_info *oti = osd_oti_get(env);
867
868                 LASSERT(obj->oo_owner == NULL);
869                 obj->oo_owner = env;
870                 oti->oti_w_locks++;
871         }
872 #endif
873 }
874
875 static void osd_object_read_unlock(const struct lu_env *env,
876                                    struct dt_object *dt)
877 {
878         struct osd_object *obj = osd_dt_obj(dt);
879
880         LASSERT(osd_invariant(obj));
881 #if OSD_COUNTERS
882         {
883                 struct osd_thread_info *oti = osd_oti_get(env);
884
885                 LASSERT(oti->oti_r_locks > 0);
886                 oti->oti_r_locks--;
887         }
888 #endif
889         up_read(&obj->oo_sem);
890 }
891
892 static void osd_object_write_unlock(const struct lu_env *env,
893                                     struct dt_object *dt)
894 {
895         struct osd_object *obj = osd_dt_obj(dt);
896
897         LASSERT(osd_invariant(obj));
898 #if OSD_COUNTERS
899         {
900                 struct osd_thread_info *oti = osd_oti_get(env);
901
902                 LASSERT(obj->oo_owner == env);
903                 LASSERT(oti->oti_w_locks > 0);
904                 oti->oti_w_locks--;
905                 obj->oo_owner = NULL;
906         }
907 #endif
908         up_write(&obj->oo_sem);
909 }
910
911 static int capa_is_sane(const struct lu_env *env,
912                         struct osd_device *dev,
913                         struct lustre_capa *capa,
914                         struct lustre_capa_key *keys)
915 {
916         struct osd_thread_info *oti = osd_oti_get(env);
917         struct obd_capa *oc;
918         int i, rc = 0;
919         ENTRY;
920
921         oc = capa_lookup(dev->od_capa_hash, capa, 0);
922         if (oc) {
923                 if (capa_is_expired(oc)) {
924                         DEBUG_CAPA(D_ERROR, capa, "expired");
925                         rc = -ESTALE;
926                 }
927                 capa_put(oc);
928                 RETURN(rc);
929         }
930
931         spin_lock(&capa_lock);
932         for (i = 0; i < 2; i++) {
933                 if (keys[i].lk_keyid == capa->lc_keyid) {
934                         oti->oti_capa_key = keys[i];
935                         break;
936                 }
937         }
938         spin_unlock(&capa_lock);
939
940         if (i == 2) {
941                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
942                 RETURN(-ESTALE);
943         }
944
945         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
946         if (rc)
947                 RETURN(rc);
948         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
949         {
950                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
951                 RETURN(-EACCES);
952         }
953
954         oc = capa_add(dev->od_capa_hash, capa);
955         capa_put(oc);
956
957         RETURN(0);
958 }
959
960 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
961                            struct lustre_capa *capa, __u64 opc)
962 {
963         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
964         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
965         int rc;
966
967         if (!dev->od_fl_capa)
968                 return 0;
969
970         if (capa == BYPASS_CAPA)
971                 return 0;
972
973         if (!capa) {
974                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
975                 return -EACCES;
976         }
977
978         if (!lu_fid_eq(fid, &capa->lc_fid)) {
979                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
980                            PFID(fid));
981                 return -EACCES;
982         }
983
984         if (!capa_opc_supported(capa, opc)) {
985                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
986                 return -EACCES;
987         }
988
989         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
990                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
991                 return -EACCES;
992         }
993
994         return 0;
995 }
996
997 static int osd_attr_get(const struct lu_env *env,
998                         struct dt_object *dt,
999                         struct lu_attr *attr,
1000                         struct lustre_capa *capa)
1001 {
1002         struct osd_object *obj = osd_dt_obj(dt);
1003
1004         LASSERT(dt_object_exists(dt));
1005         LASSERT(osd_invariant(obj));
1006
1007         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1008                 return -EACCES;
1009
1010         spin_lock(&obj->oo_guard);
1011         osd_inode_getattr(env, obj->oo_inode, attr);
1012         spin_unlock(&obj->oo_guard);
1013         return 0;
1014 }
1015
1016 static int osd_attr_set(const struct lu_env *env,
1017                         struct dt_object *dt,
1018                         const struct lu_attr *attr,
1019                         struct thandle *handle,
1020                         struct lustre_capa *capa)
1021 {
1022         struct osd_object *obj = osd_dt_obj(dt);
1023
1024         LASSERT(handle != NULL);
1025         LASSERT(dt_object_exists(dt));
1026         LASSERT(osd_invariant(obj));
1027
1028         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1029                 return -EACCES;
1030
1031         spin_lock(&obj->oo_guard);
1032         osd_inode_setattr(env, obj->oo_inode, attr);
1033         spin_unlock(&obj->oo_guard);
1034
1035         mark_inode_dirty(obj->oo_inode);
1036         return 0;
1037 }
1038
1039 static struct timespec *osd_inode_time(const struct lu_env *env,
1040                                        struct inode *inode, __u64 seconds)
1041 {
1042         struct osd_thread_info *oti = osd_oti_get(env);
1043         struct timespec        *t   = &oti->oti_time;
1044
1045         t->tv_sec  = seconds;
1046         t->tv_nsec = 0;
1047         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1048         return t;
1049 }
1050
1051 static void osd_inode_setattr(const struct lu_env *env,
1052                               struct inode *inode, const struct lu_attr *attr)
1053 {
1054         __u64 bits;
1055
1056         bits = attr->la_valid;
1057
1058         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1059
1060         if (bits & LA_ATIME)
1061                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1062         if (bits & LA_CTIME)
1063                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1064         if (bits & LA_MTIME)
1065                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1066         if (bits & LA_SIZE) {
1067                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1068                 i_size_write(inode, attr->la_size);
1069         }
1070         if (bits & LA_BLOCKS)
1071                 inode->i_blocks = attr->la_blocks;
1072         if (bits & LA_MODE)
1073                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1074                         (attr->la_mode & ~S_IFMT);
1075         if (bits & LA_UID)
1076                 inode->i_uid    = attr->la_uid;
1077         if (bits & LA_GID)
1078                 inode->i_gid    = attr->la_gid;
1079         if (bits & LA_NLINK)
1080                 inode->i_nlink  = attr->la_nlink;
1081         if (bits & LA_RDEV)
1082                 inode->i_rdev   = attr->la_rdev;
1083         if (bits & LA_BLKSIZE)
1084                 inode->i_blksize = attr->la_blksize;
1085
1086         if (bits & LA_FLAGS) {
1087                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1088
1089                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1090                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1091         }
1092 }
1093
1094 /*
1095  * Object creation.
1096  *
1097  * XXX temporary solution.
1098  */
1099
/*
 * Hook invoked before an object is created. Currently does nothing and
 * reports success.
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        int rc = 0;

        return rc;
}
1105
1106 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1107                            struct lu_attr *attr, struct thandle *th)
1108 {
1109         LASSERT(obj->oo_inode != NULL);
1110
1111         osd_object_init0(obj);
1112         return 0;
1113 }
1114
1115 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1116                                           struct inode * dir, int mode);
1117
1118 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1119                       umode_t mode,
1120                       struct dt_allocation_hint *hint,
1121                       struct thandle *th)
1122 {
1123         int result;
1124         struct osd_device  *osd = osd_obj2dev(obj);
1125         struct osd_thandle *oth;
1126         struct inode       *parent;
1127         struct inode       *inode;
1128
1129         LASSERT(osd_invariant(obj));
1130         LASSERT(obj->oo_inode == NULL);
1131         LASSERT(osd->od_obj_area != NULL);
1132
1133         oth = container_of(th, struct osd_thandle, ot_super);
1134         LASSERT(oth->ot_handle->h_transaction != NULL);
1135
1136         if (hint && hint->dah_parent)
1137                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1138         else
1139                 parent = osd->od_obj_area->d_inode;
1140         LASSERT(parent->i_op != NULL);
1141
1142         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1143         if (!IS_ERR(inode)) {
1144                 obj->oo_inode = inode;
1145                 result = 0;
1146         } else
1147                 result = PTR_ERR(inode);
1148         LASSERT(osd_invariant(obj));
1149         return result;
1150 }
1151
1152
1153 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1154                            int recsize, handle_t *handle);
1155
1156 enum {
1157         OSD_NAME_LEN = 255
1158 };
1159
1160 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1161                      struct lu_attr *attr,
1162                      struct dt_allocation_hint *hint,
1163                      struct thandle *th)
1164 {
1165         int result;
1166         struct osd_thandle *oth;
1167
1168         LASSERT(S_ISDIR(attr->la_mode));
1169
1170         oth = container_of(th, struct osd_thandle, ot_super);
1171         LASSERT(oth->ot_handle->h_transaction != NULL);
1172         result = osd_mkfile(info, obj, (attr->la_mode &
1173                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1174         if (result == 0) {
1175                 LASSERT(obj->oo_inode != NULL);
1176                 /*
1177                  * XXX uh-oh... call low-level iam function directly.
1178                  */
1179                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1180                                          sizeof (struct lu_fid_pack),
1181                                          oth->ot_handle);
1182         }
1183         return result;
1184 }
1185
1186 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1187                      struct lu_attr *attr,
1188                      struct dt_allocation_hint *hint,
1189                      struct thandle *th)
1190 {
1191         LASSERT(S_ISREG(attr->la_mode));
1192         return osd_mkfile(info, obj, (attr->la_mode &
1193                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1194 }
1195
1196 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1197                      struct lu_attr *attr,
1198                      struct dt_allocation_hint *hint,
1199                      struct thandle *th)
1200 {
1201         LASSERT(S_ISLNK(attr->la_mode));
1202         return osd_mkfile(info, obj, (attr->la_mode &
1203                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1204 }
1205
1206 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1207                      struct lu_attr *attr,
1208                      struct dt_allocation_hint *hint,
1209                      struct thandle *th)
1210 {
1211         int result;
1212         struct osd_device *osd = osd_obj2dev(obj);
1213         struct inode      *dir;
1214         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1215
1216         LASSERT(osd_invariant(obj));
1217         LASSERT(obj->oo_inode == NULL);
1218         LASSERT(osd->od_obj_area != NULL);
1219         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1220                 S_ISFIFO(mode) || S_ISSOCK(mode));
1221
1222         dir = osd->od_obj_area->d_inode;
1223         LASSERT(dir->i_op != NULL);
1224
1225         result = osd_mkfile(info, obj, mode, hint, th);
1226         if (result == 0) {
1227                 LASSERT(obj->oo_inode != NULL);
1228                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1229         }
1230         LASSERT(osd_invariant(obj));
1231         return result;
1232 }
1233
1234 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1235                               struct lu_attr *,
1236                               struct dt_allocation_hint *hint,
1237                               struct thandle *);
1238
1239 static osd_obj_type_f osd_create_type_f(__u32 mode)
1240 {
1241         osd_obj_type_f result;
1242
1243         switch (mode) {
1244         case S_IFDIR:
1245                 result = osd_mkdir;
1246                 break;
1247         case S_IFREG:
1248                 result = osd_mkreg;
1249                 break;
1250         case S_IFLNK:
1251                 result = osd_mksym;
1252                 break;
1253         case S_IFCHR:
1254         case S_IFBLK:
1255         case S_IFIFO:
1256         case S_IFSOCK:
1257                 result = osd_mknod;
1258                 break;
1259         default:
1260                 LBUG();
1261                 break;
1262         }
1263         return result;
1264 }
1265
1266
1267 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1268                         struct dt_object *parent, umode_t child_mode)
1269 {
1270         LASSERT(ah);
1271
1272         memset(ah, 0, sizeof(*ah));
1273         ah->dah_parent = parent;
1274         ah->dah_mode = child_mode;
1275 }
1276
1277
1278 /*
1279  * Concurrency: @dt is write locked.
1280  */
1281 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1282                              struct lu_attr *attr, 
1283                              struct dt_allocation_hint *hint,
1284                              struct thandle *th)
1285 {
1286         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1287         struct osd_object      *obj  = osd_dt_obj(dt);
1288         struct osd_device      *osd  = osd_obj2dev(obj);
1289         struct osd_thread_info *info = osd_oti_get(env);
1290         int result;
1291
1292         ENTRY;
1293
1294         LASSERT(osd_invariant(obj));
1295         LASSERT(!dt_object_exists(dt));
1296         LASSERT(osd_write_locked(env, obj));
1297         LASSERT(th != NULL);
1298
1299         /*
1300          * XXX missing: Quote handling.
1301          */
1302
1303         result = osd_create_pre(info, obj, attr, th);
1304         if (result == 0) {
1305                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1306                                                                 attr, hint, th);
1307                 if (result == 0)
1308                         result = osd_create_post(info, obj, attr, th);
1309         }
1310         if (result == 0) {
1311                 struct osd_inode_id *id = &info->oti_id;
1312
1313                 LASSERT(obj->oo_inode != NULL);
1314
1315                 id->oii_ino = obj->oo_inode->i_ino;
1316                 id->oii_gen = obj->oo_inode->i_generation;
1317
1318                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1319         }
1320
1321         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1322         LASSERT(osd_invariant(obj));
1323         RETURN(result);
1324 }
1325
1326 /*
1327  * Concurrency: @dt is write locked.
1328  */
1329 static void osd_object_ref_add(const struct lu_env *env,
1330                                struct dt_object *dt,
1331                                struct thandle *th)
1332 {
1333         struct osd_object *obj = osd_dt_obj(dt);
1334         struct inode *inode = obj->oo_inode;
1335
1336         LASSERT(osd_invariant(obj));
1337         LASSERT(dt_object_exists(dt));
1338         LASSERT(osd_write_locked(env, obj));
1339         LASSERT(th != NULL);
1340
1341         spin_lock(&obj->oo_guard);
1342         if (inode->i_nlink < LDISKFS_LINK_MAX) {
1343                 inode->i_nlink ++;
1344                 spin_unlock(&obj->oo_guard);
1345                 mark_inode_dirty(inode);
1346         } else {
1347                 spin_unlock(&obj->oo_guard);
1348                 LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
1349                                 "Overflowed nlink\n");
1350         }
1351         LASSERT(osd_invariant(obj));
1352 }
1353
1354 /*
1355  * Concurrency: @dt is write locked.
1356  */
1357 static void osd_object_ref_del(const struct lu_env *env,
1358                                struct dt_object *dt,
1359                                struct thandle *th)
1360 {
1361         struct osd_object *obj = osd_dt_obj(dt);
1362         struct inode *inode = obj->oo_inode;
1363
1364         LASSERT(osd_invariant(obj));
1365         LASSERT(dt_object_exists(dt));
1366         LASSERT(osd_write_locked(env, obj));
1367         LASSERT(th != NULL);
1368
1369         spin_lock(&obj->oo_guard);
1370         if (inode->i_nlink > 0) {
1371                 inode->i_nlink --;
1372                 spin_unlock(&obj->oo_guard);
1373                 mark_inode_dirty(inode);
1374         } else {
1375                 spin_unlock(&obj->oo_guard);
1376                 LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
1377                                 "Underflowed nlink\n");
1378         }
1379         LASSERT(osd_invariant(obj));
1380 }
1381
1382 /*
1383  * Concurrency: @dt is read locked.
1384  */
1385 static int osd_xattr_get(const struct lu_env *env,
1386                          struct dt_object *dt,
1387                          struct lu_buf *buf,
1388                          const char *name,
1389                          struct lustre_capa *capa)
1390 {
1391         struct osd_object      *obj    = osd_dt_obj(dt);
1392         struct inode           *inode  = obj->oo_inode;
1393         struct osd_thread_info *info   = osd_oti_get(env);
1394         struct dentry          *dentry = &info->oti_dentry;
1395
1396         LASSERT(dt_object_exists(dt));
1397         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1398         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1399
1400         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1401                 return -EACCES;
1402
1403         dentry->d_inode = inode;
1404         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1405 }
1406
1407 /*
1408  * Concurrency: @dt is write locked.
1409  */
1410 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1411                          const struct lu_buf *buf, const char *name, int fl,
1412                          struct thandle *handle, struct lustre_capa *capa)
1413 {
1414         int fs_flags;
1415
1416         struct osd_object      *obj    = osd_dt_obj(dt);
1417         struct inode           *inode  = obj->oo_inode;
1418         struct osd_thread_info *info   = osd_oti_get(env);
1419         struct dentry          *dentry = &info->oti_dentry;
1420
1421         LASSERT(dt_object_exists(dt));
1422         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1423         LASSERT(osd_write_locked(env, obj));
1424         LASSERT(handle != NULL);
1425
1426         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1427                 return -EACCES;
1428
1429         dentry->d_inode = inode;
1430
1431         fs_flags = 0;
1432         if (fl & LU_XATTR_REPLACE)
1433                 fs_flags |= XATTR_REPLACE;
1434
1435         if (fl & LU_XATTR_CREATE)
1436                 fs_flags |= XATTR_CREATE;
1437
1438         return inode->i_op->setxattr(dentry, name,
1439                                      buf->lb_buf, buf->lb_len, fs_flags);
1440 }
1441
1442 /*
1443  * Concurrency: @dt is read locked.
1444  */
1445 static int osd_xattr_list(const struct lu_env *env,
1446                           struct dt_object *dt,
1447                           struct lu_buf *buf,
1448                           struct lustre_capa *capa)
1449 {
1450         struct osd_object      *obj    = osd_dt_obj(dt);
1451         struct inode           *inode  = obj->oo_inode;
1452         struct osd_thread_info *info   = osd_oti_get(env);
1453         struct dentry          *dentry = &info->oti_dentry;
1454
1455         LASSERT(dt_object_exists(dt));
1456         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1457         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1458
1459         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1460                 return -EACCES;
1461
1462         dentry->d_inode = inode;
1463         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1464 }
1465
1466 /*
1467  * Concurrency: @dt is write locked.
1468  */
1469 static int osd_xattr_del(const struct lu_env *env,
1470                          struct dt_object *dt,
1471                          const char *name,
1472                          struct thandle *handle,
1473                          struct lustre_capa *capa)
1474 {
1475         struct osd_object      *obj    = osd_dt_obj(dt);
1476         struct inode           *inode  = obj->oo_inode;
1477         struct osd_thread_info *info   = osd_oti_get(env);
1478         struct dentry          *dentry = &info->oti_dentry;
1479
1480         LASSERT(dt_object_exists(dt));
1481         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1482         LASSERT(osd_write_locked(env, obj));
1483         LASSERT(handle != NULL);
1484
1485         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1486                 return -EACCES;
1487
1488         dentry->d_inode = inode;
1489         return inode->i_op->removexattr(dentry, name);
1490 }
1491
1492 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1493                                      struct dt_object *dt,
1494                                      struct lustre_capa *old,
1495                                      __u64 opc)
1496 {
1497         struct osd_thread_info *info = osd_oti_get(env);
1498         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1499         struct osd_object *obj = osd_dt_obj(dt);
1500         struct osd_device *dev = osd_obj2dev(obj);
1501         struct lustre_capa_key *key = &info->oti_capa_key;
1502         struct lustre_capa *capa = &info->oti_capa;
1503         struct obd_capa *oc;
1504         int rc;
1505         ENTRY;
1506
1507         if (!dev->od_fl_capa)
1508                 RETURN(ERR_PTR(-ENOENT));
1509
1510         LASSERT(dt_object_exists(dt));
1511         LASSERT(osd_invariant(obj));
1512
1513         /* renewal sanity check */
1514         if (old && osd_object_auth(env, dt, old, opc))
1515                 RETURN(ERR_PTR(-EACCES));
1516
1517         capa->lc_fid = *fid;
1518         capa->lc_opc = opc;
1519         capa->lc_uid = 0;
1520         capa->lc_flags = dev->od_capa_alg << 24;
1521         capa->lc_timeout = dev->od_capa_timeout;
1522         capa->lc_expiry = 0;
1523
1524         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1525         if (oc) {
1526                 LASSERT(!capa_is_expired(oc));
1527                 RETURN(oc);
1528         }
1529
1530         spin_lock(&capa_lock);
1531         *key = dev->od_capa_keys[1];
1532         spin_unlock(&capa_lock);
1533
1534         capa->lc_keyid = key->lk_keyid;
1535         capa->lc_expiry = CURRENT_SECONDS + dev->od_capa_timeout;
1536
1537         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1538         if (rc) {
1539                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1540                 RETURN(ERR_PTR(rc));
1541         }
1542
1543         oc = capa_add(dev->od_capa_hash, capa);
1544         RETURN(oc);
1545 }
1546
1547 static struct dt_object_operations osd_obj_ops = {
1548         .do_read_lock    = osd_object_read_lock,
1549         .do_write_lock   = osd_object_write_lock,
1550         .do_read_unlock  = osd_object_read_unlock,
1551         .do_write_unlock = osd_object_write_unlock,
1552         .do_attr_get     = osd_attr_get,
1553         .do_attr_set     = osd_attr_set,
1554         .do_ah_init      = osd_ah_init,
1555         .do_create       = osd_object_create,
1556         .do_index_try    = osd_index_try,
1557         .do_ref_add      = osd_object_ref_add,
1558         .do_ref_del      = osd_object_ref_del,
1559         .do_xattr_get    = osd_xattr_get,
1560         .do_xattr_set    = osd_xattr_set,
1561         .do_xattr_del    = osd_xattr_del,
1562         .do_xattr_list   = osd_xattr_list,
1563         .do_capa_get     = osd_capa_get,
1564 };
1565
1566 /*
1567  * Body operations.
1568  */
1569
1570 /*
1571  * XXX: Another layering violation for now.
1572  *
1573  * We don't want to use ->f_op->read methods, because generic file write
1574  *
1575  *         - serializes on ->i_sem, and
1576  *
1577  *         - does a lot of extra work like balance_dirty_pages(),
1578  *
1579  * which doesn't work for globally shared files like /last-received.
1580  */
1581 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
1582 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
1583                                 loff_t *offs, handle_t *handle);
1584
1585 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1586                         struct lu_buf *buf, loff_t *pos,
1587                         struct lustre_capa *capa)
1588 {
1589         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1590
1591         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1592                 RETURN(-EACCES);
1593
1594         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1595 }
1596
1597 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1598                          const struct lu_buf *buf, loff_t *pos,
1599                          struct thandle *handle, struct lustre_capa *capa)
1600 {
1601         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1602         struct osd_thandle *oh;
1603         ssize_t             result;
1604
1605         LASSERT(handle != NULL);
1606
1607         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1608                 RETURN(-EACCES);
1609
1610         oh = container_of(handle, struct osd_thandle, ot_super);
1611         LASSERT(oh->ot_handle->h_transaction != NULL);
1612         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1613                                              pos, oh->ot_handle);
1614         if (result == 0)
1615                 result = buf->lb_len;
1616         return result;
1617 }
1618
1619 static struct dt_body_operations osd_body_ops = {
1620         .dbo_read  = osd_read,
1621         .dbo_write = osd_write
1622 };
1623
1624 /*
1625  * Index operations.
1626  */
1627
1628 static int osd_object_is_root(const struct osd_object *obj)
1629 {
1630         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1631 }
1632
1633 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1634                            const struct dt_index_features *feat)
1635 {
1636         struct iam_descr *descr;
1637
1638         if (osd_object_is_root(o))
1639                 return feat == &dt_directory_features;
1640
1641         LASSERT(o->oo_dir != NULL);
1642
1643         descr = o->oo_dir->od_container.ic_descr;
1644         if (feat == &dt_directory_features)
1645                 return descr == &iam_htree_compat_param ||
1646                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1647                          1 /*
1648                             * XXX check that index looks like directory.
1649                             */
1650                                 );
1651         else
1652                 return
1653                         feat->dif_keysize_min <= descr->id_key_size &&
1654                         descr->id_key_size <= feat->dif_keysize_max &&
1655                         feat->dif_recsize_min <= descr->id_rec_size &&
1656                         descr->id_rec_size <= feat->dif_recsize_max &&
1657                         !(feat->dif_flags & (DT_IND_VARKEY |
1658                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1659                         ergo(feat->dif_flags & DT_IND_UPDATE,
1660                              1 /* XXX check that object (and file system) is
1661                                 * writable */);
1662 }
1663
1664 static int osd_container_init(const struct lu_env *env,
1665                               struct osd_object *obj,
1666                               struct osd_directory *dir)
1667 {
1668         int result;
1669         struct iam_container *bag;
1670
1671         bag    = &dir->od_container;
1672         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1673         if (result == 0) {
1674                 result = iam_container_setup(bag);
1675                 if (result == 0)
1676                         obj->oo_dt.do_index_ops = &osd_index_ops;
1677                 else
1678                         iam_container_fini(bag);
1679         }
1680         return result;
1681 }
1682
1683 /*
1684  * Concurrency: no external locking is necessary.
1685  */
1686 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1687                          const struct dt_index_features *feat)
1688 {
1689         int result;
1690         struct osd_object *obj = osd_dt_obj(dt);
1691
1692         LASSERT(osd_invariant(obj));
1693         LASSERT(dt_object_exists(dt));
1694
1695         if (osd_object_is_root(obj)) {
1696                 dt->do_index_ops = &osd_index_compat_ops;
1697                 result = 0;
1698         } else if (!osd_has_index(obj)) {
1699                 struct osd_directory *dir;
1700
1701                 OBD_ALLOC_PTR(dir);
1702                 if (dir != NULL) {
1703                         sema_init(&dir->od_sem, 1);
1704
1705                         spin_lock(&obj->oo_guard);
1706                         if (obj->oo_dir == NULL)
1707                                 obj->oo_dir = dir;
1708                         else
1709                                 /*
1710                                  * Concurrent thread allocated container data.
1711                                  */
1712                                 OBD_FREE_PTR(dir);
1713                         spin_unlock(&obj->oo_guard);
1714                         /*
1715                          * Now, that we have container data, serialize its
1716                          * initialization.
1717                          */
1718                         down(&obj->oo_dir->od_sem);
1719                         /*
1720                          * recheck under lock.
1721                          */
1722                         if (!osd_has_index(obj))
1723                                 result = osd_container_init(env, obj, dir);
1724                         else
1725                                 result = 0;
1726                         up(&obj->oo_dir->od_sem);
1727                 } else
1728                         result = -ENOMEM;
1729         } else
1730                 result = 0;
1731
1732         if (result == 0) {
1733                 if (!osd_index_probe(env, obj, feat))
1734                         result = -ENOTDIR;
1735         }
1736         LASSERT(osd_invariant(obj));
1737
1738         return result;
1739 }
1740
1741 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1742                             const struct dt_key *key, struct thandle *handle,
1743                             struct lustre_capa *capa)
1744 {
1745         struct osd_object     *obj = osd_dt_obj(dt);
1746         struct osd_thandle    *oh;
1747         struct iam_path_descr *ipd;
1748         struct iam_container  *bag = &obj->oo_dir->od_container;
1749         int rc;
1750
1751         ENTRY;
1752
1753         LASSERT(osd_invariant(obj));
1754         LASSERT(dt_object_exists(dt));
1755         LASSERT(bag->ic_object == obj->oo_inode);
1756         LASSERT(handle != NULL);
1757
1758         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1759                 RETURN(-EACCES);
1760
1761         ipd = osd_ipd_get(env, bag);
1762         if (unlikely(ipd == NULL))
1763                 RETURN(-ENOMEM);
1764
1765         oh = container_of0(handle, struct osd_thandle, ot_super);
1766         LASSERT(oh->ot_handle != NULL);
1767         LASSERT(oh->ot_handle->h_transaction != NULL);
1768
1769         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1770         osd_ipd_put(env, bag, ipd);
1771         LASSERT(osd_invariant(obj));
1772         RETURN(rc);
1773 }
1774
1775 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1776                             struct dt_rec *rec, const struct dt_key *key,
1777                             struct lustre_capa *capa)
1778 {
1779         struct osd_object     *obj = osd_dt_obj(dt);
1780         struct iam_path_descr *ipd;
1781         struct iam_container  *bag = &obj->oo_dir->od_container;
1782         int rc;
1783
1784         ENTRY;
1785
1786         LASSERT(osd_invariant(obj));
1787         LASSERT(dt_object_exists(dt));
1788         LASSERT(bag->ic_object == obj->oo_inode);
1789
1790         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1791                 return -EACCES;
1792
1793         ipd = osd_ipd_get(env, bag);
1794         if (unlikely(ipd == NULL))
1795                 RETURN(-ENOMEM);
1796
1797         rc = iam_lookup(bag, (const struct iam_key *)key,
1798                         (struct iam_rec *)rec, ipd);
1799         osd_ipd_put(env, bag, ipd);
1800         LASSERT(osd_invariant(obj));
1801
1802         RETURN(rc);
1803 }
1804
1805 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1806                             const struct dt_rec *rec, const struct dt_key *key,
1807                             struct thandle *th, struct lustre_capa *capa)
1808 {
1809         struct osd_object     *obj = osd_dt_obj(dt);
1810         struct iam_path_descr *ipd;
1811         struct osd_thandle    *oh;
1812         struct iam_container  *bag = &obj->oo_dir->od_container;
1813         int rc;
1814
1815         ENTRY;
1816
1817         LASSERT(osd_invariant(obj));
1818         LASSERT(dt_object_exists(dt));
1819         LASSERT(bag->ic_object == obj->oo_inode);
1820         LASSERT(th != NULL);
1821
1822         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1823                 return -EACCES;
1824
1825         ipd = osd_ipd_get(env, bag);
1826         if (unlikely(ipd == NULL))
1827                 RETURN(-ENOMEM);
1828
1829         oh = container_of0(th, struct osd_thandle, ot_super);
1830         LASSERT(oh->ot_handle != NULL);
1831         LASSERT(oh->ot_handle->h_transaction != NULL);
1832         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1833                         (struct iam_rec *)rec, ipd);
1834         osd_ipd_put(env, bag, ipd);
1835         LASSERT(osd_invariant(obj));
1836         RETURN(rc);
1837 }
1838
1839 /*
1840  * Iterator operations.
1841  */
1842 struct osd_it {
1843         struct osd_object     *oi_obj;
1844         struct iam_path_descr *oi_ipd;
1845         struct iam_iterator    oi_it;
1846 };
1847
1848 static struct dt_it *osd_it_init(const struct lu_env *env,
1849                                  struct dt_object *dt, int writable,
1850                                  struct lustre_capa *capa)
1851 {
1852         struct osd_it         *it;
1853         struct osd_object     *obj = osd_dt_obj(dt);
1854         struct lu_object      *lo  = &dt->do_lu;
1855         struct iam_path_descr *ipd;
1856         struct iam_container  *bag = &obj->oo_dir->od_container;
1857         __u32                  flags;
1858
1859         LASSERT(lu_object_exists(lo));
1860
1861         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1862                             CAPA_OPC_BODY_READ))
1863                 return ERR_PTR(-EACCES);
1864
1865         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1866         OBD_ALLOC_PTR(it);
1867         if (it != NULL) {
1868                 /*
1869                  * XXX: as ipd is allocated within osd_thread_info, assignment
1870                  * below implies that iterator usage is confined within single
1871                  * environment.
1872                  */
1873                 ipd = osd_ipd_get(env, bag);
1874                 if (likely(ipd != NULL)) {
1875                         it->oi_obj = obj;
1876                         it->oi_ipd = ipd;
1877                         lu_object_get(lo);
1878                         iam_it_init(&it->oi_it, bag, flags, ipd);
1879                         return (struct dt_it *)it;
1880                 } else
1881                         OBD_FREE_PTR(it);
1882         }
1883         return ERR_PTR(-ENOMEM);
1884 }
1885
1886 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1887 {
1888         struct osd_it     *it = (struct osd_it *)di;
1889         struct osd_object *obj = it->oi_obj;
1890
1891         iam_it_fini(&it->oi_it);
1892         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1893         lu_object_put(env, &obj->oo_dt.do_lu);
1894         OBD_FREE_PTR(it);
1895 }
1896
1897 static int osd_it_get(const struct lu_env *env,
1898                       struct dt_it *di, const struct dt_key *key)
1899 {
1900         struct osd_it *it = (struct osd_it *)di;
1901
1902         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1903 }
1904
1905 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1906 {
1907         struct osd_it *it = (struct osd_it *)di;
1908
1909         iam_it_put(&it->oi_it);
1910 }
1911
1912 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1913 {
1914         struct osd_it *it = (struct osd_it *)di;
1915
1916         return iam_it_next(&it->oi_it);
1917 }
1918
1919 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1920                       struct thandle *th)
1921 {
1922         struct osd_it      *it = (struct osd_it *)di;
1923         struct osd_thandle *oh;
1924
1925         LASSERT(th != NULL);
1926
1927         oh = container_of0(th, struct osd_thandle, ot_super);
1928         LASSERT(oh->ot_handle != NULL);
1929         LASSERT(oh->ot_handle->h_transaction != NULL);
1930
1931         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1932 }
1933
1934 static struct dt_key *osd_it_key(const struct lu_env *env,
1935                                  const struct dt_it *di)
1936 {
1937         struct osd_it *it = (struct osd_it *)di;
1938
1939         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1940 }
1941
1942 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1943 {
1944         struct osd_it *it = (struct osd_it *)di;
1945
1946         return iam_it_key_size(&it->oi_it);
1947 }
1948
1949 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1950                                  const struct dt_it *di)
1951 {
1952         struct osd_it *it = (struct osd_it *)di;
1953
1954         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1955 }
1956
1957 static __u32 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1958 {
1959         struct osd_it *it = (struct osd_it *)di;
1960
1961         return iam_it_store(&it->oi_it);
1962 }
1963
1964 static int osd_it_load(const struct lu_env *env,
1965                        const struct dt_it *di, __u32 hash)
1966 {
1967         struct osd_it *it = (struct osd_it *)di;
1968
1969         return iam_it_load(&it->oi_it, hash);
1970 }
1971
1972 static struct dt_index_operations osd_index_ops = {
1973         .dio_lookup = osd_index_lookup,
1974         .dio_insert = osd_index_insert,
1975         .dio_delete = osd_index_delete,
1976         .dio_it     = {
1977                 .init     = osd_it_init,
1978                 .fini     = osd_it_fini,
1979                 .get      = osd_it_get,
1980                 .put      = osd_it_put,
1981                 .del      = osd_it_del,
1982                 .next     = osd_it_next,
1983                 .key      = osd_it_key,
1984                 .key_size = osd_it_key_size,
1985                 .rec      = osd_it_rec,
1986                 .store    = osd_it_store,
1987                 .load     = osd_it_load
1988         }
1989 };
1990
1991 static int osd_index_compat_delete(const struct lu_env *env,
1992                                    struct dt_object *dt,
1993                                    const struct dt_key *key,
1994                                    struct thandle *handle,
1995                                    struct lustre_capa *capa)
1996 {
1997         struct osd_object *obj = osd_dt_obj(dt);
1998
1999         LASSERT(handle != NULL);
2000         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2001         ENTRY;
2002
2003 #if 0
2004         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2005                 RETURN(-EACCES);
2006 #endif
2007
2008         RETURN(-EOPNOTSUPP);
2009 }
2010
2011 /*
2012  * Compatibility index operations.
2013  */
2014
2015
2016 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
2017                            struct dentry *dentry, struct lu_fid_pack *pack)
2018 {
2019         struct inode  *inode = dentry->d_inode;
2020         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
2021
2022         lu_igif_build(fid, inode->i_ino, inode->i_generation);
2023         fid_cpu_to_be(fid, fid);
2024         pack->fp_len = sizeof *fid + 1;
2025         memcpy(pack->fp_area, fid, sizeof *fid);
2026 }
2027
2028 static int osd_index_compat_lookup(const struct lu_env *env,
2029                                    struct dt_object *dt,
2030                                    struct dt_rec *rec, const struct dt_key *key,
2031                                    struct lustre_capa *capa)
2032 {
2033         struct osd_object *obj = osd_dt_obj(dt);
2034
2035         struct osd_device      *osd  = osd_obj2dev(obj);
2036         struct osd_thread_info *info = osd_oti_get(env);
2037         struct inode           *dir;
2038
2039         int result;
2040
2041         /*
2042          * XXX temporary solution.
2043          */
2044         struct dentry *dentry;
2045         struct dentry *parent;
2046
2047         LASSERT(osd_invariant(obj));
2048         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2049         LASSERT(osd_has_index(obj));
2050
2051         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2052                 return -EACCES;
2053
2054         info->oti_str.name = (const char *)key;
2055         info->oti_str.len  = strlen((const char *)key);
2056
2057         dir = obj->oo_inode;
2058         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2059
2060         parent = d_alloc_root(dir);
2061         if (parent == NULL)
2062                 return -ENOMEM;
2063         igrab(dir);
2064         dentry = d_alloc(parent, &info->oti_str);
2065         if (dentry != NULL) {
2066                 struct dentry *d;
2067
2068                 /*
2069                  * XXX passing NULL for nameidata should work for
2070                  * ext3/ldiskfs.
2071                  */
2072                 d = dir->i_op->lookup(dir, dentry, NULL);
2073                 if (d == NULL) {
2074                         /*
2075                          * normal case, result is in @dentry.
2076                          */
2077                         if (dentry->d_inode != NULL) {
2078                                 osd_build_pack(env, osd, dentry,
2079                                                (struct lu_fid_pack *)rec);
2080                                 result = 0;
2081                         } else
2082                                 result = -ENOENT;
2083                  } else {
2084                         /* What? Disconnected alias? Ppheeeww... */
2085                         CERROR("Aliasing where not expected\n");
2086                         result = -EIO;
2087                         dput(d);
2088                 }
2089                 dput(dentry);
2090         } else
2091                 result = -ENOMEM;
2092         dput(parent);
2093         LASSERT(osd_invariant(obj));
2094         return result;
2095 }
2096
2097 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
2098                        struct inode *dir, struct inode *inode, const char *name)
2099 {
2100         struct dentry *old;
2101         struct dentry *new;
2102         struct dentry *parent;
2103
2104         int result;
2105
2106         info->oti_str.name = name;
2107         info->oti_str.len  = strlen(name);
2108
2109         LASSERT(atomic_read(&dir->i_count) > 0);
2110         result = -ENOMEM;
2111         old = d_alloc(dev->od_obj_area, &info->oti_str);
2112         if (old != NULL) {
2113                 d_instantiate(old, inode);
2114                 igrab(inode);
2115                 LASSERT(atomic_read(&dir->i_count) > 0);
2116                 parent = d_alloc_root(dir);
2117                 if (parent != NULL) {
2118                         igrab(dir);
2119                         LASSERT(atomic_read(&dir->i_count) > 1);
2120                         new = d_alloc(parent, &info->oti_str);
2121                         LASSERT(atomic_read(&dir->i_count) > 1);
2122                         if (new != NULL) {
2123                                 LASSERT(atomic_read(&dir->i_count) > 1);
2124                                 result = dir->i_op->link(old, dir, new);
2125                                 LASSERT(atomic_read(&dir->i_count) > 1);
2126                                 dput(new);
2127                                 LASSERT(atomic_read(&dir->i_count) > 1);
2128                         }
2129                         LASSERT(atomic_read(&dir->i_count) > 1);
2130                         dput(parent);
2131                         LASSERT(atomic_read(&dir->i_count) > 0);
2132                 }
2133                 dput(old);
2134         }
2135         LASSERT(atomic_read(&dir->i_count) > 0);
2136         return result;
2137 }
2138
2139
2140 /*
2141  * XXX Temporary stuff.
2142  */
2143 static int osd_index_compat_insert(const struct lu_env *env,
2144                                    struct dt_object *dt,
2145                                    const struct dt_rec *rec,
2146                                    const struct dt_key *key, struct thandle *th,
2147                                    struct lustre_capa *capa)
2148 {
2149         struct osd_object     *obj = osd_dt_obj(dt);
2150
2151         const char          *name = (const char *)key;
2152
2153         struct lu_device    *ludev = dt->do_lu.lo_dev;
2154         struct lu_object    *luch;
2155
2156         struct osd_thread_info   *info = osd_oti_get(env);
2157         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2158         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2159
2160         int result;
2161
2162         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2163         LASSERT(osd_invariant(obj));
2164         LASSERT(th != NULL);
2165
2166         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2167                 return -EACCES;
2168
2169         fid_unpack(pack, fid);
2170         luch = lu_object_find(env, ludev->ld_site, fid);
2171         if (!IS_ERR(luch)) {
2172                 if (lu_object_exists(luch)) {
2173                         struct osd_object *child;
2174
2175                         child = osd_obj(lu_object_locate(luch->lo_header,
2176                                                          ludev->ld_type));
2177                         if (child != NULL)
2178                                 result = osd_add_rec(info, osd_obj2dev(obj),
2179                                                      obj->oo_inode,
2180                                                      child->oo_inode, name);
2181                         else {
2182                                 CERROR("No osd slice.\n");
2183                                 result = -ENOENT;
2184                         }
2185                         LASSERT(osd_invariant(obj));
2186                         LASSERT(osd_invariant(child));
2187                 } else {
2188                         CERROR("Sorry.\n");
2189                         result = -ENOENT;
2190                 }
2191                 lu_object_put(env, luch);
2192         } else
2193                 result = PTR_ERR(luch);
2194         LASSERT(osd_invariant(obj));
2195         return result;
2196 }
2197
2198 static struct dt_index_operations osd_index_compat_ops = {
2199         .dio_lookup = osd_index_compat_lookup,
2200         .dio_insert = osd_index_compat_insert,
2201         .dio_delete = osd_index_compat_delete
2202 };
2203
2204 /*
2205  * OSD device type methods
2206  */
2207 static int osd_type_init(struct lu_device_type *t)
2208 {
2209         LU_CONTEXT_KEY_INIT(&osd_key);
2210         return lu_context_key_register(&osd_key);
2211 }
2212
2213 static void osd_type_fini(struct lu_device_type *t)
2214 {
2215         lu_context_key_degister(&osd_key);
2216 }
2217
2218 static struct lu_context_key osd_key = {
2219         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
2220         .lct_init = osd_key_init,
2221         .lct_fini = osd_key_fini,
2222         .lct_exit = osd_key_exit
2223 };
2224
2225 static void *osd_key_init(const struct lu_context *ctx,
2226                           struct lu_context_key *key)
2227 {
2228         struct osd_thread_info *info;
2229
2230         OBD_ALLOC_PTR(info);
2231         if (info != NULL)
2232                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2233         else
2234                 info = ERR_PTR(-ENOMEM);
2235         return info;
2236 }
2237
/*
 * Context key destructor: free the struct osd_thread_info allocated by
 * osd_key_init().
 */
static void osd_key_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        /* typed pointer is needed so that the right size is freed */
        struct osd_thread_info *oti = data;

        OBD_FREE_PTR(oti);
}
2244
/*
 * Context exit hook. When OSD_COUNTERS is enabled, asserts that the
 * per-thread lock and transaction counters are all back to zero; otherwise
 * does nothing.
 */
static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
#if OSD_COUNTERS
        struct osd_thread_info *oti = data;

        LASSERT(oti->oti_r_locks == 0);
        LASSERT(oti->oti_w_locks == 0);
        LASSERT(oti->oti_txns    == 0);
#endif
}
2256
2257 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2258                            const char *name, struct lu_device *next)
2259 {
2260         return lu_env_init(&osd_dev(d)->od_env_for_commit, NULL, LCT_MD_THREAD);
2261 }
2262
2263 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2264 {
2265         struct osd_thread_info *info = osd_oti_get(env);
2266         ENTRY;
2267         if (o->od_obj_area != NULL) {
2268                 dput(o->od_obj_area);
2269                 o->od_obj_area = NULL;
2270         }
2271         osd_oi_fini(info, &o->od_oi);
2272
2273         RETURN(0);
2274 }
2275
2276 static int osd_mount(const struct lu_env *env,
2277                      struct osd_device *o, struct lustre_cfg *cfg)
2278 {
2279         struct lustre_mount_info *lmi;
2280         const char               *dev  = lustre_cfg_string(cfg, 0);
2281         struct osd_thread_info   *info = osd_oti_get(env);
2282         int result;
2283
2284         ENTRY;
2285
2286         if (o->od_mount != NULL) {
2287                 CERROR("Already mounted (%s)\n", dev);
2288                 RETURN(-EEXIST);
2289         }
2290
2291         /* get mount */
2292         lmi = server_get_mount(dev);
2293         if (lmi == NULL) {
2294                 CERROR("Cannot get mount info for %s!\n", dev);
2295                 RETURN(-EFAULT);
2296         }
2297
2298         LASSERT(lmi != NULL);
2299         /* save lustre_mount_info in dt_device */
2300         o->od_mount = lmi;
2301
2302         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2303         if (result == 0) {
2304                 struct dentry *d;
2305
2306                 d = simple_mkdir(osd_sb(o)->s_root, "*OBJ-TEMP*", 0777, 1);
2307                 if (!IS_ERR(d)) {
2308                         o->od_obj_area = d;
2309                 } else
2310                         result = PTR_ERR(d);
2311         }
2312         if (result != 0)
2313                 osd_shutdown(env, o);
2314         RETURN(result);
2315 }
2316
2317 static struct lu_device *osd_device_fini(const struct lu_env *env,
2318                                          struct lu_device *d)
2319 {
2320         ENTRY;
2321
2322         shrink_dcache_sb(osd_sb(osd_dev(d)));
2323         osd_sync(env, lu2dt_dev(d));
2324
2325         if (osd_dev(d)->od_mount)
2326                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2327                                  osd_dev(d)->od_mount->lmi_mnt);
2328         osd_dev(d)->od_mount = NULL;
2329
2330         lu_env_fini(&osd_dev(d)->od_env_for_commit);
2331         RETURN(NULL);
2332 }
2333
2334 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2335                                           struct lu_device_type *t,
2336                                           struct lustre_cfg *cfg)
2337 {
2338         struct lu_device  *l;
2339         struct osd_device *o;
2340
2341         OBD_ALLOC_PTR(o);
2342         if (o != NULL) {
2343                 int result;
2344
2345                 result = dt_device_init(&o->od_dt_dev, t);
2346                 if (result == 0) {
2347                         l = osd2lu_dev(o);
2348                         l->ld_ops = &osd_lu_ops;
2349                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2350                         spin_lock_init(&o->od_osfs_lock);
2351                         o->od_osfs_age = cfs_time_shift_64(-1000);
2352                         o->od_capa_hash = init_capa_hash();
2353                         if (o->od_capa_hash == NULL)
2354                                 l = ERR_PTR(-ENOMEM);
2355                 } else
2356                         l = ERR_PTR(result);
2357         } else
2358                 l = ERR_PTR(-ENOMEM);
2359         return l;
2360 }
2361
2362 static void osd_device_free(const struct lu_env *env, struct lu_device *d)
2363 {
2364         struct osd_device *o = osd_dev(d);
2365
2366         cleanup_capa_hash(o->od_capa_hash);
2367         dt_device_fini(&o->od_dt_dev);
2368         OBD_FREE_PTR(o);
2369 }
2370
2371 static int osd_process_config(const struct lu_env *env,
2372                               struct lu_device *d, struct lustre_cfg *cfg)
2373 {
2374         struct osd_device *o = osd_dev(d);
2375         int err;
2376         ENTRY;
2377
2378         switch(cfg->lcfg_command) {
2379         case LCFG_SETUP:
2380                 err = osd_mount(env, o, cfg);
2381                 break;
2382         case LCFG_CLEANUP:
2383                 err = osd_shutdown(env, o);
2384                 break;
2385         default:
2386                 err = -ENOTTY;
2387         }
2388
2389         RETURN(err);
2390 }
2391 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2392                                     struct ldiskfs_super_block * es);
2393
2394 static int osd_recovery_complete(const struct lu_env *env,
2395                                  struct lu_device *d)
2396 {
2397         struct osd_device *o = osd_dev(d);
2398         ENTRY;
2399         /* TODO: orphans handling */
2400         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2401         RETURN(0);
2402 }
2403
2404 static struct inode *osd_iget(struct osd_thread_info *info,
2405                               struct osd_device *dev,
2406                               const struct osd_inode_id *id)
2407 {
2408         struct inode *inode;
2409
2410         inode = iget(osd_sb(dev), id->oii_ino);
2411         if (inode == NULL) {
2412                 CERROR("no inode\n");
2413                 inode = ERR_PTR(-EACCES);
2414         } else if (is_bad_inode(inode)) {
2415                 CERROR("bad inode\n");
2416                 iput(inode);
2417                 inode = ERR_PTR(-ENOENT);
2418         } else if (inode->i_generation != id->oii_gen) {
2419                 CERROR("stale inode\n");
2420                 iput(inode);
2421                 inode = ERR_PTR(-ESTALE);
2422         }
2423
2424         return inode;
2425
2426 }
2427
2428 static int osd_fid_lookup(const struct lu_env *env,
2429                           struct osd_object *obj, const struct lu_fid *fid)
2430 {
2431         struct osd_thread_info *info;
2432         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2433         struct osd_device      *dev;
2434         struct osd_inode_id    *id;
2435         struct osd_oi          *oi;
2436         struct inode           *inode;
2437         int                     result;
2438
2439         LASSERT(osd_invariant(obj));
2440         LASSERT(obj->oo_inode == NULL);
2441         LASSERT(fid_is_sane(fid));
2442         /*
2443          * This assertion checks that osd layer sees only local
2444          * fids. Unfortunately it is somewhat expensive (does a
2445          * cache-lookup). Disabling it for production/acceptance-testing.
2446          */
2447         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2448
2449         ENTRY;
2450
2451         info = osd_oti_get(env);
2452         dev  = osd_dev(ldev);
2453         id   = &info->oti_id;
2454         oi   = &dev->od_oi;
2455
2456         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2457                 RETURN(-ENOENT);
2458
2459         result = osd_oi_lookup(info, oi, fid, id);
2460         if (result == 0) {
2461                 inode = osd_iget(info, dev, id);
2462                 if (!IS_ERR(inode)) {
2463                         obj->oo_inode = inode;
2464                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2465                         result = 0;
2466                 } else
2467                         /*
2468                          * If fid wasn't found in oi, inode-less object is
2469                          * created, for which lu_object_exists() returns
2470                          * false. This is used in a (frequent) case when
2471                          * objects are created as locking anchors or
2472                          * place holders for objects yet to be created.
2473                          */
2474                         result = PTR_ERR(inode);
2475         } else if (result == -ENOENT)
2476                 result = 0;
2477         LASSERT(osd_invariant(obj));
2478         RETURN(result);
2479 }
2480
2481 static void osd_inode_getattr(const struct lu_env *env,
2482                               struct inode *inode, struct lu_attr *attr)
2483 {
2484         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2485                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2486                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2487
2488         attr->la_atime      = LTIME_S(inode->i_atime);
2489         attr->la_mtime      = LTIME_S(inode->i_mtime);
2490         attr->la_ctime      = LTIME_S(inode->i_ctime);
2491         attr->la_mode       = inode->i_mode;
2492         attr->la_size       = i_size_read(inode);
2493         attr->la_blocks     = inode->i_blocks;
2494         attr->la_uid        = inode->i_uid;
2495         attr->la_gid        = inode->i_gid;
2496         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2497         attr->la_nlink      = inode->i_nlink;
2498         attr->la_rdev       = inode->i_rdev;
2499         attr->la_blksize    = inode->i_blksize;
2500 }
2501
2502 /*
2503  * Helpers.
2504  */
2505
2506 static int lu_device_is_osd(const struct lu_device *d)
2507 {
2508         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2509 }
2510
2511 static struct osd_object *osd_obj(const struct lu_object *o)
2512 {
2513         LASSERT(lu_device_is_osd(o->lo_dev));
2514         return container_of0(o, struct osd_object, oo_dt.do_lu);
2515 }
2516
2517 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2518 {
2519         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2520         return container_of0(d, struct osd_device, od_dt_dev);
2521 }
2522
2523 static struct osd_device *osd_dev(const struct lu_device *d)
2524 {
2525         LASSERT(lu_device_is_osd(d));
2526         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2527 }
2528
2529 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2530 {
2531         return osd_obj(&d->do_lu);
2532 }
2533
2534 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2535 {
2536         return osd_dev(o->oo_dt.do_lu.lo_dev);
2537 }
2538
2539 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2540 {
2541         return &osd->od_dt_dev.dd_lu_dev;
2542 }
2543
2544 static struct super_block *osd_sb(const struct osd_device *dev)
2545 {
2546         return dev->od_mount->lmi_mnt->mnt_sb;
2547 }
2548
2549 static journal_t *osd_journal(const struct osd_device *dev)
2550 {
2551         return LDISKFS_SB(osd_sb(dev))->s_journal;
2552 }
2553
2554 static int osd_has_index(const struct osd_object *obj)
2555 {
2556         return obj->oo_dt.do_index_ops != NULL;
2557 }
2558
/*
 * ->loo_object_invariant() method: evaluate the osd invariant on the
 * osd_object corresponding to lu_object @l.
 *
 * The conversion is deliberately left nested inside osd_invariant(), so
 * that whether osd_obj() is evaluated stays under the control of
 * osd_invariant() exactly as written.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        return osd_invariant(osd_obj(l));
}
2563
2564 static struct lu_object_operations osd_lu_obj_ops = {
2565         .loo_object_init      = osd_object_init,
2566         .loo_object_delete    = osd_object_delete,
2567         .loo_object_release   = osd_object_release,
2568         .loo_object_free      = osd_object_free,
2569         .loo_object_print     = osd_object_print,
2570         .loo_object_invariant = osd_object_invariant
2571 };
2572
2573 static struct lu_device_operations osd_lu_ops = {
2574         .ldo_object_alloc      = osd_object_alloc,
2575         .ldo_process_config    = osd_process_config,
2576         .ldo_recovery_complete = osd_recovery_complete
2577 };
2578
2579 static struct lu_device_type_operations osd_device_type_ops = {
2580         .ldto_init = osd_type_init,
2581         .ldto_fini = osd_type_fini,
2582
2583         .ldto_device_alloc = osd_device_alloc,
2584         .ldto_device_free  = osd_device_free,
2585
2586         .ldto_device_init    = osd_device_init,
2587         .ldto_device_fini    = osd_device_fini
2588 };
2589
2590 static struct lu_device_type osd_device_type = {
2591         .ldt_tags     = LU_DEVICE_DT,
2592         .ldt_name     = LUSTRE_OSD_NAME,
2593         .ldt_ops      = &osd_device_type_ops,
2594         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2595 };
2596
2597 /*
2598  * lprocfs legacy support.
2599  */
2600 static struct lprocfs_vars lprocfs_osd_obd_vars[] = {
2601         { 0 }
2602 };
2603
2604 static struct lprocfs_vars lprocfs_osd_module_vars[] = {
2605         { 0 }
2606 };
2607
2608 static struct obd_ops osd_obd_device_ops = {
2609         .o_owner = THIS_MODULE
2610 };
2611
/*
 * Tie the two (empty) variable tables above to the "osd" name.
 * NOTE(review): this presumably generates the helper that
 * lprocfs_init_vars(osd, ...) in osd_mod_init() expands to -- confirm
 * against the lprocfs headers.
 */
LPROCFS_INIT_VARS(osd, lprocfs_osd_module_vars, lprocfs_osd_obd_vars);
2613
2614 static int __init osd_mod_init(void)
2615 {
2616         struct lprocfs_static_vars lvars;
2617
2618         lprocfs_init_vars(osd, &lvars);
2619         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2620                                    LUSTRE_OSD_NAME, &osd_device_type);
2621 }
2622
2623 static void __exit osd_mod_exit(void)
2624 {
2625         class_unregister_type(LUSTRE_OSD_NAME);
2626 }
2627
2628 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2629 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
2630 MODULE_LICENSE("GPL");
2631
2632 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);