Whamcloud - gitweb
Augment ->do_{read,write}_lock() prototypes with a `role' parameter indicating
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
58 /*
59  * XXX temporary stuff: direct access to ldiskfs/jbd. The interface between
60  * osd and the file system is not yet specified.
61  */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
/* Index state hung off osd_object::oo_dir. */
85 struct osd_directory {
        /* IAM container; finalized in osd_index_fini() when its ic_object
         * is the owning object's inode. */
86         struct iam_container od_container;
        /* NOTE(review): presumably the descriptor od_container is set up
         * with -- the setup code is not visible here, confirm. */
87         struct iam_descr     od_descr;
        /* NOTE(review): usage not visible in this part of the file. */
88         struct semaphore     od_sem;
89 };
90
/* Per-object OSD state wrapping a generic dt_object. */
91 struct osd_object {
        /* Embedded dt object; osd_object_alloc() initializes it and hands
         * out its do_lu member. */
92         struct dt_object       oo_dt;
93         /**
94          * Inode for file system object represented by this osd_object. This
95          * inode is pinned for the whole duration of lu_object life.
96          *
97          * Not modified concurrently (either setup early during object
98          * creation, or assigned by osd_object_create() under write lock).
99          */
100         struct inode          *oo_inode;
        /* Semaphore taken by osd_object_{read,write}_lock(). */
101         struct rw_semaphore    oo_sem;
        /* Index state; may be NULL, released by osd_index_fini(). */
102         struct osd_directory  *oo_dir;
103         /** protects inode attributes. */
104         spinlock_t             oo_guard;
        /* Environment that took oo_sem for write (set in
         * osd_object_write_lock()). */
105         const struct lu_env   *oo_owner;
106 #ifdef CONFIG_LOCKDEP
107         struct lockdep_map     oo_dep_map;
108 #endif
109 };
110
111 static int   osd_root_get      (const struct lu_env *env,
112                                 struct dt_device *dev, struct lu_fid *f);
113
114 static int   lu_device_is_osd  (const struct lu_device *d);
115 static void  osd_mod_exit      (void) __exit;
116 static int   osd_mod_init      (void) __init;
117 static int   osd_type_init     (struct lu_device_type *t);
118 static void  osd_type_fini     (struct lu_device_type *t);
119 static int   osd_object_init   (const struct lu_env *env,
120                                 struct lu_object *l);
121 static void  osd_object_release(const struct lu_env *env,
122                                 struct lu_object *l);
123 static int   osd_object_print  (const struct lu_env *env, void *cookie,
124                                 lu_printer_t p, const struct lu_object *o);
125 static struct lu_device *osd_device_free   (const struct lu_env *env,
126                                 struct lu_device *m);
127 static void *osd_key_init      (const struct lu_context *ctx,
128                                 struct lu_context_key *key);
129 static void  osd_key_fini      (const struct lu_context *ctx,
130                                 struct lu_context_key *key, void *data);
131 static void  osd_key_exit      (const struct lu_context *ctx,
132                                 struct lu_context_key *key, void *data);
133 static int   osd_has_index     (const struct osd_object *obj);
134 static void  osd_object_init0  (struct osd_object *obj);
135 static int   osd_device_init   (const struct lu_env *env,
136                                 struct lu_device *d, const char *,
137                                 struct lu_device *);
138 static int   osd_fid_lookup    (const struct lu_env *env,
139                                 struct osd_object *obj,
140                                 const struct lu_fid *fid);
141 static void  osd_inode_getattr (const struct lu_env *env,
142                                 struct inode *inode, struct lu_attr *attr);
143 static void  osd_inode_setattr (const struct lu_env *env,
144                                 struct inode *inode, const struct lu_attr *attr);
145 static int   osd_param_is_sane (const struct osd_device *dev,
146                                 const struct txn_param *param);
147 static int   osd_index_lookup  (const struct lu_env *env,
148                                 struct dt_object *dt,
149                                 struct dt_rec *rec, const struct dt_key *key,
150                                 struct lustre_capa *capa);
151 static int   osd_index_insert  (const struct lu_env *env,
152                                 struct dt_object *dt,
153                                 const struct dt_rec *rec,
154                                 const struct dt_key *key,
155                                 struct thandle *handle,
156                                 struct lustre_capa *capa);
157 static int   osd_index_delete  (const struct lu_env *env,
158                                 struct dt_object *dt, const struct dt_key *key,
159                                 struct thandle *handle,
160                                 struct lustre_capa *capa);
161 static int   osd_index_probe   (const struct lu_env *env,
162                                 struct osd_object *o,
163                                 const struct dt_index_features *feat);
164 static int   osd_index_try     (const struct lu_env *env,
165                                 struct dt_object *dt,
166                                 const struct dt_index_features *feat);
167 static void  osd_index_fini    (struct osd_object *o);
168
169 static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
170 static int   osd_it_get        (const struct lu_env *env,
171                                 struct dt_it *di, const struct dt_key *key);
172 static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
173 static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
174 static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
175                                 struct thandle *th);
176 static int   osd_it_key_size   (const struct lu_env *env,
177                                 const struct dt_it *di);
178 static void  osd_conf_get      (const struct lu_env *env,
179                                 const struct dt_device *dev,
180                                 struct dt_device_param *param);
181 static void  osd_trans_stop    (const struct lu_env *env,
182                                 struct thandle *th);
183 static int   osd_object_is_root(const struct osd_object *obj);
184
185 static struct osd_object  *osd_obj          (const struct lu_object *o);
186 static struct osd_device  *osd_dev          (const struct lu_device *d);
187 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
188 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
189 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
190 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
191 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
192                                              struct lu_device *d);
193 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
194                                              struct lu_device_type *t,
195                                              struct lustre_cfg *cfg);
196 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
197                                              const struct lu_object_header *hdr,
198                                              struct lu_device *d);
199 static struct inode       *osd_iget         (struct osd_thread_info *info,
200                                              struct osd_device *dev,
201                                              const struct osd_inode_id *id);
202 static struct super_block *osd_sb           (const struct osd_device *dev);
203 static struct dt_it       *osd_it_init      (const struct lu_env *env,
204                                              struct dt_object *dt, int wable,
205                                              struct lustre_capa *capa);
206 static struct dt_key      *osd_it_key       (const struct lu_env *env,
207                                              const struct dt_it *di);
208 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
209                                              const struct dt_it *di);
210 static struct timespec    *osd_inode_time   (const struct lu_env *env,
211                                              struct inode *inode,
212                                              __u64 seconds);
213 static struct thandle     *osd_trans_start  (const struct lu_env *env,
214                                              struct dt_device *d,
215                                              struct txn_param *p);
216 static journal_t          *osd_journal      (const struct osd_device *dev);
217
218 static struct lu_device_type_operations osd_device_type_ops;
219 static struct lu_device_type            osd_device_type;
220 static struct lu_object_operations      osd_lu_obj_ops;
221 static struct obd_ops                   osd_obd_device_ops;
222 static struct lu_device_operations      osd_lu_ops;
223 static struct lu_context_key            osd_key;
224 static struct dt_object_operations      osd_obj_ops;
225 static struct dt_body_operations        osd_body_ops;
226 static struct dt_index_operations       osd_index_ops;
227 static struct dt_index_operations       osd_index_compat_ops;
228
/* OSD transaction handle: generic thandle plus the journal handle behind it. */
229 struct osd_thandle {
        /* Generic handle returned to callers of osd_trans_start(). */
230         struct thandle          ot_super;
        /* Journal handle from journal_start(); reset to NULL by
         * osd_trans_stop() before journal_stop() is called. */
231         handle_t               *ot_handle;
        /* Callback record passed to journal_callback_set();
         * osd_trans_commit_cb() recovers the osd_thandle from it. */
232         struct journal_callback ot_jcb;
233 };
234
235 /*
236  * Invariants, assertions.
237  */
238
239 /*
240  * XXX: do not enable this, until invariant checking code is made thread safe
241  * in the face of pdirops locking.
242  */
243 #define OSD_INVARIANT_CHECKS (0)
244
#if OSD_INVARIANT_CHECKS
/*
 * Consistency predicate for an osd_object; evaluates to non-zero when:
 *
 *  - \a obj is not NULL;
 *  - if an inode is attached, it belongs to this device's super block and
 *    holds at least one reference;
 *  - if index state is attached and its container is bound to an inode,
 *    that inode is the object's inode.
 *
 * Compiled only when OSD_INVARIANT_CHECKS is non-zero; otherwise
 * osd_invariant() is a constant-true macro.
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_dir != NULL &&
                     obj->oo_dir->od_container.ic_object != NULL,
                     obj->oo_dir->od_container.ic_object == obj->oo_inode);
}
#else
#define osd_invariant(obj) (1)
#endif
260
261 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
262 {
263         return lu_context_key_get(&env->le_ctx, &osd_key);
264 }
265
266 /*
267  * Concurrency: doesn't matter
268  */
269 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
270 {
271         return osd_oti_get(env)->oti_r_locks > 0;
272 }
273
274 /*
275  * Concurrency: doesn't matter
276  */
277 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
278 {
279         struct osd_thread_info *oti = osd_oti_get(env);
280         return oti->oti_w_locks > 0 && o->oo_owner == env;
281 }
282
283 /*
284  * Concurrency: doesn't access mutable data
285  */
286 static int osd_root_get(const struct lu_env *env,
287                         struct dt_device *dev, struct lu_fid *f)
288 {
289         struct inode *inode;
290
291         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
292         lu_igif_build(f, inode->i_ino, inode->i_generation);
293         return 0;
294 }
295
296 /*
297  * OSD object methods.
298  */
299
300 /*
301  * Concurrency: no concurrent access is possible that early in object
302  * life-cycle.
303  */
304 static struct lu_object *osd_object_alloc(const struct lu_env *env,
305                                           const struct lu_object_header *hdr,
306                                           struct lu_device *d)
307 {
308         struct osd_object *mo;
309
310         OBD_ALLOC_PTR(mo);
311         if (mo != NULL) {
312                 struct lu_object *l;
313
314                 l = &mo->oo_dt.do_lu;
315                 dt_object_init(&mo->oo_dt, NULL, d);
316                 mo->oo_dt.do_ops = &osd_obj_ops;
317                 l->lo_ops = &osd_lu_obj_ops;
318                 init_rwsem(&mo->oo_sem);
319                 spin_lock_init(&mo->oo_guard);
320                 return l;
321         } else
322                 return NULL;
323 }
324
325 /*
326  * Concurrency: shouldn't matter.
327  */
328 static void osd_object_init0(struct osd_object *obj)
329 {
330         LASSERT(obj->oo_inode != NULL);
331         obj->oo_dt.do_body_ops = &osd_body_ops;
332         obj->oo_dt.do_lu.lo_header->loh_attr |=
333                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
334 }
335
336 /*
337  * Concurrency: no concurrent access is possible that early in object
338  * life-cycle.
339  */
340 static int osd_object_init(const struct lu_env *env, struct lu_object *l)
341 {
342         struct osd_object *obj = osd_obj(l);
343         int result;
344
345         LASSERT(osd_invariant(obj));
346
347         result = osd_fid_lookup(env, obj, lu_object_fid(l));
348         if (result == 0) {
349                 if (obj->oo_inode != NULL)
350                         osd_object_init0(obj);
351         }
352         LASSERT(osd_invariant(obj));
353         return result;
354 }
355
356 /*
357  * Concurrency: no concurrent access is possible that late in object
358  * life-cycle.
359  */
360 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
361 {
362         struct osd_object *obj = osd_obj(l);
363
364         LASSERT(osd_invariant(obj));
365
366         dt_object_fini(&obj->oo_dt);
367         OBD_FREE_PTR(obj);
368 }
369
370 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
371                                           const struct iam_container *bag)
372 {
373         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
374                                                    osd_oti_get(env)->oti_ipd);
375 }
376
377 static void osd_ipd_put(const struct lu_env *env,
378                         const struct iam_container *bag,
379                         struct iam_path_descr *ipd)
380 {
381         bag->ic_descr->id_ops->id_ipd_free(ipd);
382 }
383
384 /*
385  * Concurrency: no concurrent access is possible that late in object
386  * life-cycle.
387  */
388 static void osd_index_fini(struct osd_object *o)
389 {
390         struct iam_container *bag;
391
392         if (o->oo_dir != NULL) {
393                 bag = &o->oo_dir->od_container;
394                 if (o->oo_inode != NULL) {
395                         if (bag->ic_object == o->oo_inode)
396                                 iam_container_fini(bag);
397                 }
398                 OBD_FREE_PTR(o->oo_dir);
399                 o->oo_dir = NULL;
400         }
401 }
402
403 /*
404  * Concurrency: no concurrent access is possible that late in object
405  * life-cycle (for all existing callers, that is. New callers have to provide
406  * their own locking.)
407  */
408 static int osd_inode_unlinked(const struct inode *inode)
409 {
410         return inode->i_nlink == 0;
411 }
412
/*
 * Journal credits; osd_inode_remove() reserves the sum of the two values
 * for its transaction.
 */
413 enum {
414         OSD_TXN_OI_DELETE_CREDITS    = 20,
415         OSD_TXN_INODE_DELETE_CREDITS = 20
416 };
417
418 /*
419  * Concurrency: no concurrent access is possible that late in object
420  * life-cycle.
421  */
422 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
423 {
424         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
425         struct osd_device      *osd = osd_obj2dev(obj);
426         struct osd_thread_info *oti = osd_oti_get(env);
427         struct txn_param       *prm = &oti->oti_txn;
428         struct thandle         *th;
429         int result;
430
431         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
432                             OSD_TXN_INODE_DELETE_CREDITS);
433         th = osd_trans_start(env, &osd->od_dt_dev, prm);
434         if (!IS_ERR(th)) {
435                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
436                 osd_trans_stop(env, th);
437         } else
438                 result = PTR_ERR(th);
439         return result;
440 }
441
442 /*
443  * Called just before object is freed. Releases all resources except for
444  * object itself (that is released by osd_object_free()).
445  *
446  * Concurrency: no concurrent access is possible that late in object
447  * life-cycle.
448  */
449 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
450 {
451         struct osd_object *obj   = osd_obj(l);
452         struct inode      *inode = obj->oo_inode;
453
454         LASSERT(osd_invariant(obj));
455
456         /*
457          * If object is unlinked remove fid->ino mapping from object index.
458          *
459          * File body will be deleted by iput().
460          */
461
462         osd_index_fini(obj);
463         if (inode != NULL) {
464                 int result;
465
466                 if (osd_inode_unlinked(inode)) {
467                         result = osd_inode_remove(env, obj);
468                         if (result != 0)
469                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
470                                                 "Failed to cleanup: %d\n",
471                                                 result);
472                 }
473                 iput(inode);
474                 obj->oo_inode = NULL;
475         }
476 }
477
478 /*
479  * Concurrency: ->loo_object_release() is called under site spin-lock.
480  */
481 static void osd_object_release(const struct lu_env *env,
482                                struct lu_object *l)
483 {
484         struct osd_object *o = osd_obj(l);
485
486         LASSERT(!lu_object_is_dying(l->lo_header));
487         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
488                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
489 }
490
491 /*
492  * Concurrency: shouldn't matter.
493  */
494 static int osd_object_print(const struct lu_env *env, void *cookie,
495                             lu_printer_t p, const struct lu_object *l)
496 {
497         struct osd_object *o = osd_obj(l);
498         struct iam_descr  *d;
499
500         if (o->oo_dir != NULL)
501                 d = o->oo_dir->od_container.ic_descr;
502         else
503                 d = NULL;
504         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
505                     o, o->oo_inode,
506                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
507                     o->oo_inode ? o->oo_inode->i_generation : 0,
508                     d ? d->id_ops->id_name : "plain");
509 }
510
511 /*
512  * Concurrency: shouldn't matter.
513  */
514 int osd_statfs(const struct lu_env *env, struct dt_device *d,
515                struct kstatfs *sfs)
516 {
517         struct osd_device *osd = osd_dt_dev(d);
518         struct super_block *sb = osd_sb(osd);
519         int result = 0;
520
521         spin_lock(&osd->od_osfs_lock);
522         /* cache 1 second */
523         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
524                 result = ll_do_statfs(sb, &osd->od_kstatfs);
525                 if (likely(result == 0)) /* N.B. statfs can't really fail */
526                         osd->od_osfs_age = cfs_time_current_64();
527         }
528
529         if (likely(result == 0))
530                 *sfs = osd->od_kstatfs; 
531         spin_unlock(&osd->od_osfs_lock);
532
533         return result;
534 }
535
536 /*
537  * Concurrency: doesn't access mutable data.
538  */
539 static void osd_conf_get(const struct lu_env *env,
540                          const struct dt_device *dev,
541                          struct dt_device_param *param)
542 {
543         /*
544          * XXX should be taken from not-yet-existing fs abstraction layer.
545          */
546         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
547         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
548         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
549 }
550
551 /*
552  * Journal
553  */
554
555 /*
556  * Concurrency: doesn't access mutable data.
557  */
558 static int osd_param_is_sane(const struct osd_device *dev,
559                              const struct txn_param *param)
560 {
561         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
562 }
563
564 /*
565  * Concurrency: shouldn't matter.
566  */
567 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
568 {
569         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
570         struct thandle     *th = &oh->ot_super;
571         struct dt_device   *dev = th->th_dev;
572
573         LASSERT(dev != NULL);
574         LASSERT(oh->ot_handle == NULL);
575
576         if (error) {
577                 CERROR("transaction @0x%p commit error: %d\n", th, error);
578         } else {
579                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
580                 /*
581                  * This od_env_for_commit is only for commit usage.  see
582                  * "struct dt_device"
583                  */
584                 lu_context_enter(&env->le_ctx);
585                 dt_txn_hook_commit(env, th);
586                 lu_context_exit(&env->le_ctx);
587         }
588
589         lu_device_put(&dev->dd_lu_dev);
590         th->th_dev = NULL;
591
592         lu_context_exit(&th->th_ctx);
593         lu_context_fini(&th->th_ctx);
594         OBD_FREE_PTR(oh);
595 }
596
597 /*
598  * Concurrency: shouldn't matter.
599  */
600 static struct thandle *osd_trans_start(const struct lu_env *env,
601                                        struct dt_device *d,
602                                        struct txn_param *p)
603 {
604         struct osd_device  *dev = osd_dt_dev(d);
605         handle_t           *jh;
606         struct osd_thandle *oh;
607         struct thandle     *th;
608         int hook_res;
609
610         ENTRY;
611
612         hook_res = dt_txn_hook_start(env, d, p);
613         if (hook_res != 0)
614                 RETURN(ERR_PTR(hook_res));
615
616         if (osd_param_is_sane(dev, p)) {
617                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
618                 if (oh != NULL) {
619                         /*
620                          * XXX temporary stuff. Some abstraction layer should
621                          * be used.
622                          */
623
624                         jh = journal_start(osd_journal(dev), p->tp_credits);
625                         if (!IS_ERR(jh)) {
626                                 oh->ot_handle = jh;
627                                 th = &oh->ot_super;
628                                 th->th_dev = d;
629                                 th->th_result = 0;
630                                 jh->h_sync = p->tp_sync;
631                                 lu_device_get(&d->dd_lu_dev);
632                                 /* add commit callback */
633                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
634                                 lu_context_enter(&th->th_ctx);
635                                 journal_callback_set(jh, osd_trans_commit_cb,
636                                                      (struct journal_callback *)&oh->ot_jcb);
637 #if OSD_COUNTERS
638                                 {
639                                         struct osd_thread_info *oti =
640                                                 osd_oti_get(env);
641
642                                         LASSERT(oti->oti_txns == 0);
643                                         LASSERT(oti->oti_r_locks == 0);
644                                         LASSERT(oti->oti_w_locks == 0);
645                                         oti->oti_txns++;
646                                 }
647 #endif
648                         } else {
649                                 OBD_FREE_PTR(oh);
650                                 th = (void *)jh;
651                         }
652                 } else
653                         th = ERR_PTR(-ENOMEM);
654         } else {
655                 CERROR("Invalid transaction parameters\n");
656                 th = ERR_PTR(-EINVAL);
657         }
658
659         RETURN(th);
660 }
661
662 /*
663  * Concurrency: shouldn't matter.
664  */
665 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
666 {
667         int result;
668         struct osd_thandle *oh;
669         struct osd_thread_info *oti = osd_oti_get(env);
670
671         ENTRY;
672
673         oh = container_of0(th, struct osd_thandle, ot_super);
674         if (oh->ot_handle != NULL) {
675                 handle_t *hdl = oh->ot_handle;
676
677                 LASSERT(oti->oti_txns == 1);
678                 oti->oti_txns--;
679                 LASSERT(oti->oti_r_locks == 0);
680                 LASSERT(oti->oti_w_locks == 0);
681                 result = dt_txn_hook_stop(env, th);
682                 if (result != 0)
683                         CERROR("Failure in transaction hook: %d\n", result);
684                 oh->ot_handle = NULL;
685                 result = journal_stop(hdl);
686                 if (result != 0)
687                         CERROR("Failure to stop transaction: %d\n", result);
688         }
689         EXIT;
690 }
691
692 /*
693  * Concurrency: shouldn't matter.
694  */
695 static int osd_sync(const struct lu_env *env, struct dt_device *d)
696 {
697         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
698         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
699 }
700
701 /*
702  * Concurrency: shouldn't matter.
703  */
704 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
705
706 static void osd_ro(const struct lu_env *env, struct dt_device *d)
707 {
708         ENTRY;
709
710         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
711
712         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
713                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
714         EXIT;
715 }
716
717 /*
718  * Concurrency: serialization provided by callers.
719  */
720 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
721                               int mode, unsigned long timeout, __u32 alg,
722                               struct lustre_capa_key *keys)
723 {
724         struct osd_device *dev = osd_dt_dev(d);
725         ENTRY;
726
727         dev->od_fl_capa = mode;
728         dev->od_capa_timeout = timeout;
729         dev->od_capa_alg = alg;
730         dev->od_capa_keys = keys;
731         RETURN(0);
732 }
733
734 /* Note: quota credits are not counted here. If the file system is mounted
735  * with --data_journal, more credits may be needed. */
/* Journal credits per dt operation; indexed by enum dt_txn_op and returned
 * by osd_credit_get(). */
736 static const int osd_dto_credits[DTO_NR] = {
737         /*
738          * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
739          * EXT3_SINGLEDATA_TRANS_BLOCKS(8). XXX Note: IAM may need more,
740          * since it has more levels than the Ext3 htree.
741          */
742         [DTO_INDEX_INSERT]  = 16,
743         [DTO_INDEX_DELETE]  = 16,
744         [DTO_IDNEX_UPDATE]  = 16,
745         /*
746          * Create an object. Same as object creation in the Ext3 filesystem,
747          * but without counting quota: EXT3_DATA_TRANS_BLOCKS(12) +
748          * INDEX_EXTRA_BLOCKS(8) + 3 (inode bits, groups, GDT).
749          */
750         [DTO_OBJECT_CREATE] = 23,
751         [DTO_OBJECT_DELETE] = 23,
752         /*
753          * Attr set credits: 3 (inode, group, GDT).
754          */
755         [DTO_ATTR_SET]      = 3,
756         /*
757          * XATTR_SET. Same as xattr of EXT3: EXT3_DATA_TRANS_BLOCKS. XXX Note:
758          * in the original MDS implementation EXT3_INDEX_EXTRA_TRANS_BLOCKS
759          * are also counted in. Do not know why.
760          */
761         [DTO_XATTR_SET]     = 16,
762         [DTO_LOG_REC]       = 16,
763         /* credits for inode change during write */
764         [DTO_WRITE_BASE]    = 3,
765         /* credits for single block write */
766         [DTO_WRITE_BLOCK]   = 12 
767 };
768
769 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
770                           enum dt_txn_op op)
771 {
772         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
773         return osd_dto_credits[op];
774 }
775
776 static struct dt_device_operations osd_dt_ops = {
777         .dt_root_get       = osd_root_get,
778         .dt_statfs         = osd_statfs,
779         .dt_trans_start    = osd_trans_start,
780         .dt_trans_stop     = osd_trans_stop,
781         .dt_conf_get       = osd_conf_get,
782         .dt_sync           = osd_sync,
783         .dt_ro             = osd_ro,
784         .dt_credit_get     = osd_credit_get,
785         .dt_init_capa_ctxt = osd_init_capa_ctxt,
786 };
787
788 static void osd_object_read_lock(const struct lu_env *env,
789                                  struct dt_object *dt, unsigned role)
790 {
791         struct osd_object *obj = osd_dt_obj(dt);
792         struct osd_thread_info *oti = osd_oti_get(env);
793
794         LINVRNT(osd_invariant(obj));
795
796         LASSERT(obj->oo_owner != env);
797         down_read_nested(&obj->oo_sem, role);
798
799                 LASSERT(obj->oo_owner == NULL);
800                 oti->oti_r_locks++;
801 }
802
803 static void osd_object_write_lock(const struct lu_env *env,
804                                   struct dt_object *dt, unsigned role)
805 {
806         struct osd_object *obj = osd_dt_obj(dt);
807         struct osd_thread_info *oti = osd_oti_get(env);
808
809         LINVRNT(osd_invariant(obj));
810
811         LASSERT(obj->oo_owner != env);
812         down_write_nested(&obj->oo_sem, role);
813
814                 LASSERT(obj->oo_owner == NULL);
815                 obj->oo_owner = env;
816                 oti->oti_w_locks++;
817 }
818
819 static void osd_object_read_unlock(const struct lu_env *env,
820                                    struct dt_object *dt)
821 {
822         struct osd_object *obj = osd_dt_obj(dt);
823                 struct osd_thread_info *oti = osd_oti_get(env);
824
825         LINVRNT(osd_invariant(obj));
826
827                 LASSERT(oti->oti_r_locks > 0);
828                 oti->oti_r_locks--;
829         up_read(&obj->oo_sem);
830 }
831
832 static void osd_object_write_unlock(const struct lu_env *env,
833                                     struct dt_object *dt)
834 {
835         struct osd_object *obj = osd_dt_obj(dt);
836
837         LASSERT(osd_invariant(obj));
838 #if OSD_COUNTERS
839         {
840                 struct osd_thread_info *oti = osd_oti_get(env);
841
842                 LASSERT(obj->oo_owner == env);
843                 LASSERT(oti->oti_w_locks > 0);
844                 oti->oti_w_locks--;
845                 obj->oo_owner = NULL;
846         }
847 #endif
848         up_write(&obj->oo_sem);
849 }
850
851 static int capa_is_sane(const struct lu_env *env,
852                         struct osd_device *dev,
853                         struct lustre_capa *capa,
854                         struct lustre_capa_key *keys)
855 {
856         struct osd_thread_info *oti = osd_oti_get(env);
857         struct obd_capa *oc;
858         int i, rc = 0;
859         ENTRY;
860
861         oc = capa_lookup(dev->od_capa_hash, capa, 0);
862         if (oc) {
863                 if (capa_is_expired(oc)) {
864                         DEBUG_CAPA(D_ERROR, capa, "expired");
865                         rc = -ESTALE;
866                 }
867                 capa_put(oc);
868                 RETURN(rc);
869         }
870
871         spin_lock(&capa_lock);
872         for (i = 0; i < 2; i++) {
873                 if (keys[i].lk_keyid == capa->lc_keyid) {
874                         oti->oti_capa_key = keys[i];
875                         break;
876                 }
877         }
878         spin_unlock(&capa_lock);
879
880         if (i == 2) {
881                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
882                 RETURN(-ESTALE);
883         }
884
885         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
886         if (rc)
887                 RETURN(rc);
888         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
889         {
890                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
891                 RETURN(-EACCES);
892         }
893
894         oc = capa_add(dev->od_capa_hash, capa);
895         capa_put(oc);
896
897         RETURN(0);
898 }
899
900 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
901                            struct lustre_capa *capa, __u64 opc)
902 {
903         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
904         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
905         int rc;
906
907         if (!dev->od_fl_capa)
908                 return 0;
909
910         if (capa == BYPASS_CAPA)
911                 return 0;
912
913         if (!capa) {
914                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
915                 return -EACCES;
916         }
917
918         if (!lu_fid_eq(fid, &capa->lc_fid)) {
919                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
920                            PFID(fid));
921                 return -EACCES;
922         }
923
924         if (!capa_opc_supported(capa, opc)) {
925                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
926                 return -EACCES;
927         }
928
929         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
930                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
931                 return -EACCES;
932         }
933
934         return 0;
935 }
936
937 static int osd_attr_get(const struct lu_env *env,
938                         struct dt_object *dt,
939                         struct lu_attr *attr,
940                         struct lustre_capa *capa)
941 {
942         struct osd_object *obj = osd_dt_obj(dt);
943
944         LASSERT(dt_object_exists(dt));
945         LASSERT(osd_invariant(obj));
946
947         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
948                 return -EACCES;
949
950         spin_lock(&obj->oo_guard);
951         osd_inode_getattr(env, obj->oo_inode, attr);
952         spin_unlock(&obj->oo_guard);
953         return 0;
954 }
955
956 static int osd_attr_set(const struct lu_env *env,
957                         struct dt_object *dt,
958                         const struct lu_attr *attr,
959                         struct thandle *handle,
960                         struct lustre_capa *capa)
961 {
962         struct osd_object *obj = osd_dt_obj(dt);
963
964         LASSERT(handle != NULL);
965         LASSERT(dt_object_exists(dt));
966         LASSERT(osd_invariant(obj));
967
968         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
969                 return -EACCES;
970
971         spin_lock(&obj->oo_guard);
972         osd_inode_setattr(env, obj->oo_inode, attr);
973         spin_unlock(&obj->oo_guard);
974
975         mark_inode_dirty(obj->oo_inode);
976         return 0;
977 }
978
979 static struct timespec *osd_inode_time(const struct lu_env *env,
980                                        struct inode *inode, __u64 seconds)
981 {
982         struct osd_thread_info *oti = osd_oti_get(env);
983         struct timespec        *t   = &oti->oti_time;
984
985         t->tv_sec  = seconds;
986         t->tv_nsec = 0;
987         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
988         return t;
989 }
990
991 static void osd_inode_setattr(const struct lu_env *env,
992                               struct inode *inode, const struct lu_attr *attr)
993 {
994         __u64 bits;
995
996         bits = attr->la_valid;
997
998         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
999
1000         if (bits & LA_ATIME)
1001                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1002         if (bits & LA_CTIME)
1003                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1004         if (bits & LA_MTIME)
1005                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1006         if (bits & LA_SIZE) {
1007                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1008                 i_size_write(inode, attr->la_size);
1009         }
1010         if (bits & LA_BLOCKS)
1011                 inode->i_blocks = attr->la_blocks;
1012         if (bits & LA_MODE)
1013                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1014                         (attr->la_mode & ~S_IFMT);
1015         if (bits & LA_UID)
1016                 inode->i_uid    = attr->la_uid;
1017         if (bits & LA_GID)
1018                 inode->i_gid    = attr->la_gid;
1019         if (bits & LA_NLINK)
1020                 inode->i_nlink  = attr->la_nlink;
1021         if (bits & LA_RDEV)
1022                 inode->i_rdev   = attr->la_rdev;
1023
1024         if (bits & LA_FLAGS) {
1025                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1026
1027                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1028                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1029         }
1030 }
1031
1032 /*
1033  * Object creation.
1034  *
1035  * XXX temporary solution.
1036  */
1037
/*
 * Hook called by osd_object_create() before the type-specific creation
 * routine.  Currently a no-op that always reports success (0).
 */
static int osd_create_pre(struct osd_thread_info *info,
                          struct osd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *th)
{
        return 0;
}
1043
1044 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1045                            struct lu_attr *attr, struct thandle *th)
1046 {
1047         LASSERT(obj->oo_inode != NULL);
1048
1049         osd_object_init0(obj);
1050         return 0;
1051 }
1052
1053 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1054                                           struct inode * dir, int mode);
1055
1056 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1057                       umode_t mode,
1058                       struct dt_allocation_hint *hint,
1059                       struct thandle *th)
1060 {
1061         int result;
1062         struct osd_device  *osd = osd_obj2dev(obj);
1063         struct osd_thandle *oth;
1064         struct inode       *parent;
1065         struct inode       *inode;
1066
1067         LASSERT(osd_invariant(obj));
1068         LASSERT(obj->oo_inode == NULL);
1069         LASSERT(osd->od_obj_area != NULL);
1070
1071         oth = container_of(th, struct osd_thandle, ot_super);
1072         LASSERT(oth->ot_handle->h_transaction != NULL);
1073
1074         if (hint && hint->dah_parent)
1075                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1076         else
1077                 parent = osd->od_obj_area->d_inode;
1078         LASSERT(parent->i_op != NULL);
1079
1080         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1081         if (!IS_ERR(inode)) {
1082                 obj->oo_inode = inode;
1083                 result = 0;
1084         } else
1085                 result = PTR_ERR(inode);
1086         LASSERT(osd_invariant(obj));
1087         return result;
1088 }
1089
1090
1091 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1092                            int recsize, handle_t *handle);
1093
1094 enum {
1095         OSD_NAME_LEN = 255
1096 };
1097
1098 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1099                      struct lu_attr *attr,
1100                      struct dt_allocation_hint *hint,
1101                      struct thandle *th)
1102 {
1103         int result;
1104         struct osd_thandle *oth;
1105
1106         LASSERT(S_ISDIR(attr->la_mode));
1107
1108         oth = container_of(th, struct osd_thandle, ot_super);
1109         LASSERT(oth->ot_handle->h_transaction != NULL);
1110         result = osd_mkfile(info, obj, (attr->la_mode &
1111                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1112         if (result == 0) {
1113                 LASSERT(obj->oo_inode != NULL);
1114                 /*
1115                  * XXX uh-oh... call low-level iam function directly.
1116                  */
1117                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1118                                          sizeof (struct lu_fid_pack),
1119                                          oth->ot_handle);
1120         }
1121         return result;
1122 }
1123
1124 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1125                      struct lu_attr *attr,
1126                      struct dt_allocation_hint *hint,
1127                      struct thandle *th)
1128 {
1129         LASSERT(S_ISREG(attr->la_mode));
1130         return osd_mkfile(info, obj, (attr->la_mode &
1131                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1132 }
1133
1134 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1135                      struct lu_attr *attr,
1136                      struct dt_allocation_hint *hint,
1137                      struct thandle *th)
1138 {
1139         LASSERT(S_ISLNK(attr->la_mode));
1140         return osd_mkfile(info, obj, (attr->la_mode &
1141                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1142 }
1143
1144 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1145                      struct lu_attr *attr,
1146                      struct dt_allocation_hint *hint,
1147                      struct thandle *th)
1148 {
1149         int result;
1150         struct osd_device *osd = osd_obj2dev(obj);
1151         struct inode      *dir;
1152         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1153
1154         LASSERT(osd_invariant(obj));
1155         LASSERT(obj->oo_inode == NULL);
1156         LASSERT(osd->od_obj_area != NULL);
1157         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1158                 S_ISFIFO(mode) || S_ISSOCK(mode));
1159
1160         dir = osd->od_obj_area->d_inode;
1161         LASSERT(dir->i_op != NULL);
1162
1163         result = osd_mkfile(info, obj, mode, hint, th);
1164         if (result == 0) {
1165                 LASSERT(obj->oo_inode != NULL);
1166                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1167         }
1168         LASSERT(osd_invariant(obj));
1169         return result;
1170 }
1171
1172 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1173                               struct lu_attr *,
1174                               struct dt_allocation_hint *hint,
1175                               struct thandle *);
1176
1177 static osd_obj_type_f osd_create_type_f(__u32 mode)
1178 {
1179         osd_obj_type_f result;
1180
1181         switch (mode) {
1182         case S_IFDIR:
1183                 result = osd_mkdir;
1184                 break;
1185         case S_IFREG:
1186                 result = osd_mkreg;
1187                 break;
1188         case S_IFLNK:
1189                 result = osd_mksym;
1190                 break;
1191         case S_IFCHR:
1192         case S_IFBLK:
1193         case S_IFIFO:
1194         case S_IFSOCK:
1195                 result = osd_mknod;
1196                 break;
1197         default:
1198                 LBUG();
1199                 break;
1200         }
1201         return result;
1202 }
1203
1204
1205 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1206                         struct dt_object *parent, umode_t child_mode)
1207 {
1208         LASSERT(ah);
1209
1210         memset(ah, 0, sizeof(*ah));
1211         ah->dah_parent = parent;
1212         ah->dah_mode = child_mode;
1213 }
1214
1215
1216 /*
1217  * Concurrency: @dt is write locked.
1218  */
1219 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1220                              struct lu_attr *attr, 
1221                              struct dt_allocation_hint *hint,
1222                              struct thandle *th)
1223 {
1224         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1225         struct osd_object      *obj  = osd_dt_obj(dt);
1226         struct osd_device      *osd  = osd_obj2dev(obj);
1227         struct osd_thread_info *info = osd_oti_get(env);
1228         int result;
1229
1230         ENTRY;
1231
1232         LASSERT(osd_invariant(obj));
1233         LASSERT(!dt_object_exists(dt));
1234         LASSERT(osd_write_locked(env, obj));
1235         LASSERT(th != NULL);
1236
1237         /*
1238          * XXX missing: Quote handling.
1239          */
1240
1241         result = osd_create_pre(info, obj, attr, th);
1242         if (result == 0) {
1243                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1244                                                                 attr, hint, th);
1245                 if (result == 0)
1246                         result = osd_create_post(info, obj, attr, th);
1247         }
1248         if (result == 0) {
1249                 struct osd_inode_id *id = &info->oti_id;
1250
1251                 LASSERT(obj->oo_inode != NULL);
1252
1253                 id->oii_ino = obj->oo_inode->i_ino;
1254                 id->oii_gen = obj->oo_inode->i_generation;
1255
1256                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1257         }
1258
1259         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1260         LASSERT(osd_invariant(obj));
1261         RETURN(result);
1262 }
1263
1264 /*
1265  * Concurrency: @dt is write locked.
1266  */
1267 static void osd_object_ref_add(const struct lu_env *env,
1268                                struct dt_object *dt,
1269                                struct thandle *th)
1270 {
1271         struct osd_object *obj = osd_dt_obj(dt);
1272         struct inode *inode = obj->oo_inode;
1273
1274         LASSERT(osd_invariant(obj));
1275         LASSERT(dt_object_exists(dt));
1276         LASSERT(osd_write_locked(env, obj));
1277         LASSERT(th != NULL);
1278
1279         spin_lock(&obj->oo_guard);
1280         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1281         inode->i_nlink++;
1282         spin_unlock(&obj->oo_guard);
1283         mark_inode_dirty(inode);
1284         LASSERT(osd_invariant(obj));
1285 }
1286
1287 /*
1288  * Concurrency: @dt is write locked.
1289  */
1290 static void osd_object_ref_del(const struct lu_env *env,
1291                                struct dt_object *dt,
1292                                struct thandle *th)
1293 {
1294         struct osd_object *obj = osd_dt_obj(dt);
1295         struct inode *inode = obj->oo_inode;
1296
1297         LASSERT(osd_invariant(obj));
1298         LASSERT(dt_object_exists(dt));
1299         LASSERT(osd_write_locked(env, obj));
1300         LASSERT(th != NULL);
1301
1302         spin_lock(&obj->oo_guard);
1303         LASSERT(inode->i_nlink > 0);
1304         inode->i_nlink--;
1305         spin_unlock(&obj->oo_guard);
1306         mark_inode_dirty(inode);
1307         LASSERT(osd_invariant(obj));
1308 }
1309
1310 /*
1311  * Concurrency: @dt is read locked.
1312  */
1313 static int osd_xattr_get(const struct lu_env *env,
1314                          struct dt_object *dt,
1315                          struct lu_buf *buf,
1316                          const char *name,
1317                          struct lustre_capa *capa)
1318 {
1319         struct osd_object      *obj    = osd_dt_obj(dt);
1320         struct inode           *inode  = obj->oo_inode;
1321         struct osd_thread_info *info   = osd_oti_get(env);
1322         struct dentry          *dentry = &info->oti_dentry;
1323
1324         LASSERT(dt_object_exists(dt));
1325         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1326         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1327
1328         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1329                 return -EACCES;
1330
1331         dentry->d_inode = inode;
1332         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1333 }
1334
1335 /*
1336  * Concurrency: @dt is write locked.
1337  */
1338 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1339                          const struct lu_buf *buf, const char *name, int fl,
1340                          struct thandle *handle, struct lustre_capa *capa)
1341 {
1342         struct osd_object      *obj    = osd_dt_obj(dt);
1343         struct inode           *inode  = obj->oo_inode;
1344         struct osd_thread_info *info   = osd_oti_get(env);
1345         struct dentry          *dentry = &info->oti_dentry;
1346         struct timespec        *t      = &info->oti_time;
1347         int                     fs_flags = 0, rc;
1348
1349         LASSERT(dt_object_exists(dt));
1350         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1351         LASSERT(osd_write_locked(env, obj));
1352         LASSERT(handle != NULL);
1353
1354         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1355                 return -EACCES;
1356
1357         if (fl & LU_XATTR_REPLACE)
1358                 fs_flags |= XATTR_REPLACE;
1359
1360         if (fl & LU_XATTR_CREATE)
1361                 fs_flags |= XATTR_CREATE;
1362
1363         dentry->d_inode = inode;
1364         *t = inode->i_ctime;
1365         rc = inode->i_op->setxattr(dentry, name,
1366                                    buf->lb_buf, buf->lb_len, fs_flags);
1367         if (likely(rc == 0)) {
1368                 /* ctime should not be updated with server-side time. */
1369                 spin_lock(&obj->oo_guard);
1370                 inode->i_ctime = *t;
1371                 spin_unlock(&obj->oo_guard);
1372                 mark_inode_dirty(inode);
1373         }
1374         return rc;
1375 }
1376
1377 /*
1378  * Concurrency: @dt is read locked.
1379  */
1380 static int osd_xattr_list(const struct lu_env *env,
1381                           struct dt_object *dt,
1382                           struct lu_buf *buf,
1383                           struct lustre_capa *capa)
1384 {
1385         struct osd_object      *obj    = osd_dt_obj(dt);
1386         struct inode           *inode  = obj->oo_inode;
1387         struct osd_thread_info *info   = osd_oti_get(env);
1388         struct dentry          *dentry = &info->oti_dentry;
1389
1390         LASSERT(dt_object_exists(dt));
1391         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1392         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1393
1394         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1395                 return -EACCES;
1396
1397         dentry->d_inode = inode;
1398         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1399 }
1400
1401 /*
1402  * Concurrency: @dt is write locked.
1403  */
1404 static int osd_xattr_del(const struct lu_env *env,
1405                          struct dt_object *dt,
1406                          const char *name,
1407                          struct thandle *handle,
1408                          struct lustre_capa *capa)
1409 {
1410         struct osd_object      *obj    = osd_dt_obj(dt);
1411         struct inode           *inode  = obj->oo_inode;
1412         struct osd_thread_info *info   = osd_oti_get(env);
1413         struct dentry          *dentry = &info->oti_dentry;
1414         struct timespec        *t      = &info->oti_time;
1415         int                     rc;
1416
1417         LASSERT(dt_object_exists(dt));
1418         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1419         LASSERT(osd_write_locked(env, obj));
1420         LASSERT(handle != NULL);
1421
1422         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1423                 return -EACCES;
1424
1425         dentry->d_inode = inode;
1426         *t = inode->i_ctime;
1427         rc = inode->i_op->removexattr(dentry, name);
1428         if (likely(rc == 0)) {
1429                 /* ctime should not be updated with server-side time. */
1430                 spin_lock(&obj->oo_guard);
1431                 inode->i_ctime = *t;
1432                 spin_unlock(&obj->oo_guard);
1433                 mark_inode_dirty(inode);
1434         }
1435         return rc;
1436 }
1437
1438 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1439                                      struct dt_object *dt,
1440                                      struct lustre_capa *old,
1441                                      __u64 opc)
1442 {
1443         struct osd_thread_info *info = osd_oti_get(env);
1444         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1445         struct osd_object *obj = osd_dt_obj(dt);
1446         struct osd_device *dev = osd_obj2dev(obj);
1447         struct lustre_capa_key *key = &info->oti_capa_key;
1448         struct lustre_capa *capa = &info->oti_capa;
1449         struct obd_capa *oc;
1450         int rc;
1451         ENTRY;
1452
1453         if (!dev->od_fl_capa)
1454                 RETURN(ERR_PTR(-ENOENT));
1455
1456         LASSERT(dt_object_exists(dt));
1457         LASSERT(osd_invariant(obj));
1458
1459         /* renewal sanity check */
1460         if (old && osd_object_auth(env, dt, old, opc))
1461                 RETURN(ERR_PTR(-EACCES));
1462
1463         capa->lc_fid = *fid;
1464         capa->lc_opc = opc;
1465         capa->lc_uid = 0;
1466         capa->lc_flags = dev->od_capa_alg << 24;
1467         capa->lc_timeout = dev->od_capa_timeout;
1468         capa->lc_expiry = 0;
1469
1470         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1471         if (oc) {
1472                 LASSERT(!capa_is_expired(oc));
1473                 RETURN(oc);
1474         }
1475
1476         spin_lock(&capa_lock);
1477         *key = dev->od_capa_keys[1];
1478         spin_unlock(&capa_lock);
1479
1480         capa->lc_keyid = key->lk_keyid;
1481         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1482
1483         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1484         if (rc) {
1485                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1486                 RETURN(ERR_PTR(rc));
1487         }
1488
1489         oc = capa_add(dev->od_capa_hash, capa);
1490         RETURN(oc);
1491 }
1492
1493 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
1494 {
1495         int rc;
1496         struct osd_object      *obj    = osd_dt_obj(dt);
1497         struct inode           *inode  = obj->oo_inode;
1498         struct osd_thread_info *info   = osd_oti_get(env);
1499         struct dentry          *dentry = &info->oti_dentry;
1500         struct file            *file   = &info->oti_file;
1501         ENTRY;
1502
1503         dentry->d_inode = inode;
1504         file->f_dentry = dentry;
1505         file->f_mapping = inode->i_mapping;
1506         file->f_op = inode->i_fop;
1507         LOCK_INODE_MUTEX(inode);
1508         rc = file->f_op->fsync(file, dentry, 0);
1509         UNLOCK_INODE_MUTEX(inode);
1510         RETURN(rc);
1511 }
1512
1513 static struct dt_object_operations osd_obj_ops = {
1514         .do_read_lock    = osd_object_read_lock,
1515         .do_write_lock   = osd_object_write_lock,
1516         .do_read_unlock  = osd_object_read_unlock,
1517         .do_write_unlock = osd_object_write_unlock,
1518         .do_attr_get     = osd_attr_get,
1519         .do_attr_set     = osd_attr_set,
1520         .do_ah_init      = osd_ah_init,
1521         .do_create       = osd_object_create,
1522         .do_index_try    = osd_index_try,
1523         .do_ref_add      = osd_object_ref_add,
1524         .do_ref_del      = osd_object_ref_del,
1525         .do_xattr_get    = osd_xattr_get,
1526         .do_xattr_set    = osd_xattr_set,
1527         .do_xattr_del    = osd_xattr_del,
1528         .do_xattr_list   = osd_xattr_list,
1529         .do_capa_get     = osd_capa_get,
1530         .do_object_sync  = osd_object_sync,
1531 };
1532
1533 /*
1534  * Body operations.
1535  */
1536
1537 /*
1538  * XXX: Another layering violation for now.
1539  *
1540  * We don't want to use ->f_op->read methods, because generic file write
1541  *
1542  *         - serializes on ->i_sem, and
1543  *
1544  *         - does a lot of extra work like balance_dirty_pages(),
1545  *
1546  * which doesn't work for globally shared files like /last-received.
1547  */
1548 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
1549 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
1550                                 loff_t *offs, handle_t *handle);
1551
1552 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1553                         struct lu_buf *buf, loff_t *pos,
1554                         struct lustre_capa *capa)
1555 {
1556         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1557
1558         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1559                 RETURN(-EACCES);
1560
1561         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1562 }
1563
1564 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1565                          const struct lu_buf *buf, loff_t *pos,
1566                          struct thandle *handle, struct lustre_capa *capa)
1567 {
1568         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1569         struct osd_thandle *oh;
1570         ssize_t             result;
1571
1572         LASSERT(handle != NULL);
1573
1574         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1575                 RETURN(-EACCES);
1576
1577         oh = container_of(handle, struct osd_thandle, ot_super);
1578         LASSERT(oh->ot_handle->h_transaction != NULL);
1579         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1580                                              pos, oh->ot_handle);
1581         if (result == 0)
1582                 result = buf->lb_len;
1583         return result;
1584 }
1585
1586 static struct dt_body_operations osd_body_ops = {
1587         .dbo_read  = osd_read,
1588         .dbo_write = osd_write
1589 };
1590
1591 /*
1592  * Index operations.
1593  */
1594
1595 static int osd_object_is_root(const struct osd_object *obj)
1596 {
1597         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1598 }
1599
1600 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1601                            const struct dt_index_features *feat)
1602 {
1603         struct iam_descr *descr;
1604
1605         if (osd_object_is_root(o))
1606                 return feat == &dt_directory_features;
1607
1608         LASSERT(o->oo_dir != NULL);
1609
1610         descr = o->oo_dir->od_container.ic_descr;
1611         if (feat == &dt_directory_features)
1612                 return descr == &iam_htree_compat_param ||
1613                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1614                          1 /*
1615                             * XXX check that index looks like directory.
1616                             */
1617                                 );
1618         else
1619                 return
1620                         feat->dif_keysize_min <= descr->id_key_size &&
1621                         descr->id_key_size <= feat->dif_keysize_max &&
1622                         feat->dif_recsize_min <= descr->id_rec_size &&
1623                         descr->id_rec_size <= feat->dif_recsize_max &&
1624                         !(feat->dif_flags & (DT_IND_VARKEY |
1625                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1626                         ergo(feat->dif_flags & DT_IND_UPDATE,
1627                              1 /* XXX check that object (and file system) is
1628                                 * writable */);
1629 }
1630
1631 static int osd_container_init(const struct lu_env *env,
1632                               struct osd_object *obj,
1633                               struct osd_directory *dir)
1634 {
1635         int result;
1636         struct iam_container *bag;
1637
1638         bag    = &dir->od_container;
1639         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1640         if (result == 0) {
1641                 result = iam_container_setup(bag);
1642                 if (result == 0)
1643                         obj->oo_dt.do_index_ops = &osd_index_ops;
1644                 else
1645                         iam_container_fini(bag);
1646         }
1647         return result;
1648 }
1649
1650 /*
1651  * Concurrency: no external locking is necessary.
1652  */
1653 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1654                          const struct dt_index_features *feat)
1655 {
1656         int result;
1657         struct osd_object *obj = osd_dt_obj(dt);
1658
1659         LASSERT(osd_invariant(obj));
1660         LASSERT(dt_object_exists(dt));
1661
1662         if (osd_object_is_root(obj)) {
1663                 dt->do_index_ops = &osd_index_compat_ops;
1664                 result = 0;
1665         } else if (!osd_has_index(obj)) {
1666                 struct osd_directory *dir;
1667
1668                 OBD_ALLOC_PTR(dir);
1669                 if (dir != NULL) {
1670                         sema_init(&dir->od_sem, 1);
1671
1672                         spin_lock(&obj->oo_guard);
1673                         if (obj->oo_dir == NULL)
1674                                 obj->oo_dir = dir;
1675                         else
1676                                 /*
1677                                  * Concurrent thread allocated container data.
1678                                  */
1679                                 OBD_FREE_PTR(dir);
1680                         spin_unlock(&obj->oo_guard);
1681                         /*
1682                          * Now, that we have container data, serialize its
1683                          * initialization.
1684                          */
1685                         down(&obj->oo_dir->od_sem);
1686                         /*
1687                          * recheck under lock.
1688                          */
1689                         if (!osd_has_index(obj))
1690                                 result = osd_container_init(env, obj, dir);
1691                         else
1692                                 result = 0;
1693                         up(&obj->oo_dir->od_sem);
1694                 } else
1695                         result = -ENOMEM;
1696         } else
1697                 result = 0;
1698
1699         if (result == 0) {
1700                 if (!osd_index_probe(env, obj, feat))
1701                         result = -ENOTDIR;
1702         }
1703         LASSERT(osd_invariant(obj));
1704
1705         return result;
1706 }
1707
1708 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1709                             const struct dt_key *key, struct thandle *handle,
1710                             struct lustre_capa *capa)
1711 {
1712         struct osd_object     *obj = osd_dt_obj(dt);
1713         struct osd_thandle    *oh;
1714         struct iam_path_descr *ipd;
1715         struct iam_container  *bag = &obj->oo_dir->od_container;
1716         int rc;
1717
1718         ENTRY;
1719
1720         LASSERT(osd_invariant(obj));
1721         LASSERT(dt_object_exists(dt));
1722         LASSERT(bag->ic_object == obj->oo_inode);
1723         LASSERT(handle != NULL);
1724
1725         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1726                 RETURN(-EACCES);
1727
1728         ipd = osd_ipd_get(env, bag);
1729         if (unlikely(ipd == NULL))
1730                 RETURN(-ENOMEM);
1731
1732         oh = container_of0(handle, struct osd_thandle, ot_super);
1733         LASSERT(oh->ot_handle != NULL);
1734         LASSERT(oh->ot_handle->h_transaction != NULL);
1735
1736         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1737         osd_ipd_put(env, bag, ipd);
1738         LASSERT(osd_invariant(obj));
1739         RETURN(rc);
1740 }
1741
1742 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1743                             struct dt_rec *rec, const struct dt_key *key,
1744                             struct lustre_capa *capa)
1745 {
1746         struct osd_object     *obj = osd_dt_obj(dt);
1747         struct iam_path_descr *ipd;
1748         struct iam_container  *bag = &obj->oo_dir->od_container;
1749         int rc;
1750
1751         ENTRY;
1752
1753         LASSERT(osd_invariant(obj));
1754         LASSERT(dt_object_exists(dt));
1755         LASSERT(bag->ic_object == obj->oo_inode);
1756
1757         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1758                 return -EACCES;
1759
1760         ipd = osd_ipd_get(env, bag);
1761         if (unlikely(ipd == NULL))
1762                 RETURN(-ENOMEM);
1763
1764         rc = iam_lookup(bag, (const struct iam_key *)key,
1765                         (struct iam_rec *)rec, ipd);
1766         osd_ipd_put(env, bag, ipd);
1767         LASSERT(osd_invariant(obj));
1768
1769         RETURN(rc);
1770 }
1771
1772 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1773                             const struct dt_rec *rec, const struct dt_key *key,
1774                             struct thandle *th, struct lustre_capa *capa)
1775 {
1776         struct osd_object     *obj = osd_dt_obj(dt);
1777         struct iam_path_descr *ipd;
1778         struct osd_thandle    *oh;
1779         struct iam_container  *bag = &obj->oo_dir->od_container;
1780         int rc;
1781
1782         ENTRY;
1783
1784         LASSERT(osd_invariant(obj));
1785         LASSERT(dt_object_exists(dt));
1786         LASSERT(bag->ic_object == obj->oo_inode);
1787         LASSERT(th != NULL);
1788
1789         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1790                 return -EACCES;
1791
1792         ipd = osd_ipd_get(env, bag);
1793         if (unlikely(ipd == NULL))
1794                 RETURN(-ENOMEM);
1795
1796         oh = container_of0(th, struct osd_thandle, ot_super);
1797         LASSERT(oh->ot_handle != NULL);
1798         LASSERT(oh->ot_handle->h_transaction != NULL);
1799         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1800                         (struct iam_rec *)rec, ipd);
1801         osd_ipd_put(env, bag, ipd);
1802         LASSERT(osd_invariant(obj));
1803         RETURN(rc);
1804 }
1805
/*
 * Iterator operations.
 */

/*
 * OSD-private iterator state. The struct dt_it pointers passed to the
 * osd_it_*() functions are casts of a pointer to this structure.
 */
struct osd_it {
        /* object being iterated; a reference is taken in osd_it_init() */
        struct osd_object     *oi_obj;
        /* path descriptor from osd_ipd_get(); released in osd_it_fini() */
        struct iam_path_descr *oi_ipd;
        /* underlying iam iterator */
        struct iam_iterator    oi_it;
};
1814
1815 static struct dt_it *osd_it_init(const struct lu_env *env,
1816                                  struct dt_object *dt, int writable,
1817                                  struct lustre_capa *capa)
1818 {
1819         struct osd_it         *it;
1820         struct osd_object     *obj = osd_dt_obj(dt);
1821         struct lu_object      *lo  = &dt->do_lu;
1822         struct iam_path_descr *ipd;
1823         struct iam_container  *bag = &obj->oo_dir->od_container;
1824         __u32                  flags;
1825
1826         LASSERT(lu_object_exists(lo));
1827
1828         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1829                             CAPA_OPC_BODY_READ))
1830                 return ERR_PTR(-EACCES);
1831
1832         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1833         OBD_ALLOC_PTR(it);
1834         if (it != NULL) {
1835                 /*
1836                  * XXX: as ipd is allocated within osd_thread_info, assignment
1837                  * below implies that iterator usage is confined within single
1838                  * environment.
1839                  */
1840                 ipd = osd_ipd_get(env, bag);
1841                 if (likely(ipd != NULL)) {
1842                         it->oi_obj = obj;
1843                         it->oi_ipd = ipd;
1844                         lu_object_get(lo);
1845                         iam_it_init(&it->oi_it, bag, flags, ipd);
1846                         return (struct dt_it *)it;
1847                 } else
1848                         OBD_FREE_PTR(it);
1849         }
1850         return ERR_PTR(-ENOMEM);
1851 }
1852
1853 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1854 {
1855         struct osd_it     *it = (struct osd_it *)di;
1856         struct osd_object *obj = it->oi_obj;
1857
1858         iam_it_fini(&it->oi_it);
1859         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1860         lu_object_put(env, &obj->oo_dt.do_lu);
1861         OBD_FREE_PTR(it);
1862 }
1863
1864 static int osd_it_get(const struct lu_env *env,
1865                       struct dt_it *di, const struct dt_key *key)
1866 {
1867         struct osd_it *it = (struct osd_it *)di;
1868
1869         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1870 }
1871
1872 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1873 {
1874         struct osd_it *it = (struct osd_it *)di;
1875
1876         iam_it_put(&it->oi_it);
1877 }
1878
1879 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1880 {
1881         struct osd_it *it = (struct osd_it *)di;
1882
1883         return iam_it_next(&it->oi_it);
1884 }
1885
1886 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1887                       struct thandle *th)
1888 {
1889         struct osd_it      *it = (struct osd_it *)di;
1890         struct osd_thandle *oh;
1891
1892         LASSERT(th != NULL);
1893
1894         oh = container_of0(th, struct osd_thandle, ot_super);
1895         LASSERT(oh->ot_handle != NULL);
1896         LASSERT(oh->ot_handle->h_transaction != NULL);
1897
1898         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1899 }
1900
1901 static struct dt_key *osd_it_key(const struct lu_env *env,
1902                                  const struct dt_it *di)
1903 {
1904         struct osd_it *it = (struct osd_it *)di;
1905
1906         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1907 }
1908
1909 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1910 {
1911         struct osd_it *it = (struct osd_it *)di;
1912
1913         return iam_it_key_size(&it->oi_it);
1914 }
1915
1916 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1917                                  const struct dt_it *di)
1918 {
1919         struct osd_it *it = (struct osd_it *)di;
1920
1921         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1922 }
1923
1924 static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1925 {
1926         struct osd_it *it = (struct osd_it *)di;
1927
1928         return iam_it_store(&it->oi_it);
1929 }
1930
1931 static int osd_it_load(const struct lu_env *env,
1932                        const struct dt_it *di, __u64 hash)
1933 {
1934         struct osd_it *it = (struct osd_it *)di;
1935
1936         return iam_it_load(&it->oi_it, hash);
1937 }
1938
/*
 * Index operations backed by an iam container. osd_container_init()
 * installs this table on an object once its container is set up.
 */
static struct dt_index_operations osd_index_ops = {
        .dio_lookup = osd_index_lookup,
        .dio_insert = osd_index_insert,
        .dio_delete = osd_index_delete,
        /* iterator methods, all operating on struct osd_it */
        .dio_it     = {
                .init     = osd_it_init,
                .fini     = osd_it_fini,
                .get      = osd_it_get,
                .put      = osd_it_put,
                .del      = osd_it_del,
                .next     = osd_it_next,
                .key      = osd_it_key,
                .key_size = osd_it_key_size,
                .rec      = osd_it_rec,
                .store    = osd_it_store,
                .load     = osd_it_load
        }
};
1957
1958 static int osd_index_compat_delete(const struct lu_env *env,
1959                                    struct dt_object *dt,
1960                                    const struct dt_key *key,
1961                                    struct thandle *handle,
1962                                    struct lustre_capa *capa)
1963 {
1964         struct osd_object *obj = osd_dt_obj(dt);
1965
1966         LASSERT(handle != NULL);
1967         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1968         ENTRY;
1969
1970 #if 0
1971         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1972                 RETURN(-EACCES);
1973 #endif
1974
1975         RETURN(-EOPNOTSUPP);
1976 }
1977
1978 /*
1979  * Compatibility index operations.
1980  */
1981
1982
1983 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
1984                            struct dentry *dentry, struct lu_fid_pack *pack)
1985 {
1986         struct inode  *inode = dentry->d_inode;
1987         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
1988
1989         lu_igif_build(fid, inode->i_ino, inode->i_generation);
1990         fid_cpu_to_be(fid, fid);
1991         pack->fp_len = sizeof *fid + 1;
1992         memcpy(pack->fp_area, fid, sizeof *fid);
1993 }
1994
1995 static int osd_index_compat_lookup(const struct lu_env *env,
1996                                    struct dt_object *dt,
1997                                    struct dt_rec *rec, const struct dt_key *key,
1998                                    struct lustre_capa *capa)
1999 {
2000         struct osd_object *obj = osd_dt_obj(dt);
2001
2002         struct osd_device      *osd  = osd_obj2dev(obj);
2003         struct osd_thread_info *info = osd_oti_get(env);
2004         struct inode           *dir;
2005
2006         int result;
2007
2008         /*
2009          * XXX temporary solution.
2010          */
2011         struct dentry *dentry;
2012         struct dentry *parent;
2013
2014         LASSERT(osd_invariant(obj));
2015         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2016         LASSERT(osd_has_index(obj));
2017
2018         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2019                 return -EACCES;
2020
2021         info->oti_str.name = (const char *)key;
2022         info->oti_str.len  = strlen((const char *)key);
2023
2024         dir = obj->oo_inode;
2025         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2026
2027         parent = d_alloc_root(dir);
2028         if (parent == NULL)
2029                 return -ENOMEM;
2030         igrab(dir);
2031         dentry = d_alloc(parent, &info->oti_str);
2032         if (dentry != NULL) {
2033                 struct dentry *d;
2034
2035                 /*
2036                  * XXX passing NULL for nameidata should work for
2037                  * ext3/ldiskfs.
2038                  */
2039                 d = dir->i_op->lookup(dir, dentry, NULL);
2040                 if (d == NULL) {
2041                         /*
2042                          * normal case, result is in @dentry.
2043                          */
2044                         if (dentry->d_inode != NULL) {
2045                                 osd_build_pack(env, osd, dentry,
2046                                                (struct lu_fid_pack *)rec);
2047                                 result = 0;
2048                         } else
2049                                 result = -ENOENT;
2050                  } else {
2051                         /* What? Disconnected alias? Ppheeeww... */
2052                         CERROR("Aliasing where not expected\n");
2053                         result = -EIO;
2054                         dput(d);
2055                 }
2056                 dput(dentry);
2057         } else
2058                 result = -ENOMEM;
2059         dput(parent);
2060         LASSERT(osd_invariant(obj));
2061         return result;
2062 }
2063
2064 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
2065                        struct inode *dir, struct inode *inode, const char *name)
2066 {
2067         struct dentry *old;
2068         struct dentry *new;
2069         struct dentry *parent;
2070
2071         int result;
2072
2073         info->oti_str.name = name;
2074         info->oti_str.len  = strlen(name);
2075
2076         LASSERT(atomic_read(&dir->i_count) > 0);
2077         result = -ENOMEM;
2078         old = d_alloc(dev->od_obj_area, &info->oti_str);
2079         if (old != NULL) {
2080                 d_instantiate(old, inode);
2081                 igrab(inode);
2082                 LASSERT(atomic_read(&dir->i_count) > 0);
2083                 parent = d_alloc_root(dir);
2084                 if (parent != NULL) {
2085                         igrab(dir);
2086                         LASSERT(atomic_read(&dir->i_count) > 1);
2087                         new = d_alloc(parent, &info->oti_str);
2088                         LASSERT(atomic_read(&dir->i_count) > 1);
2089                         if (new != NULL) {
2090                                 LASSERT(atomic_read(&dir->i_count) > 1);
2091                                 result = dir->i_op->link(old, dir, new);
2092                                 LASSERT(atomic_read(&dir->i_count) > 1);
2093                                 dput(new);
2094                                 LASSERT(atomic_read(&dir->i_count) > 1);
2095                         }
2096                         LASSERT(atomic_read(&dir->i_count) > 1);
2097                         dput(parent);
2098                         LASSERT(atomic_read(&dir->i_count) > 0);
2099                 }
2100                 dput(old);
2101         }
2102         LASSERT(atomic_read(&dir->i_count) > 0);
2103         return result;
2104 }
2105
2106
2107 /*
2108  * XXX Temporary stuff.
2109  */
2110 static int osd_index_compat_insert(const struct lu_env *env,
2111                                    struct dt_object *dt,
2112                                    const struct dt_rec *rec,
2113                                    const struct dt_key *key, struct thandle *th,
2114                                    struct lustre_capa *capa)
2115 {
2116         struct osd_object     *obj = osd_dt_obj(dt);
2117
2118         const char          *name = (const char *)key;
2119
2120         struct lu_device    *ludev = dt->do_lu.lo_dev;
2121         struct lu_object    *luch;
2122
2123         struct osd_thread_info   *info = osd_oti_get(env);
2124         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2125         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2126
2127         int result;
2128
2129         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2130         LASSERT(osd_invariant(obj));
2131         LASSERT(th != NULL);
2132
2133         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2134                 return -EACCES;
2135
2136         result = fid_unpack(pack, fid);
2137         if (result != 0)
2138                 return result;
2139
2140         luch = lu_object_find(env, ludev->ld_site, fid);
2141         if (!IS_ERR(luch)) {
2142                 if (lu_object_exists(luch)) {
2143                         struct osd_object *child;
2144
2145                         child = osd_obj(lu_object_locate(luch->lo_header,
2146                                                          ludev->ld_type));
2147                         if (child != NULL)
2148                                 result = osd_add_rec(info, osd_obj2dev(obj),
2149                                                      obj->oo_inode,
2150                                                      child->oo_inode, name);
2151                         else {
2152                                 CERROR("No osd slice.\n");
2153                                 result = -ENOENT;
2154                         }
2155                         LASSERT(osd_invariant(obj));
2156                         LASSERT(osd_invariant(child));
2157                 } else {
2158                         CERROR("Sorry.\n");
2159                         result = -ENOENT;
2160                 }
2161                 lu_object_put(env, luch);
2162         } else
2163                 result = PTR_ERR(luch);
2164         LASSERT(osd_invariant(obj));
2165         return result;
2166 }
2167
/*
 * Compatibility index operations; osd_index_try() installs this table on the
 * root object. No iterator methods are provided.
 */
static struct dt_index_operations osd_index_compat_ops = {
        .dio_lookup = osd_index_compat_lookup,
        .dio_insert = osd_index_compat_insert,
        .dio_delete = osd_index_compat_delete
};
2173
/* type constructor/destructor: osd_type_init, osd_type_fini */
LU_TYPE_INIT_FINI(osd, &osd_key);

/*
 * Context key of the osd layer. Its value is the struct osd_thread_info
 * allocated by osd_key_init(); osd_key_exit() asserts that no locks or
 * transactions are still accounted in it.
 */
static struct lu_context_key osd_key = {
        /* same tag set as osd_device_type.ldt_ctx_tags */
        .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
        .lct_init = osd_key_init,
        .lct_fini = osd_key_fini,
        .lct_exit = osd_key_exit
};
2183
2184 static void *osd_key_init(const struct lu_context *ctx,
2185                           struct lu_context_key *key)
2186 {
2187         struct osd_thread_info *info;
2188
2189         OBD_ALLOC_PTR(info);
2190         if (info != NULL)
2191                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2192         else
2193                 info = ERR_PTR(-ENOMEM);
2194         return info;
2195 }
2196
/*
 * context key destructor: osd_key_fini, the function referenced by
 * osd_key.lct_fini; generated here for values of type
 * struct osd_thread_info.
 */
LU_KEY_FINI(osd, struct osd_thread_info);
2199
2200 static void osd_key_exit(const struct lu_context *ctx,
2201                          struct lu_context_key *key, void *data)
2202 {
2203         struct osd_thread_info *info = data;
2204
2205         LASSERT(info->oti_r_locks == 0);
2206         LASSERT(info->oti_w_locks == 0);
2207         LASSERT(info->oti_txns    == 0);
2208 }
2209
2210 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2211                            const char *name, struct lu_device *next)
2212 {
2213         int rc;
2214         /* context for commit hooks */
2215         rc = lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
2216                              LCT_MD_THREAD);
2217         if (rc == 0)
2218                 rc = osd_procfs_init(osd_dev(d), name);
2219         return rc;
2220 }
2221
2222 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2223 {
2224         struct osd_thread_info *info = osd_oti_get(env);
2225         ENTRY;
2226         if (o->od_obj_area != NULL) {
2227                 dput(o->od_obj_area);
2228                 o->od_obj_area = NULL;
2229         }
2230         osd_oi_fini(info, &o->od_oi);
2231
2232         RETURN(0);
2233 }
2234
2235 static int osd_mount(const struct lu_env *env,
2236                      struct osd_device *o, struct lustre_cfg *cfg)
2237 {
2238         struct lustre_mount_info *lmi;
2239         const char               *dev  = lustre_cfg_string(cfg, 0);
2240         struct osd_thread_info   *info = osd_oti_get(env);
2241         int result;
2242
2243         ENTRY;
2244
2245         if (o->od_mount != NULL) {
2246                 CERROR("Already mounted (%s)\n", dev);
2247                 RETURN(-EEXIST);
2248         }
2249
2250         /* get mount */
2251         lmi = server_get_mount(dev);
2252         if (lmi == NULL) {
2253                 CERROR("Cannot get mount info for %s!\n", dev);
2254                 RETURN(-EFAULT);
2255         }
2256
2257         LASSERT(lmi != NULL);
2258         /* save lustre_mount_info in dt_device */
2259         o->od_mount = lmi;
2260
2261         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2262         if (result == 0) {
2263                 struct dentry *d;
2264
2265                 d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
2266                                  0777, 1);
2267                 if (!IS_ERR(d)) {
2268                         o->od_obj_area = d;
2269                 } else
2270                         result = PTR_ERR(d);
2271         }
2272         if (result != 0)
2273                 osd_shutdown(env, o);
2274         RETURN(result);
2275 }
2276
2277 static struct lu_device *osd_device_fini(const struct lu_env *env,
2278                                          struct lu_device *d)
2279 {
2280         int rc;
2281         ENTRY;
2282
2283         shrink_dcache_sb(osd_sb(osd_dev(d)));
2284         osd_sync(env, lu2dt_dev(d));
2285
2286         rc = osd_procfs_fini(osd_dev(d));
2287         if (rc) {
2288                 CERROR("proc fini error %d \n", rc);
2289                 RETURN (ERR_PTR(rc));
2290         }
2291
2292         if (osd_dev(d)->od_mount)
2293                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2294                                  osd_dev(d)->od_mount->lmi_mnt);
2295         osd_dev(d)->od_mount = NULL;
2296
2297         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2298         RETURN(NULL);
2299 }
2300
2301 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2302                                           struct lu_device_type *t,
2303                                           struct lustre_cfg *cfg)
2304 {
2305         struct lu_device  *l;
2306         struct osd_device *o;
2307
2308         OBD_ALLOC_PTR(o);
2309         if (o != NULL) {
2310                 int result;
2311
2312                 result = dt_device_init(&o->od_dt_dev, t);
2313                 if (result == 0) {
2314                         l = osd2lu_dev(o);
2315                         l->ld_ops = &osd_lu_ops;
2316                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2317                         spin_lock_init(&o->od_osfs_lock);
2318                         o->od_osfs_age = cfs_time_shift_64(-1000);
2319                         o->od_capa_hash = init_capa_hash();
2320                         if (o->od_capa_hash == NULL) {
2321                                 dt_device_fini(&o->od_dt_dev);
2322                                 l = ERR_PTR(-ENOMEM);
2323                         }
2324                 } else
2325                         l = ERR_PTR(result);
2326
2327                 if (IS_ERR(l))
2328                         OBD_FREE_PTR(o);
2329         } else
2330                 l = ERR_PTR(-ENOMEM);
2331         return l;
2332 }
2333
2334 static struct lu_device *osd_device_free(const struct lu_env *env,
2335                                          struct lu_device *d)
2336 {
2337         struct osd_device *o = osd_dev(d);
2338         ENTRY;
2339
2340         cleanup_capa_hash(o->od_capa_hash);
2341         dt_device_fini(&o->od_dt_dev);
2342         OBD_FREE_PTR(o);
2343         RETURN(NULL);
2344 }
2345
2346 static int osd_process_config(const struct lu_env *env,
2347                               struct lu_device *d, struct lustre_cfg *cfg)
2348 {
2349         struct osd_device *o = osd_dev(d);
2350         int err;
2351         ENTRY;
2352
2353         switch(cfg->lcfg_command) {
2354         case LCFG_SETUP:
2355                 err = osd_mount(env, o, cfg);
2356                 break;
2357         case LCFG_CLEANUP:
2358                 err = osd_shutdown(env, o);
2359                 break;
2360         default:
2361                 err = -ENOTTY;
2362         }
2363
2364         RETURN(err);
2365 }
2366 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2367                                     struct ldiskfs_super_block * es);
2368
2369 static int osd_recovery_complete(const struct lu_env *env,
2370                                  struct lu_device *d)
2371 {
2372         struct osd_device *o = osd_dev(d);
2373         ENTRY;
2374         /* TODO: orphans handling */
2375         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2376         RETURN(0);
2377 }
2378
2379 static struct inode *osd_iget(struct osd_thread_info *info,
2380                               struct osd_device *dev,
2381                               const struct osd_inode_id *id)
2382 {
2383         struct inode *inode;
2384
2385         inode = iget(osd_sb(dev), id->oii_ino);
2386         if (inode == NULL) {
2387                 CERROR("no inode\n");
2388                 inode = ERR_PTR(-EACCES);
2389         } else if (is_bad_inode(inode)) {
2390                 CERROR("bad inode\n");
2391                 iput(inode);
2392                 inode = ERR_PTR(-ENOENT);
2393         } else if (inode->i_generation != id->oii_gen) {
2394                 CERROR("stale inode\n");
2395                 iput(inode);
2396                 inode = ERR_PTR(-ESTALE);
2397         }
2398
2399         return inode;
2400
2401 }
2402
2403 static int osd_fid_lookup(const struct lu_env *env,
2404                           struct osd_object *obj, const struct lu_fid *fid)
2405 {
2406         struct osd_thread_info *info;
2407         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2408         struct osd_device      *dev;
2409         struct osd_inode_id    *id;
2410         struct osd_oi          *oi;
2411         struct inode           *inode;
2412         int                     result;
2413
2414         LASSERT(osd_invariant(obj));
2415         LASSERT(obj->oo_inode == NULL);
2416         LASSERT(fid_is_sane(fid));
2417         /*
2418          * This assertion checks that osd layer sees only local
2419          * fids. Unfortunately it is somewhat expensive (does a
2420          * cache-lookup). Disabling it for production/acceptance-testing.
2421          */
2422         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2423
2424         ENTRY;
2425
2426         info = osd_oti_get(env);
2427         dev  = osd_dev(ldev);
2428         id   = &info->oti_id;
2429         oi   = &dev->od_oi;
2430
2431         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2432                 RETURN(-ENOENT);
2433
2434         result = osd_oi_lookup(info, oi, fid, id);
2435         if (result == 0) {
2436                 inode = osd_iget(info, dev, id);
2437                 if (!IS_ERR(inode)) {
2438                         obj->oo_inode = inode;
2439                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2440                         result = 0;
2441                 } else
2442                         /*
2443                          * If fid wasn't found in oi, inode-less object is
2444                          * created, for which lu_object_exists() returns
2445                          * false. This is used in a (frequent) case when
2446                          * objects are created as locking anchors or
2447                          * place holders for objects yet to be created.
2448                          */
2449                         result = PTR_ERR(inode);
2450         } else if (result == -ENOENT)
2451                 result = 0;
2452         LASSERT(osd_invariant(obj));
2453         RETURN(result);
2454 }
2455
2456 static void osd_inode_getattr(const struct lu_env *env,
2457                               struct inode *inode, struct lu_attr *attr)
2458 {
2459         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2460                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2461                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2462
2463         attr->la_atime      = LTIME_S(inode->i_atime);
2464         attr->la_mtime      = LTIME_S(inode->i_mtime);
2465         attr->la_ctime      = LTIME_S(inode->i_ctime);
2466         attr->la_mode       = inode->i_mode;
2467         attr->la_size       = i_size_read(inode);
2468         attr->la_blocks     = inode->i_blocks;
2469         attr->la_uid        = inode->i_uid;
2470         attr->la_gid        = inode->i_gid;
2471         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2472         attr->la_nlink      = inode->i_nlink;
2473         attr->la_rdev       = inode->i_rdev;
2474         attr->la_blksize    = ll_inode_blksize(inode);
2475         attr->la_blkbits    = inode->i_blkbits;
2476 }
2477
2478 /*
2479  * Helpers.
2480  */
2481
2482 static int lu_device_is_osd(const struct lu_device *d)
2483 {
2484         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2485 }
2486
2487 static struct osd_object *osd_obj(const struct lu_object *o)
2488 {
2489         LASSERT(lu_device_is_osd(o->lo_dev));
2490         return container_of0(o, struct osd_object, oo_dt.do_lu);
2491 }
2492
2493 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2494 {
2495         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2496         return container_of0(d, struct osd_device, od_dt_dev);
2497 }
2498
2499 static struct osd_device *osd_dev(const struct lu_device *d)
2500 {
2501         LASSERT(lu_device_is_osd(d));
2502         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2503 }
2504
2505 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2506 {
2507         return osd_obj(&d->do_lu);
2508 }
2509
2510 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2511 {
2512         return osd_dev(o->oo_dt.do_lu.lo_dev);
2513 }
2514
2515 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2516 {
2517         return &osd->od_dt_dev.dd_lu_dev;
2518 }
2519
2520 static struct super_block *osd_sb(const struct osd_device *dev)
2521 {
2522         return dev->od_mount->lmi_mnt->mnt_sb;
2523 }
2524
2525 static journal_t *osd_journal(const struct osd_device *dev)
2526 {
2527         return LDISKFS_SB(osd_sb(dev))->s_journal;
2528 }
2529
2530 static int osd_has_index(const struct osd_object *obj)
2531 {
2532         return obj->oo_dt.do_index_ops != NULL;
2533 }
2534
/*
 * ->loo_object_invariant() method: evaluate osd_invariant() on the
 * osd_object behind @l and return its result.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int holds;

        /*
         * The argument expression is deliberately left nested so that
         * it is evaluated exactly as osd_invariant() dictates.
         */
        holds = osd_invariant(osd_obj(l));
        return holds;
}
2539
2540 static struct lu_object_operations osd_lu_obj_ops = {
2541         .loo_object_init      = osd_object_init,
2542         .loo_object_delete    = osd_object_delete,
2543         .loo_object_release   = osd_object_release,
2544         .loo_object_free      = osd_object_free,
2545         .loo_object_print     = osd_object_print,
2546         .loo_object_invariant = osd_object_invariant
2547 };
2548
2549 static struct lu_device_operations osd_lu_ops = {
2550         .ldo_object_alloc      = osd_object_alloc,
2551         .ldo_process_config    = osd_process_config,
2552         .ldo_recovery_complete = osd_recovery_complete
2553 };
2554
2555 static struct lu_device_type_operations osd_device_type_ops = {
2556         .ldto_init = osd_type_init,
2557         .ldto_fini = osd_type_fini,
2558
2559         .ldto_device_alloc = osd_device_alloc,
2560         .ldto_device_free  = osd_device_free,
2561
2562         .ldto_device_init    = osd_device_init,
2563         .ldto_device_fini    = osd_device_fini
2564 };
2565
2566 static struct lu_device_type osd_device_type = {
2567         .ldt_tags     = LU_DEVICE_DT,
2568         .ldt_name     = LUSTRE_OSD_NAME,
2569         .ldt_ops      = &osd_device_type_ops,
2570         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2571 };
2572
2573 /*
2574  * lprocfs legacy support.
2575  */
2576 static struct obd_ops osd_obd_device_ops = {
2577         .o_owner = THIS_MODULE
2578 };
2579
2580 static int __init osd_mod_init(void)
2581 {
2582         struct lprocfs_static_vars lvars;
2583
2584         lprocfs_osd_init_vars(&lvars);
2585         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2586                                    LUSTRE_OSD_NAME, &osd_device_type);
2587 }
2588
2589 static void __exit osd_mod_exit(void)
2590 {
2591         class_unregister_type(LUSTRE_OSD_NAME);
2592 }
2593
2594 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2595 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
2596 MODULE_LICENSE("GPL");
2597
2598 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);