Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/osd/osd_handler.c
5  *  Top-level entry points into osd module
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Nikita Danilov <nikita@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35
36 /* LUSTRE_VERSION_CODE */
37 #include <lustre_ver.h>
38 /* prerequisite for linux/xattr.h */
39 #include <linux/types.h>
40 /* prerequisite for linux/xattr.h */
41 #include <linux/fs.h>
42 /* XATTR_{REPLACE,CREATE} */
43 #include <linux/xattr.h>
44 /*
45  * XXX temporary stuff: direct access to ldiskfs/jdb. Interface between osd
46  * and file system is not yet specified.
47  */
48 /* handle_t, journal_start(), journal_stop() */
49 #include <linux/jbd.h>
50 /* LDISKFS_SB() */
51 #include <linux/ldiskfs_fs.h>
52 #include <linux/ldiskfs_jbd.h>
53 /* simple_mkdir() */
54 #include <lvfs.h>
55
56 /*
57  * struct OBD_{ALLOC,FREE}*()
58  * OBD_FAIL_CHECK
59  */
60 #include <obd_support.h>
61 /* struct ptlrpc_thread */
62 #include <lustre_net.h>
63
64 /* fid_is_local() */
65 #include <lustre_fid.h>
66 #include <linux/lustre_iam.h>
67
68 #include "osd_internal.h"
69 #include "osd_igif.h"
70
71 struct osd_directory {
72         struct iam_container od_container;
73         struct iam_descr     od_descr;
74         struct semaphore     od_sem;
75 };
76
77 struct osd_object {
78         struct dt_object       oo_dt;
79         /*
80          * Inode for file system object represented by this osd_object. This
81          * inode is pinned for the whole duration of lu_object life.
82          *
83          * Not modified concurrently (either setup early during object
84          * creation, or assigned by osd_object_create() under write lock).
85          */
86         struct inode          *oo_inode;
87         struct rw_semaphore    oo_sem;
88         struct osd_directory  *oo_dir;
89         /* protects inode attributes. */
90         spinlock_t             oo_guard;
91 #if OSD_COUNTERS
92         const struct lu_env   *oo_owner;
93 #endif
94 };
95
96 static int   osd_root_get      (const struct lu_env *env,
97                                 struct dt_device *dev, struct lu_fid *f);
98
99 static int   lu_device_is_osd  (const struct lu_device *d);
100 static void  osd_mod_exit      (void) __exit;
101 static int   osd_mod_init      (void) __init;
102 static int   osd_type_init     (struct lu_device_type *t);
103 static void  osd_type_fini     (struct lu_device_type *t);
104 static int   osd_object_init   (const struct lu_env *env,
105                                 struct lu_object *l);
106 static void  osd_object_release(const struct lu_env *env,
107                                 struct lu_object *l);
108 static int   osd_object_print  (const struct lu_env *env, void *cookie,
109                                 lu_printer_t p, const struct lu_object *o);
110 static struct lu_device *osd_device_free   (const struct lu_env *env,
111                                 struct lu_device *m);
112 static void *osd_key_init      (const struct lu_context *ctx,
113                                 struct lu_context_key *key);
114 static void  osd_key_fini      (const struct lu_context *ctx,
115                                 struct lu_context_key *key, void *data);
116 static void  osd_key_exit      (const struct lu_context *ctx,
117                                 struct lu_context_key *key, void *data);
118 static int   osd_has_index     (const struct osd_object *obj);
119 static void  osd_object_init0  (struct osd_object *obj);
120 static int   osd_device_init   (const struct lu_env *env,
121                                 struct lu_device *d, const char *,
122                                 struct lu_device *);
123 static int   osd_fid_lookup    (const struct lu_env *env,
124                                 struct osd_object *obj,
125                                 const struct lu_fid *fid);
126 static void  osd_inode_getattr (const struct lu_env *env,
127                                 struct inode *inode, struct lu_attr *attr);
128 static void  osd_inode_setattr (const struct lu_env *env,
129                                 struct inode *inode, const struct lu_attr *attr);
130 static int   osd_param_is_sane (const struct osd_device *dev,
131                                 const struct txn_param *param);
132 static int   osd_index_lookup  (const struct lu_env *env,
133                                 struct dt_object *dt,
134                                 struct dt_rec *rec, const struct dt_key *key,
135                                 struct lustre_capa *capa);
136 static int   osd_index_insert  (const struct lu_env *env,
137                                 struct dt_object *dt,
138                                 const struct dt_rec *rec,
139                                 const struct dt_key *key,
140                                 struct thandle *handle,
141                                 struct lustre_capa *capa);
142 static int   osd_index_delete  (const struct lu_env *env,
143                                 struct dt_object *dt, const struct dt_key *key,
144                                 struct thandle *handle,
145                                 struct lustre_capa *capa);
146 static int   osd_index_probe   (const struct lu_env *env,
147                                 struct osd_object *o,
148                                 const struct dt_index_features *feat);
149 static int   osd_index_try     (const struct lu_env *env,
150                                 struct dt_object *dt,
151                                 const struct dt_index_features *feat);
152 static void  osd_index_fini    (struct osd_object *o);
153
154 static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
155 static int   osd_it_get        (const struct lu_env *env,
156                                 struct dt_it *di, const struct dt_key *key);
157 static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
158 static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
159 static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
160                                 struct thandle *th);
161 static int   osd_it_key_size   (const struct lu_env *env,
162                                 const struct dt_it *di);
163 static void  osd_conf_get      (const struct lu_env *env,
164                                 const struct dt_device *dev,
165                                 struct dt_device_param *param);
166 static void  osd_trans_stop    (const struct lu_env *env,
167                                 struct thandle *th);
168 static int   osd_object_is_root(const struct osd_object *obj);
169
170 static struct osd_object  *osd_obj          (const struct lu_object *o);
171 static struct osd_device  *osd_dev          (const struct lu_device *d);
172 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
173 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
174 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
175 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
176 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
177                                              struct lu_device *d);
178 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
179                                              struct lu_device_type *t,
180                                              struct lustre_cfg *cfg);
181 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
182                                              const struct lu_object_header *hdr,
183                                              struct lu_device *d);
184 static struct inode       *osd_iget         (struct osd_thread_info *info,
185                                              struct osd_device *dev,
186                                              const struct osd_inode_id *id);
187 static struct super_block *osd_sb           (const struct osd_device *dev);
188 static struct dt_it       *osd_it_init      (const struct lu_env *env,
189                                              struct dt_object *dt, int wable,
190                                              struct lustre_capa *capa);
191 static struct dt_key      *osd_it_key       (const struct lu_env *env,
192                                              const struct dt_it *di);
193 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
194                                              const struct dt_it *di);
195 static struct timespec    *osd_inode_time   (const struct lu_env *env,
196                                              struct inode *inode,
197                                              __u64 seconds);
198 static struct thandle     *osd_trans_start  (const struct lu_env *env,
199                                              struct dt_device *d,
200                                              struct txn_param *p);
201 static journal_t          *osd_journal      (const struct osd_device *dev);
202
203 static struct lu_device_type_operations osd_device_type_ops;
204 static struct lu_device_type            osd_device_type;
205 static struct lu_object_operations      osd_lu_obj_ops;
206 static struct obd_ops                   osd_obd_device_ops;
207 static struct lu_device_operations      osd_lu_ops;
208 static struct lu_context_key            osd_key;
209 static struct dt_object_operations      osd_obj_ops;
210 static struct dt_body_operations        osd_body_ops;
211 static struct dt_index_operations       osd_index_ops;
212 static struct dt_index_operations       osd_index_compat_ops;
213
214 struct osd_thandle {
215         struct thandle          ot_super;
216         handle_t               *ot_handle;
217         struct journal_callback ot_jcb;
218 };
219
220 /*
221  * Invariants, assertions.
222  */
223
/*
 * XXX: do not enable this, until invariant checking code is made thread safe
 * in the face of pdirops locking.
 */
#define OSD_INVARIANT_CHECKS (0)

#if OSD_INVARIANT_CHECKS
/*
 * Consistency check for an osd_object. Holds when:
 *
 *  - obj is not NULL;
 *  - if an inode is attached, it belongs to this device's super block and
 *    has a positive reference count;
 *  - if index state is attached and its container is bound to an inode,
 *    that inode is the object's own inode.
 *
 * Returns non-zero when the invariant holds.
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_dir != NULL &&
                     obj->oo_dir->od_container.ic_object != NULL,
                     obj->oo_dir->od_container.ic_object == obj->oo_inode);
}
#else
/* Invariant checking is compiled out: always report success. */
#define osd_invariant(obj) (1)
#endif
245
246 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
247 {
248         return lu_context_key_get(&env->le_ctx, &osd_key);
249 }
250
#if OSD_COUNTERS
/*
 * True when the calling thread holds at least one object read lock,
 * according to its per-thread counter. @o itself is not inspected.
 *
 * Concurrency: doesn't matter
 */
static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
{
        struct osd_thread_info *info = osd_oti_get(env);

        return info->oti_r_locks > 0;
}

/*
 * True when the calling thread holds at least one write lock and @o is
 * write-locked by @env.
 *
 * Concurrency: doesn't matter
 */
static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
{
        struct osd_thread_info *info = osd_oti_get(env);

        if (!(info->oti_w_locks > 0))
                return 0;
        return o->oo_owner == env;
}

/* Evaluate exp only when lock counters are compiled in. */
#define OSD_COUNTERS_DO(exp) exp
#else

/* Without counters the lock predicates are trivially true. */
#define osd_read_locked(env, o) (1)
#define osd_write_locked(env, o) (1)
#define OSD_COUNTERS_DO(exp) ((void)0)
#endif
277
278 /*
279  * Concurrency: doesn't access mutable data
280  */
281 static int osd_root_get(const struct lu_env *env,
282                         struct dt_device *dev, struct lu_fid *f)
283 {
284         struct inode *inode;
285
286         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
287         lu_igif_build(f, inode->i_ino, inode->i_generation);
288         return 0;
289 }
290
291 /*
292  * OSD object methods.
293  */
294
295 /*
296  * Concurrency: no concurrent access is possible that early in object
297  * life-cycle.
298  */
299 static struct lu_object *osd_object_alloc(const struct lu_env *env,
300                                           const struct lu_object_header *hdr,
301                                           struct lu_device *d)
302 {
303         struct osd_object *mo;
304
305         OBD_ALLOC_PTR(mo);
306         if (mo != NULL) {
307                 struct lu_object *l;
308
309                 l = &mo->oo_dt.do_lu;
310                 dt_object_init(&mo->oo_dt, NULL, d);
311                 mo->oo_dt.do_ops = &osd_obj_ops;
312                 l->lo_ops = &osd_lu_obj_ops;
313                 init_rwsem(&mo->oo_sem);
314                 spin_lock_init(&mo->oo_guard);
315                 return l;
316         } else
317                 return NULL;
318 }
319
320 /*
321  * Concurrency: shouldn't matter.
322  */
323 static void osd_object_init0(struct osd_object *obj)
324 {
325         LASSERT(obj->oo_inode != NULL);
326         obj->oo_dt.do_body_ops = &osd_body_ops;
327         obj->oo_dt.do_lu.lo_header->loh_attr |=
328                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
329 }
330
331 /*
332  * Concurrency: no concurrent access is possible that early in object
333  * life-cycle.
334  */
335 static int osd_object_init(const struct lu_env *env, struct lu_object *l)
336 {
337         struct osd_object *obj = osd_obj(l);
338         int result;
339
340         LASSERT(osd_invariant(obj));
341
342         result = osd_fid_lookup(env, obj, lu_object_fid(l));
343         if (result == 0) {
344                 if (obj->oo_inode != NULL)
345                         osd_object_init0(obj);
346         }
347         LASSERT(osd_invariant(obj));
348         return result;
349 }
350
351 /*
352  * Concurrency: no concurrent access is possible that late in object
353  * life-cycle.
354  */
355 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
356 {
357         struct osd_object *obj = osd_obj(l);
358
359         LASSERT(osd_invariant(obj));
360
361         dt_object_fini(&obj->oo_dt);
362         OBD_FREE_PTR(obj);
363 }
364
365 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
366                                           const struct iam_container *bag)
367 {
368         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
369                                                    osd_oti_get(env)->oti_ipd);
370 }
371
372 static void osd_ipd_put(const struct lu_env *env,
373                         const struct iam_container *bag,
374                         struct iam_path_descr *ipd)
375 {
376         bag->ic_descr->id_ops->id_ipd_free(ipd);
377 }
378
379 /*
380  * Concurrency: no concurrent access is possible that late in object
381  * life-cycle.
382  */
383 static void osd_index_fini(struct osd_object *o)
384 {
385         struct iam_container *bag;
386
387         if (o->oo_dir != NULL) {
388                 bag = &o->oo_dir->od_container;
389                 if (o->oo_inode != NULL) {
390                         if (bag->ic_object == o->oo_inode)
391                                 iam_container_fini(bag);
392                 }
393                 OBD_FREE_PTR(o->oo_dir);
394                 o->oo_dir = NULL;
395         }
396 }
397
398 /*
399  * Concurrency: no concurrent access is possible that late in object
400  * life-cycle (for all existing callers, that is. New callers have to provide
401  * their own locking.)
402  */
403 static int osd_inode_unlinked(const struct inode *inode)
404 {
405         return inode->i_nlink == 0;
406 }
407
/*
 * Journal credits reserved by osd_inode_remove(), which requests the sum
 * of the two.
 */
enum {
        /* credits for the object index part of the removal */
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        /* credits for the inode part of the removal */
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
412
413 /*
414  * Concurrency: no concurrent access is possible that late in object
415  * life-cycle.
416  */
417 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
418 {
419         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
420         struct osd_device      *osd = osd_obj2dev(obj);
421         struct osd_thread_info *oti = osd_oti_get(env);
422         struct txn_param       *prm = &oti->oti_txn;
423         struct thandle         *th;
424         int result;
425
426         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
427                             OSD_TXN_INODE_DELETE_CREDITS);
428         th = osd_trans_start(env, &osd->od_dt_dev, prm);
429         if (!IS_ERR(th)) {
430                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
431                 osd_trans_stop(env, th);
432         } else
433                 result = PTR_ERR(th);
434         return result;
435 }
436
437 /*
438  * Called just before object is freed. Releases all resources except for
439  * object itself (that is released by osd_object_free()).
440  *
441  * Concurrency: no concurrent access is possible that late in object
442  * life-cycle.
443  */
444 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
445 {
446         struct osd_object *obj   = osd_obj(l);
447         struct inode      *inode = obj->oo_inode;
448
449         LASSERT(osd_invariant(obj));
450
451         /*
452          * If object is unlinked remove fid->ino mapping from object index.
453          *
454          * File body will be deleted by iput().
455          */
456
457         osd_index_fini(obj);
458         if (inode != NULL) {
459                 int result;
460
461                 if (osd_inode_unlinked(inode)) {
462                         result = osd_inode_remove(env, obj);
463                         if (result != 0)
464                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
465                                                 "Failed to cleanup: %d\n",
466                                                 result);
467                 }
468                 iput(inode);
469                 obj->oo_inode = NULL;
470         }
471 }
472
473 /*
474  * Concurrency: ->loo_object_release() is called under site spin-lock.
475  */
476 static void osd_object_release(const struct lu_env *env,
477                                struct lu_object *l)
478 {
479         struct osd_object *o = osd_obj(l);
480
481         LASSERT(!lu_object_is_dying(l->lo_header));
482         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
483                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
484 }
485
486 /*
487  * Concurrency: shouldn't matter.
488  */
489 static int osd_object_print(const struct lu_env *env, void *cookie,
490                             lu_printer_t p, const struct lu_object *l)
491 {
492         struct osd_object *o = osd_obj(l);
493         struct iam_descr  *d;
494
495         if (o->oo_dir != NULL)
496                 d = o->oo_dir->od_container.ic_descr;
497         else
498                 d = NULL;
499         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
500                     o, o->oo_inode,
501                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
502                     o->oo_inode ? o->oo_inode->i_generation : 0,
503                     d ? d->id_ops->id_name : "plain");
504 }
505
506 /*
507  * Concurrency: shouldn't matter.
508  */
509 int osd_statfs(const struct lu_env *env, struct dt_device *d,
510                struct kstatfs *sfs)
511 {
512         struct osd_device *osd = osd_dt_dev(d);
513         struct super_block *sb = osd_sb(osd);
514         int result = 0;
515
516         spin_lock(&osd->od_osfs_lock);
517         /* cache 1 second */
518         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
519                 result = ll_do_statfs(sb, &osd->od_kstatfs);
520                 if (likely(result == 0)) /* N.B. statfs can't really fail */
521                         osd->od_osfs_age = cfs_time_current_64();
522         }
523
524         if (likely(result == 0))
525                 *sfs = osd->od_kstatfs; 
526         spin_unlock(&osd->od_osfs_lock);
527
528         return result;
529 }
530
531 /*
532  * Concurrency: doesn't access mutable data.
533  */
534 static void osd_conf_get(const struct lu_env *env,
535                          const struct dt_device *dev,
536                          struct dt_device_param *param)
537 {
538         /*
539          * XXX should be taken from not-yet-existing fs abstraction layer.
540          */
541         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
542         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
543         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
544 }
545
546 /*
547  * Journal
548  */
549
550 /*
551  * Concurrency: doesn't access mutable data.
552  */
553 static int osd_param_is_sane(const struct osd_device *dev,
554                              const struct txn_param *param)
555 {
556         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
557 }
558
559 /*
560  * Concurrency: shouldn't matter.
561  */
562 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
563 {
564         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
565         struct thandle     *th = &oh->ot_super;
566         struct dt_device   *dev = th->th_dev;
567
568         LASSERT(dev != NULL);
569         LASSERT(oh->ot_handle == NULL);
570
571         if (error) {
572                 CERROR("transaction @0x%p commit error: %d\n", th, error);
573         } else {
574                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
575                 /*
576                  * This od_env_for_commit is only for commit usage.  see
577                  * "struct dt_device"
578                  */
579                 lu_context_enter(&env->le_ctx);
580                 dt_txn_hook_commit(env, th);
581                 lu_context_exit(&env->le_ctx);
582         }
583
584         lu_device_put(&dev->dd_lu_dev);
585         th->th_dev = NULL;
586
587         lu_context_exit(&th->th_ctx);
588         lu_context_fini(&th->th_ctx);
589         OBD_FREE_PTR(oh);
590 }
591
592 /*
593  * Concurrency: shouldn't matter.
594  */
595 static struct thandle *osd_trans_start(const struct lu_env *env,
596                                        struct dt_device *d,
597                                        struct txn_param *p)
598 {
599         struct osd_device  *dev = osd_dt_dev(d);
600         handle_t           *jh;
601         struct osd_thandle *oh;
602         struct thandle     *th;
603         int hook_res;
604
605         ENTRY;
606
607         hook_res = dt_txn_hook_start(env, d, p);
608         if (hook_res != 0)
609                 RETURN(ERR_PTR(hook_res));
610
611         if (osd_param_is_sane(dev, p)) {
612                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
613                 if (oh != NULL) {
614                         /*
615                          * XXX temporary stuff. Some abstraction layer should
616                          * be used.
617                          */
618
619                         jh = journal_start(osd_journal(dev), p->tp_credits);
620                         if (!IS_ERR(jh)) {
621                                 oh->ot_handle = jh;
622                                 th = &oh->ot_super;
623                                 th->th_dev = d;
624                                 th->th_result = 0;
625                                 jh->h_sync = p->tp_sync;
626                                 lu_device_get(&d->dd_lu_dev);
627                                 /* add commit callback */
628                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
629                                 lu_context_enter(&th->th_ctx);
630                                 journal_callback_set(jh, osd_trans_commit_cb,
631                                                      (struct journal_callback *)&oh->ot_jcb);
632 #if OSD_COUNTERS
633                                 {
634                                         struct osd_thread_info *oti =
635                                                 osd_oti_get(env);
636
637                                         LASSERT(oti->oti_txns == 0);
638                                         LASSERT(oti->oti_r_locks == 0);
639                                         LASSERT(oti->oti_w_locks == 0);
640                                         oti->oti_txns++;
641                                 }
642 #endif
643                         } else {
644                                 OBD_FREE_PTR(oh);
645                                 th = (void *)jh;
646                         }
647                 } else
648                         th = ERR_PTR(-ENOMEM);
649         } else {
650                 CERROR("Invalid transaction parameters\n");
651                 th = ERR_PTR(-EINVAL);
652         }
653
654         RETURN(th);
655 }
656
657 /*
658  * Concurrency: shouldn't matter.
659  */
660 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
661 {
662         int result;
663         struct osd_thandle *oh;
664
665         ENTRY;
666
667         oh = container_of0(th, struct osd_thandle, ot_super);
668         if (oh->ot_handle != NULL) {
669                 handle_t *hdl = oh->ot_handle;
670                 /*
671                  * XXX temporary stuff. Some abstraction layer should be used.
672                  */
673                 result = dt_txn_hook_stop(env, th);
674                 if (result != 0)
675                         CERROR("Failure in transaction hook: %d\n", result);
676
677                 /**/
678                 oh->ot_handle = NULL;
679                 result = journal_stop(hdl);
680                 if (result != 0)
681                         CERROR("Failure to stop transaction: %d\n", result);
682
683 #if OSD_COUNTERS
684                 {
685                         struct osd_thread_info *oti = osd_oti_get(env);
686
687                         LASSERT(oti->oti_txns == 1);
688                         LASSERT(oti->oti_r_locks == 0);
689                         LASSERT(oti->oti_w_locks == 0);
690                         oti->oti_txns--;
691                 }
692 #endif
693         }
694         EXIT;
695 }
696
697 /*
698  * Concurrency: shouldn't matter.
699  */
700 static int osd_sync(const struct lu_env *env, struct dt_device *d)
701 {
702         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
703         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
704 }
705
706 /*
707  * Concurrency: shouldn't matter.
708  */
709 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
710
711 static void osd_ro(const struct lu_env *env, struct dt_device *d)
712 {
713         ENTRY;
714
715         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
716
717         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
718                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
719         EXIT;
720 }
721
722 /*
723  * Concurrency: serialization provided by callers.
724  */
725 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
726                               int mode, unsigned long timeout, __u32 alg,
727                               struct lustre_capa_key *keys)
728 {
729         struct osd_device *dev = osd_dt_dev(d);
730         ENTRY;
731
732         dev->od_fl_capa = mode;
733         dev->od_capa_timeout = timeout;
734         dev->od_capa_alg = alg;
735         dev->od_capa_keys = keys;
736         RETURN(0);
737 }
738
739 /* Note: we did not count into QUOTA here, If we mount with --data_journal
740  * we may need more*/
741 static const int osd_dto_credits[DTO_NR] = {
742         /*
743          * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
744          * EXT3_SINGLEDATA_TRANS_BLOCKS 8 XXX Note: maybe iam need more,since
745          * iam have more level than Ext3 htree
746          */
747         [DTO_INDEX_INSERT]  = 16,
748         [DTO_INDEX_DELETE]  = 16,
749         [DTO_IDNEX_UPDATE]  = 16,
750         /*
751          * Create a object. Same as create object in Ext3 filesystem, but did
752          * not count QUOTA i EXT3_DATA_TRANS_BLOCKS(12) +
753          * INDEX_EXTRA_BLOCKS(8) + 3(inode bits,groups, GDT)
754          */
755         [DTO_OBJECT_CREATE] = 23,
756         [DTO_OBJECT_DELETE] = 23,
757         /*
758          * Attr set credits 3 inode, group, GDT
759          */
760         [DTO_ATTR_SET]      = 3,
761         /*
762          * XATTR_SET. SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS XXX Note:
763          * in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
764          * also counted in. Do not know why?
765          */
766         [DTO_XATTR_SET]     = 16,
767         [DTO_LOG_REC]       = 16,
768         /* creadits for inode change during write */
769         [DTO_WRITE_BASE]    = 3,
770         /* credits for single block write */
771         [DTO_WRITE_BLOCK]   = 12 
772 };
773
774 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
775                           enum dt_txn_op op)
776 {
777         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
778         return osd_dto_credits[op];
779 }
780
781 static struct dt_device_operations osd_dt_ops = {
782         .dt_root_get       = osd_root_get,
783         .dt_statfs         = osd_statfs,
784         .dt_trans_start    = osd_trans_start,
785         .dt_trans_stop     = osd_trans_stop,
786         .dt_conf_get       = osd_conf_get,
787         .dt_sync           = osd_sync,
788         .dt_ro             = osd_ro,
789         .dt_credit_get     = osd_credit_get,
790         .dt_init_capa_ctxt = osd_init_capa_ctxt,
791 };
792
793 static void osd_object_read_lock(const struct lu_env *env,
794                                  struct dt_object *dt)
795 {
796         struct osd_object *obj = osd_dt_obj(dt);
797
798         LASSERT(osd_invariant(obj));
799
800         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
801         down_read(&obj->oo_sem);
802 #if OSD_COUNTERS
803         {
804                 struct osd_thread_info *oti = osd_oti_get(env);
805
806                 LASSERT(obj->oo_owner == NULL);
807                 oti->oti_r_locks++;
808         }
809 #endif
810 }
811
812 static void osd_object_write_lock(const struct lu_env *env,
813                                   struct dt_object *dt)
814 {
815         struct osd_object *obj = osd_dt_obj(dt);
816
817         LASSERT(osd_invariant(obj));
818
819         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
820         down_write(&obj->oo_sem);
821 #if OSD_COUNTERS
822         {
823                 struct osd_thread_info *oti = osd_oti_get(env);
824
825                 LASSERT(obj->oo_owner == NULL);
826                 obj->oo_owner = env;
827                 oti->oti_w_locks++;
828         }
829 #endif
830 }
831
832 static void osd_object_read_unlock(const struct lu_env *env,
833                                    struct dt_object *dt)
834 {
835         struct osd_object *obj = osd_dt_obj(dt);
836
837         LASSERT(osd_invariant(obj));
838 #if OSD_COUNTERS
839         {
840                 struct osd_thread_info *oti = osd_oti_get(env);
841
842                 LASSERT(oti->oti_r_locks > 0);
843                 oti->oti_r_locks--;
844         }
845 #endif
846         up_read(&obj->oo_sem);
847 }
848
849 static void osd_object_write_unlock(const struct lu_env *env,
850                                     struct dt_object *dt)
851 {
852         struct osd_object *obj = osd_dt_obj(dt);
853
854         LASSERT(osd_invariant(obj));
855 #if OSD_COUNTERS
856         {
857                 struct osd_thread_info *oti = osd_oti_get(env);
858
859                 LASSERT(obj->oo_owner == env);
860                 LASSERT(oti->oti_w_locks > 0);
861                 oti->oti_w_locks--;
862                 obj->oo_owner = NULL;
863         }
864 #endif
865         up_write(&obj->oo_sem);
866 }
867
868 static int capa_is_sane(const struct lu_env *env,
869                         struct osd_device *dev,
870                         struct lustre_capa *capa,
871                         struct lustre_capa_key *keys)
872 {
873         struct osd_thread_info *oti = osd_oti_get(env);
874         struct obd_capa *oc;
875         int i, rc = 0;
876         ENTRY;
877
878         oc = capa_lookup(dev->od_capa_hash, capa, 0);
879         if (oc) {
880                 if (capa_is_expired(oc)) {
881                         DEBUG_CAPA(D_ERROR, capa, "expired");
882                         rc = -ESTALE;
883                 }
884                 capa_put(oc);
885                 RETURN(rc);
886         }
887
888         spin_lock(&capa_lock);
889         for (i = 0; i < 2; i++) {
890                 if (keys[i].lk_keyid == capa->lc_keyid) {
891                         oti->oti_capa_key = keys[i];
892                         break;
893                 }
894         }
895         spin_unlock(&capa_lock);
896
897         if (i == 2) {
898                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
899                 RETURN(-ESTALE);
900         }
901
902         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
903         if (rc)
904                 RETURN(rc);
905         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
906         {
907                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
908                 RETURN(-EACCES);
909         }
910
911         oc = capa_add(dev->od_capa_hash, capa);
912         capa_put(oc);
913
914         RETURN(0);
915 }
916
917 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
918                            struct lustre_capa *capa, __u64 opc)
919 {
920         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
921         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
922         int rc;
923
924         if (!dev->od_fl_capa)
925                 return 0;
926
927         if (capa == BYPASS_CAPA)
928                 return 0;
929
930         if (!capa) {
931                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
932                 return -EACCES;
933         }
934
935         if (!lu_fid_eq(fid, &capa->lc_fid)) {
936                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
937                            PFID(fid));
938                 return -EACCES;
939         }
940
941         if (!capa_opc_supported(capa, opc)) {
942                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
943                 return -EACCES;
944         }
945
946         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
947                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
948                 return -EACCES;
949         }
950
951         return 0;
952 }
953
954 static int osd_attr_get(const struct lu_env *env,
955                         struct dt_object *dt,
956                         struct lu_attr *attr,
957                         struct lustre_capa *capa)
958 {
959         struct osd_object *obj = osd_dt_obj(dt);
960
961         LASSERT(dt_object_exists(dt));
962         LASSERT(osd_invariant(obj));
963
964         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
965                 return -EACCES;
966
967         spin_lock(&obj->oo_guard);
968         osd_inode_getattr(env, obj->oo_inode, attr);
969         spin_unlock(&obj->oo_guard);
970         return 0;
971 }
972
973 static int osd_attr_set(const struct lu_env *env,
974                         struct dt_object *dt,
975                         const struct lu_attr *attr,
976                         struct thandle *handle,
977                         struct lustre_capa *capa)
978 {
979         struct osd_object *obj = osd_dt_obj(dt);
980
981         LASSERT(handle != NULL);
982         LASSERT(dt_object_exists(dt));
983         LASSERT(osd_invariant(obj));
984
985         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
986                 return -EACCES;
987
988         spin_lock(&obj->oo_guard);
989         osd_inode_setattr(env, obj->oo_inode, attr);
990         spin_unlock(&obj->oo_guard);
991
992         mark_inode_dirty(obj->oo_inode);
993         return 0;
994 }
995
996 static struct timespec *osd_inode_time(const struct lu_env *env,
997                                        struct inode *inode, __u64 seconds)
998 {
999         struct osd_thread_info *oti = osd_oti_get(env);
1000         struct timespec        *t   = &oti->oti_time;
1001
1002         t->tv_sec  = seconds;
1003         t->tv_nsec = 0;
1004         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1005         return t;
1006 }
1007
1008 static void osd_inode_setattr(const struct lu_env *env,
1009                               struct inode *inode, const struct lu_attr *attr)
1010 {
1011         __u64 bits;
1012
1013         bits = attr->la_valid;
1014
1015         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1016
1017         if (bits & LA_ATIME)
1018                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1019         if (bits & LA_CTIME)
1020                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1021         if (bits & LA_MTIME)
1022                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1023         if (bits & LA_SIZE) {
1024                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1025                 i_size_write(inode, attr->la_size);
1026         }
1027         if (bits & LA_BLOCKS)
1028                 inode->i_blocks = attr->la_blocks;
1029         if (bits & LA_MODE)
1030                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1031                         (attr->la_mode & ~S_IFMT);
1032         if (bits & LA_UID)
1033                 inode->i_uid    = attr->la_uid;
1034         if (bits & LA_GID)
1035                 inode->i_gid    = attr->la_gid;
1036         if (bits & LA_NLINK)
1037                 inode->i_nlink  = attr->la_nlink;
1038         if (bits & LA_RDEV)
1039                 inode->i_rdev   = attr->la_rdev;
1040
1041         if (bits & LA_FLAGS) {
1042                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1043
1044                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1045                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1046         }
1047 }
1048
1049 /*
1050  * Object creation.
1051  *
1052  * XXX temporary solution.
1053  */
1054
/*
 * Hook called by osd_object_create() before the type-specific creation
 * method. Currently a no-op that always succeeds.
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        /* nothing to prepare yet */
        return 0;
}
1060
1061 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1062                            struct lu_attr *attr, struct thandle *th)
1063 {
1064         LASSERT(obj->oo_inode != NULL);
1065
1066         osd_object_init0(obj);
1067         return 0;
1068 }
1069
1070 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1071                                           struct inode * dir, int mode);
1072
1073 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1074                       umode_t mode,
1075                       struct dt_allocation_hint *hint,
1076                       struct thandle *th)
1077 {
1078         int result;
1079         struct osd_device  *osd = osd_obj2dev(obj);
1080         struct osd_thandle *oth;
1081         struct inode       *parent;
1082         struct inode       *inode;
1083
1084         LASSERT(osd_invariant(obj));
1085         LASSERT(obj->oo_inode == NULL);
1086         LASSERT(osd->od_obj_area != NULL);
1087
1088         oth = container_of(th, struct osd_thandle, ot_super);
1089         LASSERT(oth->ot_handle->h_transaction != NULL);
1090
1091         if (hint && hint->dah_parent)
1092                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1093         else
1094                 parent = osd->od_obj_area->d_inode;
1095         LASSERT(parent->i_op != NULL);
1096
1097         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1098         if (!IS_ERR(inode)) {
1099                 obj->oo_inode = inode;
1100                 result = 0;
1101         } else
1102                 result = PTR_ERR(inode);
1103         LASSERT(osd_invariant(obj));
1104         return result;
1105 }
1106
1107
1108 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1109                            int recsize, handle_t *handle);
1110
1111 enum {
1112         OSD_NAME_LEN = 255
1113 };
1114
1115 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1116                      struct lu_attr *attr,
1117                      struct dt_allocation_hint *hint,
1118                      struct thandle *th)
1119 {
1120         int result;
1121         struct osd_thandle *oth;
1122
1123         LASSERT(S_ISDIR(attr->la_mode));
1124
1125         oth = container_of(th, struct osd_thandle, ot_super);
1126         LASSERT(oth->ot_handle->h_transaction != NULL);
1127         result = osd_mkfile(info, obj, (attr->la_mode &
1128                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1129         if (result == 0) {
1130                 LASSERT(obj->oo_inode != NULL);
1131                 /*
1132                  * XXX uh-oh... call low-level iam function directly.
1133                  */
1134                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1135                                          sizeof (struct lu_fid_pack),
1136                                          oth->ot_handle);
1137         }
1138         return result;
1139 }
1140
1141 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1142                      struct lu_attr *attr,
1143                      struct dt_allocation_hint *hint,
1144                      struct thandle *th)
1145 {
1146         LASSERT(S_ISREG(attr->la_mode));
1147         return osd_mkfile(info, obj, (attr->la_mode &
1148                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1149 }
1150
1151 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1152                      struct lu_attr *attr,
1153                      struct dt_allocation_hint *hint,
1154                      struct thandle *th)
1155 {
1156         LASSERT(S_ISLNK(attr->la_mode));
1157         return osd_mkfile(info, obj, (attr->la_mode &
1158                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1159 }
1160
1161 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1162                      struct lu_attr *attr,
1163                      struct dt_allocation_hint *hint,
1164                      struct thandle *th)
1165 {
1166         int result;
1167         struct osd_device *osd = osd_obj2dev(obj);
1168         struct inode      *dir;
1169         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1170
1171         LASSERT(osd_invariant(obj));
1172         LASSERT(obj->oo_inode == NULL);
1173         LASSERT(osd->od_obj_area != NULL);
1174         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1175                 S_ISFIFO(mode) || S_ISSOCK(mode));
1176
1177         dir = osd->od_obj_area->d_inode;
1178         LASSERT(dir->i_op != NULL);
1179
1180         result = osd_mkfile(info, obj, mode, hint, th);
1181         if (result == 0) {
1182                 LASSERT(obj->oo_inode != NULL);
1183                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1184         }
1185         LASSERT(osd_invariant(obj));
1186         return result;
1187 }
1188
1189 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1190                               struct lu_attr *,
1191                               struct dt_allocation_hint *hint,
1192                               struct thandle *);
1193
1194 static osd_obj_type_f osd_create_type_f(__u32 mode)
1195 {
1196         osd_obj_type_f result;
1197
1198         switch (mode) {
1199         case S_IFDIR:
1200                 result = osd_mkdir;
1201                 break;
1202         case S_IFREG:
1203                 result = osd_mkreg;
1204                 break;
1205         case S_IFLNK:
1206                 result = osd_mksym;
1207                 break;
1208         case S_IFCHR:
1209         case S_IFBLK:
1210         case S_IFIFO:
1211         case S_IFSOCK:
1212                 result = osd_mknod;
1213                 break;
1214         default:
1215                 LBUG();
1216                 break;
1217         }
1218         return result;
1219 }
1220
1221
1222 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1223                         struct dt_object *parent, umode_t child_mode)
1224 {
1225         LASSERT(ah);
1226
1227         memset(ah, 0, sizeof(*ah));
1228         ah->dah_parent = parent;
1229         ah->dah_mode = child_mode;
1230 }
1231
1232
1233 /*
1234  * Concurrency: @dt is write locked.
1235  */
1236 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1237                              struct lu_attr *attr, 
1238                              struct dt_allocation_hint *hint,
1239                              struct thandle *th)
1240 {
1241         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1242         struct osd_object      *obj  = osd_dt_obj(dt);
1243         struct osd_device      *osd  = osd_obj2dev(obj);
1244         struct osd_thread_info *info = osd_oti_get(env);
1245         int result;
1246
1247         ENTRY;
1248
1249         LASSERT(osd_invariant(obj));
1250         LASSERT(!dt_object_exists(dt));
1251         LASSERT(osd_write_locked(env, obj));
1252         LASSERT(th != NULL);
1253
1254         /*
1255          * XXX missing: Quote handling.
1256          */
1257
1258         result = osd_create_pre(info, obj, attr, th);
1259         if (result == 0) {
1260                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1261                                                                 attr, hint, th);
1262                 if (result == 0)
1263                         result = osd_create_post(info, obj, attr, th);
1264         }
1265         if (result == 0) {
1266                 struct osd_inode_id *id = &info->oti_id;
1267
1268                 LASSERT(obj->oo_inode != NULL);
1269
1270                 id->oii_ino = obj->oo_inode->i_ino;
1271                 id->oii_gen = obj->oo_inode->i_generation;
1272
1273                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1274         }
1275
1276         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1277         LASSERT(osd_invariant(obj));
1278         RETURN(result);
1279 }
1280
1281 /*
1282  * Concurrency: @dt is write locked.
1283  */
1284 static void osd_object_ref_add(const struct lu_env *env,
1285                                struct dt_object *dt,
1286                                struct thandle *th)
1287 {
1288         struct osd_object *obj = osd_dt_obj(dt);
1289         struct inode *inode = obj->oo_inode;
1290
1291         LASSERT(osd_invariant(obj));
1292         LASSERT(dt_object_exists(dt));
1293         LASSERT(osd_write_locked(env, obj));
1294         LASSERT(th != NULL);
1295
1296         spin_lock(&obj->oo_guard);
1297         if (inode->i_nlink < LDISKFS_LINK_MAX) {
1298                 inode->i_nlink ++;
1299                 spin_unlock(&obj->oo_guard);
1300                 mark_inode_dirty(inode);
1301         } else {
1302                 spin_unlock(&obj->oo_guard);
1303                 LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
1304                                 "Overflowed nlink\n");
1305         }
1306         LASSERT(osd_invariant(obj));
1307 }
1308
1309 /*
1310  * Concurrency: @dt is write locked.
1311  */
1312 static void osd_object_ref_del(const struct lu_env *env,
1313                                struct dt_object *dt,
1314                                struct thandle *th)
1315 {
1316         struct osd_object *obj = osd_dt_obj(dt);
1317         struct inode *inode = obj->oo_inode;
1318
1319         LASSERT(osd_invariant(obj));
1320         LASSERT(dt_object_exists(dt));
1321         LASSERT(osd_write_locked(env, obj));
1322         LASSERT(th != NULL);
1323
1324         spin_lock(&obj->oo_guard);
1325         if (inode->i_nlink > 0) {
1326                 inode->i_nlink --;
1327                 spin_unlock(&obj->oo_guard);
1328                 mark_inode_dirty(inode);
1329         } else {
1330                 spin_unlock(&obj->oo_guard);
1331                 LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
1332                                 "Underflowed nlink\n");
1333         }
1334         LASSERT(osd_invariant(obj));
1335 }
1336
1337 /*
1338  * Concurrency: @dt is read locked.
1339  */
1340 static int osd_xattr_get(const struct lu_env *env,
1341                          struct dt_object *dt,
1342                          struct lu_buf *buf,
1343                          const char *name,
1344                          struct lustre_capa *capa)
1345 {
1346         struct osd_object      *obj    = osd_dt_obj(dt);
1347         struct inode           *inode  = obj->oo_inode;
1348         struct osd_thread_info *info   = osd_oti_get(env);
1349         struct dentry          *dentry = &info->oti_dentry;
1350
1351         LASSERT(dt_object_exists(dt));
1352         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1353         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1354
1355         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1356                 return -EACCES;
1357
1358         dentry->d_inode = inode;
1359         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1360 }
1361
1362 /*
1363  * Concurrency: @dt is write locked.
1364  */
1365 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1366                          const struct lu_buf *buf, const char *name, int fl,
1367                          struct thandle *handle, struct lustre_capa *capa)
1368 {
1369         struct osd_object      *obj    = osd_dt_obj(dt);
1370         struct inode           *inode  = obj->oo_inode;
1371         struct osd_thread_info *info   = osd_oti_get(env);
1372         struct dentry          *dentry = &info->oti_dentry;
1373         struct timespec        *t      = &info->oti_time;
1374         int                     fs_flags = 0, rc;
1375
1376         LASSERT(dt_object_exists(dt));
1377         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1378         LASSERT(osd_write_locked(env, obj));
1379         LASSERT(handle != NULL);
1380
1381         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1382                 return -EACCES;
1383
1384         if (fl & LU_XATTR_REPLACE)
1385                 fs_flags |= XATTR_REPLACE;
1386
1387         if (fl & LU_XATTR_CREATE)
1388                 fs_flags |= XATTR_CREATE;
1389
1390         dentry->d_inode = inode;
1391         *t = inode->i_ctime;
1392         rc = inode->i_op->setxattr(dentry, name,
1393                                    buf->lb_buf, buf->lb_len, fs_flags);
1394         if (likely(rc == 0)) {
1395                 /* ctime should not be updated with server-side time. */
1396                 spin_lock(&obj->oo_guard);
1397                 inode->i_ctime = *t;
1398                 spin_unlock(&obj->oo_guard);
1399                 mark_inode_dirty(inode);
1400         }
1401         return rc;
1402 }
1403
1404 /*
1405  * Concurrency: @dt is read locked.
1406  */
1407 static int osd_xattr_list(const struct lu_env *env,
1408                           struct dt_object *dt,
1409                           struct lu_buf *buf,
1410                           struct lustre_capa *capa)
1411 {
1412         struct osd_object      *obj    = osd_dt_obj(dt);
1413         struct inode           *inode  = obj->oo_inode;
1414         struct osd_thread_info *info   = osd_oti_get(env);
1415         struct dentry          *dentry = &info->oti_dentry;
1416
1417         LASSERT(dt_object_exists(dt));
1418         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1419         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1420
1421         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1422                 return -EACCES;
1423
1424         dentry->d_inode = inode;
1425         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1426 }
1427
1428 /*
1429  * Concurrency: @dt is write locked.
1430  */
1431 static int osd_xattr_del(const struct lu_env *env,
1432                          struct dt_object *dt,
1433                          const char *name,
1434                          struct thandle *handle,
1435                          struct lustre_capa *capa)
1436 {
1437         struct osd_object      *obj    = osd_dt_obj(dt);
1438         struct inode           *inode  = obj->oo_inode;
1439         struct osd_thread_info *info   = osd_oti_get(env);
1440         struct dentry          *dentry = &info->oti_dentry;
1441         struct timespec        *t      = &info->oti_time;
1442         int                     rc;
1443
1444         LASSERT(dt_object_exists(dt));
1445         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1446         LASSERT(osd_write_locked(env, obj));
1447         LASSERT(handle != NULL);
1448
1449         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1450                 return -EACCES;
1451
1452         dentry->d_inode = inode;
1453         *t = inode->i_ctime;
1454         rc = inode->i_op->removexattr(dentry, name);
1455         if (likely(rc == 0)) {
1456                 /* ctime should not be updated with server-side time. */
1457                 spin_lock(&obj->oo_guard);
1458                 inode->i_ctime = *t;
1459                 spin_unlock(&obj->oo_guard);
1460                 mark_inode_dirty(inode);
1461         }
1462         return rc;
1463 }
1464
1465 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1466                                      struct dt_object *dt,
1467                                      struct lustre_capa *old,
1468                                      __u64 opc)
1469 {
1470         struct osd_thread_info *info = osd_oti_get(env);
1471         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1472         struct osd_object *obj = osd_dt_obj(dt);
1473         struct osd_device *dev = osd_obj2dev(obj);
1474         struct lustre_capa_key *key = &info->oti_capa_key;
1475         struct lustre_capa *capa = &info->oti_capa;
1476         struct obd_capa *oc;
1477         int rc;
1478         ENTRY;
1479
1480         if (!dev->od_fl_capa)
1481                 RETURN(ERR_PTR(-ENOENT));
1482
1483         LASSERT(dt_object_exists(dt));
1484         LASSERT(osd_invariant(obj));
1485
1486         /* renewal sanity check */
1487         if (old && osd_object_auth(env, dt, old, opc))
1488                 RETURN(ERR_PTR(-EACCES));
1489
1490         capa->lc_fid = *fid;
1491         capa->lc_opc = opc;
1492         capa->lc_uid = 0;
1493         capa->lc_flags = dev->od_capa_alg << 24;
1494         capa->lc_timeout = dev->od_capa_timeout;
1495         capa->lc_expiry = 0;
1496
1497         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1498         if (oc) {
1499                 LASSERT(!capa_is_expired(oc));
1500                 RETURN(oc);
1501         }
1502
1503         spin_lock(&capa_lock);
1504         *key = dev->od_capa_keys[1];
1505         spin_unlock(&capa_lock);
1506
1507         capa->lc_keyid = key->lk_keyid;
1508         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1509
1510         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1511         if (rc) {
1512                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1513                 RETURN(ERR_PTR(rc));
1514         }
1515
1516         oc = capa_add(dev->od_capa_hash, capa);
1517         RETURN(oc);
1518 }
1519
1520 static struct dt_object_operations osd_obj_ops = {
1521         .do_read_lock    = osd_object_read_lock,
1522         .do_write_lock   = osd_object_write_lock,
1523         .do_read_unlock  = osd_object_read_unlock,
1524         .do_write_unlock = osd_object_write_unlock,
1525         .do_attr_get     = osd_attr_get,
1526         .do_attr_set     = osd_attr_set,
1527         .do_ah_init      = osd_ah_init,
1528         .do_create       = osd_object_create,
1529         .do_index_try    = osd_index_try,
1530         .do_ref_add      = osd_object_ref_add,
1531         .do_ref_del      = osd_object_ref_del,
1532         .do_xattr_get    = osd_xattr_get,
1533         .do_xattr_set    = osd_xattr_set,
1534         .do_xattr_del    = osd_xattr_del,
1535         .do_xattr_list   = osd_xattr_list,
1536         .do_capa_get     = osd_capa_get,
1537 };
1538
1539 /*
1540  * Body operations.
1541  */
1542
1543 /*
1544  * XXX: Another layering violation for now.
1545  *
1546  * We don't want to use ->f_op->read methods, because generic file write
1547  *
1548  *         - serializes on ->i_sem, and
1549  *
1550  *         - does a lot of extra work like balance_dirty_pages(),
1551  *
1552  * which doesn't work for globally shared files like /last-received.
1553  */
1554 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
1555 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
1556                                 loff_t *offs, handle_t *handle);
1557
1558 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1559                         struct lu_buf *buf, loff_t *pos,
1560                         struct lustre_capa *capa)
1561 {
1562         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1563
1564         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1565                 RETURN(-EACCES);
1566
1567         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1568 }
1569
1570 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1571                          const struct lu_buf *buf, loff_t *pos,
1572                          struct thandle *handle, struct lustre_capa *capa)
1573 {
1574         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1575         struct osd_thandle *oh;
1576         ssize_t             result;
1577
1578         LASSERT(handle != NULL);
1579
1580         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1581                 RETURN(-EACCES);
1582
1583         oh = container_of(handle, struct osd_thandle, ot_super);
1584         LASSERT(oh->ot_handle->h_transaction != NULL);
1585         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1586                                              pos, oh->ot_handle);
1587         if (result == 0)
1588                 result = buf->lb_len;
1589         return result;
1590 }
1591
1592 static struct dt_body_operations osd_body_ops = {
1593         .dbo_read  = osd_read,
1594         .dbo_write = osd_write
1595 };
1596
1597 /*
1598  * Index operations.
1599  */
1600
1601 static int osd_object_is_root(const struct osd_object *obj)
1602 {
1603         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1604 }
1605
1606 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1607                            const struct dt_index_features *feat)
1608 {
1609         struct iam_descr *descr;
1610
1611         if (osd_object_is_root(o))
1612                 return feat == &dt_directory_features;
1613
1614         LASSERT(o->oo_dir != NULL);
1615
1616         descr = o->oo_dir->od_container.ic_descr;
1617         if (feat == &dt_directory_features)
1618                 return descr == &iam_htree_compat_param ||
1619                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1620                          1 /*
1621                             * XXX check that index looks like directory.
1622                             */
1623                                 );
1624         else
1625                 return
1626                         feat->dif_keysize_min <= descr->id_key_size &&
1627                         descr->id_key_size <= feat->dif_keysize_max &&
1628                         feat->dif_recsize_min <= descr->id_rec_size &&
1629                         descr->id_rec_size <= feat->dif_recsize_max &&
1630                         !(feat->dif_flags & (DT_IND_VARKEY |
1631                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1632                         ergo(feat->dif_flags & DT_IND_UPDATE,
1633                              1 /* XXX check that object (and file system) is
1634                                 * writable */);
1635 }
1636
1637 static int osd_container_init(const struct lu_env *env,
1638                               struct osd_object *obj,
1639                               struct osd_directory *dir)
1640 {
1641         int result;
1642         struct iam_container *bag;
1643
1644         bag    = &dir->od_container;
1645         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1646         if (result == 0) {
1647                 result = iam_container_setup(bag);
1648                 if (result == 0)
1649                         obj->oo_dt.do_index_ops = &osd_index_ops;
1650                 else
1651                         iam_container_fini(bag);
1652         }
1653         return result;
1654 }
1655
1656 /*
1657  * Concurrency: no external locking is necessary.
1658  */
1659 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1660                          const struct dt_index_features *feat)
1661 {
1662         int result;
1663         struct osd_object *obj = osd_dt_obj(dt);
1664
1665         LASSERT(osd_invariant(obj));
1666         LASSERT(dt_object_exists(dt));
1667
1668         if (osd_object_is_root(obj)) {
1669                 dt->do_index_ops = &osd_index_compat_ops;
1670                 result = 0;
1671         } else if (!osd_has_index(obj)) {
1672                 struct osd_directory *dir;
1673
1674                 OBD_ALLOC_PTR(dir);
1675                 if (dir != NULL) {
1676                         sema_init(&dir->od_sem, 1);
1677
1678                         spin_lock(&obj->oo_guard);
1679                         if (obj->oo_dir == NULL)
1680                                 obj->oo_dir = dir;
1681                         else
1682                                 /*
1683                                  * Concurrent thread allocated container data.
1684                                  */
1685                                 OBD_FREE_PTR(dir);
1686                         spin_unlock(&obj->oo_guard);
1687                         /*
1688                          * Now, that we have container data, serialize its
1689                          * initialization.
1690                          */
1691                         down(&obj->oo_dir->od_sem);
1692                         /*
1693                          * recheck under lock.
1694                          */
1695                         if (!osd_has_index(obj))
1696                                 result = osd_container_init(env, obj, dir);
1697                         else
1698                                 result = 0;
1699                         up(&obj->oo_dir->od_sem);
1700                 } else
1701                         result = -ENOMEM;
1702         } else
1703                 result = 0;
1704
1705         if (result == 0) {
1706                 if (!osd_index_probe(env, obj, feat))
1707                         result = -ENOTDIR;
1708         }
1709         LASSERT(osd_invariant(obj));
1710
1711         return result;
1712 }
1713
1714 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1715                             const struct dt_key *key, struct thandle *handle,
1716                             struct lustre_capa *capa)
1717 {
1718         struct osd_object     *obj = osd_dt_obj(dt);
1719         struct osd_thandle    *oh;
1720         struct iam_path_descr *ipd;
1721         struct iam_container  *bag = &obj->oo_dir->od_container;
1722         int rc;
1723
1724         ENTRY;
1725
1726         LASSERT(osd_invariant(obj));
1727         LASSERT(dt_object_exists(dt));
1728         LASSERT(bag->ic_object == obj->oo_inode);
1729         LASSERT(handle != NULL);
1730
1731         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1732                 RETURN(-EACCES);
1733
1734         ipd = osd_ipd_get(env, bag);
1735         if (unlikely(ipd == NULL))
1736                 RETURN(-ENOMEM);
1737
1738         oh = container_of0(handle, struct osd_thandle, ot_super);
1739         LASSERT(oh->ot_handle != NULL);
1740         LASSERT(oh->ot_handle->h_transaction != NULL);
1741
1742         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1743         osd_ipd_put(env, bag, ipd);
1744         LASSERT(osd_invariant(obj));
1745         RETURN(rc);
1746 }
1747
1748 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1749                             struct dt_rec *rec, const struct dt_key *key,
1750                             struct lustre_capa *capa)
1751 {
1752         struct osd_object     *obj = osd_dt_obj(dt);
1753         struct iam_path_descr *ipd;
1754         struct iam_container  *bag = &obj->oo_dir->od_container;
1755         int rc;
1756
1757         ENTRY;
1758
1759         LASSERT(osd_invariant(obj));
1760         LASSERT(dt_object_exists(dt));
1761         LASSERT(bag->ic_object == obj->oo_inode);
1762
1763         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1764                 return -EACCES;
1765
1766         ipd = osd_ipd_get(env, bag);
1767         if (unlikely(ipd == NULL))
1768                 RETURN(-ENOMEM);
1769
1770         rc = iam_lookup(bag, (const struct iam_key *)key,
1771                         (struct iam_rec *)rec, ipd);
1772         osd_ipd_put(env, bag, ipd);
1773         LASSERT(osd_invariant(obj));
1774
1775         RETURN(rc);
1776 }
1777
1778 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1779                             const struct dt_rec *rec, const struct dt_key *key,
1780                             struct thandle *th, struct lustre_capa *capa)
1781 {
1782         struct osd_object     *obj = osd_dt_obj(dt);
1783         struct iam_path_descr *ipd;
1784         struct osd_thandle    *oh;
1785         struct iam_container  *bag = &obj->oo_dir->od_container;
1786         int rc;
1787
1788         ENTRY;
1789
1790         LASSERT(osd_invariant(obj));
1791         LASSERT(dt_object_exists(dt));
1792         LASSERT(bag->ic_object == obj->oo_inode);
1793         LASSERT(th != NULL);
1794
1795         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1796                 return -EACCES;
1797
1798         ipd = osd_ipd_get(env, bag);
1799         if (unlikely(ipd == NULL))
1800                 RETURN(-ENOMEM);
1801
1802         oh = container_of0(th, struct osd_thandle, ot_super);
1803         LASSERT(oh->ot_handle != NULL);
1804         LASSERT(oh->ot_handle->h_transaction != NULL);
1805         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1806                         (struct iam_rec *)rec, ipd);
1807         osd_ipd_put(env, bag, ipd);
1808         LASSERT(osd_invariant(obj));
1809         RETURN(rc);
1810 }
1811
/*
 * Iterator operations.
 *
 * struct osd_it is the osd-private representation of a struct dt_it: the
 * iterator functions below cast between the two pointer types.
 */
struct osd_it {
        /* object being iterated; osd_it_init() takes a reference on it */
        struct osd_object     *oi_obj;
        /* path descriptor from osd_ipd_get(), released in osd_it_fini() */
        struct iam_path_descr *oi_ipd;
        /* underlying iam iterator */
        struct iam_iterator    oi_it;
};
1820
1821 static struct dt_it *osd_it_init(const struct lu_env *env,
1822                                  struct dt_object *dt, int writable,
1823                                  struct lustre_capa *capa)
1824 {
1825         struct osd_it         *it;
1826         struct osd_object     *obj = osd_dt_obj(dt);
1827         struct lu_object      *lo  = &dt->do_lu;
1828         struct iam_path_descr *ipd;
1829         struct iam_container  *bag = &obj->oo_dir->od_container;
1830         __u32                  flags;
1831
1832         LASSERT(lu_object_exists(lo));
1833
1834         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1835                             CAPA_OPC_BODY_READ))
1836                 return ERR_PTR(-EACCES);
1837
1838         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1839         OBD_ALLOC_PTR(it);
1840         if (it != NULL) {
1841                 /*
1842                  * XXX: as ipd is allocated within osd_thread_info, assignment
1843                  * below implies that iterator usage is confined within single
1844                  * environment.
1845                  */
1846                 ipd = osd_ipd_get(env, bag);
1847                 if (likely(ipd != NULL)) {
1848                         it->oi_obj = obj;
1849                         it->oi_ipd = ipd;
1850                         lu_object_get(lo);
1851                         iam_it_init(&it->oi_it, bag, flags, ipd);
1852                         return (struct dt_it *)it;
1853                 } else
1854                         OBD_FREE_PTR(it);
1855         }
1856         return ERR_PTR(-ENOMEM);
1857 }
1858
1859 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1860 {
1861         struct osd_it     *it = (struct osd_it *)di;
1862         struct osd_object *obj = it->oi_obj;
1863
1864         iam_it_fini(&it->oi_it);
1865         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1866         lu_object_put(env, &obj->oo_dt.do_lu);
1867         OBD_FREE_PTR(it);
1868 }
1869
1870 static int osd_it_get(const struct lu_env *env,
1871                       struct dt_it *di, const struct dt_key *key)
1872 {
1873         struct osd_it *it = (struct osd_it *)di;
1874
1875         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1876 }
1877
1878 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1879 {
1880         struct osd_it *it = (struct osd_it *)di;
1881
1882         iam_it_put(&it->oi_it);
1883 }
1884
1885 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1886 {
1887         struct osd_it *it = (struct osd_it *)di;
1888
1889         return iam_it_next(&it->oi_it);
1890 }
1891
1892 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1893                       struct thandle *th)
1894 {
1895         struct osd_it      *it = (struct osd_it *)di;
1896         struct osd_thandle *oh;
1897
1898         LASSERT(th != NULL);
1899
1900         oh = container_of0(th, struct osd_thandle, ot_super);
1901         LASSERT(oh->ot_handle != NULL);
1902         LASSERT(oh->ot_handle->h_transaction != NULL);
1903
1904         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1905 }
1906
1907 static struct dt_key *osd_it_key(const struct lu_env *env,
1908                                  const struct dt_it *di)
1909 {
1910         struct osd_it *it = (struct osd_it *)di;
1911
1912         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1913 }
1914
1915 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1916 {
1917         struct osd_it *it = (struct osd_it *)di;
1918
1919         return iam_it_key_size(&it->oi_it);
1920 }
1921
1922 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1923                                  const struct dt_it *di)
1924 {
1925         struct osd_it *it = (struct osd_it *)di;
1926
1927         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1928 }
1929
1930 static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1931 {
1932         struct osd_it *it = (struct osd_it *)di;
1933
1934         return iam_it_store(&it->oi_it);
1935 }
1936
1937 static int osd_it_load(const struct lu_env *env,
1938                        const struct dt_it *di, __u64 hash)
1939 {
1940         struct osd_it *it = (struct osd_it *)di;
1941
1942         return iam_it_load(&it->oi_it, hash);
1943 }
1944
/*
 * Index operations backed by an iam container. osd_container_init() installs
 * this table on an object once its container is set up.
 */
static struct dt_index_operations osd_index_ops = {
        .dio_lookup = osd_index_lookup,
        .dio_insert = osd_index_insert,
        .dio_delete = osd_index_delete,
        /* iterator methods, all operating on struct osd_it */
        .dio_it     = {
                .init     = osd_it_init,
                .fini     = osd_it_fini,
                .get      = osd_it_get,
                .put      = osd_it_put,
                .del      = osd_it_del,
                .next     = osd_it_next,
                .key      = osd_it_key,
                .key_size = osd_it_key_size,
                .rec      = osd_it_rec,
                .store    = osd_it_store,
                .load     = osd_it_load
        }
};
1963
1964 static int osd_index_compat_delete(const struct lu_env *env,
1965                                    struct dt_object *dt,
1966                                    const struct dt_key *key,
1967                                    struct thandle *handle,
1968                                    struct lustre_capa *capa)
1969 {
1970         struct osd_object *obj = osd_dt_obj(dt);
1971
1972         LASSERT(handle != NULL);
1973         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1974         ENTRY;
1975
1976 #if 0
1977         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1978                 RETURN(-EACCES);
1979 #endif
1980
1981         RETURN(-EOPNOTSUPP);
1982 }
1983
1984 /*
1985  * Compatibility index operations.
1986  */
1987
1988
1989 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
1990                            struct dentry *dentry, struct lu_fid_pack *pack)
1991 {
1992         struct inode  *inode = dentry->d_inode;
1993         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
1994
1995         lu_igif_build(fid, inode->i_ino, inode->i_generation);
1996         fid_cpu_to_be(fid, fid);
1997         pack->fp_len = sizeof *fid + 1;
1998         memcpy(pack->fp_area, fid, sizeof *fid);
1999 }
2000
2001 static int osd_index_compat_lookup(const struct lu_env *env,
2002                                    struct dt_object *dt,
2003                                    struct dt_rec *rec, const struct dt_key *key,
2004                                    struct lustre_capa *capa)
2005 {
2006         struct osd_object *obj = osd_dt_obj(dt);
2007
2008         struct osd_device      *osd  = osd_obj2dev(obj);
2009         struct osd_thread_info *info = osd_oti_get(env);
2010         struct inode           *dir;
2011
2012         int result;
2013
2014         /*
2015          * XXX temporary solution.
2016          */
2017         struct dentry *dentry;
2018         struct dentry *parent;
2019
2020         LASSERT(osd_invariant(obj));
2021         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2022         LASSERT(osd_has_index(obj));
2023
2024         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2025                 return -EACCES;
2026
2027         info->oti_str.name = (const char *)key;
2028         info->oti_str.len  = strlen((const char *)key);
2029
2030         dir = obj->oo_inode;
2031         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2032
2033         parent = d_alloc_root(dir);
2034         if (parent == NULL)
2035                 return -ENOMEM;
2036         igrab(dir);
2037         dentry = d_alloc(parent, &info->oti_str);
2038         if (dentry != NULL) {
2039                 struct dentry *d;
2040
2041                 /*
2042                  * XXX passing NULL for nameidata should work for
2043                  * ext3/ldiskfs.
2044                  */
2045                 d = dir->i_op->lookup(dir, dentry, NULL);
2046                 if (d == NULL) {
2047                         /*
2048                          * normal case, result is in @dentry.
2049                          */
2050                         if (dentry->d_inode != NULL) {
2051                                 osd_build_pack(env, osd, dentry,
2052                                                (struct lu_fid_pack *)rec);
2053                                 result = 0;
2054                         } else
2055                                 result = -ENOENT;
2056                  } else {
2057                         /* What? Disconnected alias? Ppheeeww... */
2058                         CERROR("Aliasing where not expected\n");
2059                         result = -EIO;
2060                         dput(d);
2061                 }
2062                 dput(dentry);
2063         } else
2064                 result = -ENOMEM;
2065         dput(parent);
2066         LASSERT(osd_invariant(obj));
2067         return result;
2068 }
2069
/*
 * Add the entry @name, pointing to @inode, to the directory @dir by calling
 * the ->link() method of @dir on temporary dentries.
 *
 * @old is a dentry for @inode allocated under dev->od_obj_area; @new is a
 * dentry with the same name allocated under a temporary root dentry of @dir.
 * The name is passed through the per-thread info->oti_str.
 *
 * Returns the result of ->link(), or -ENOMEM if any dentry allocation fails.
 *
 * The LASSERTs track the reference count of @dir around every dcache call.
 */
static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
                       struct inode *dir, struct inode *inode, const char *name)
{
        struct dentry *old;
        struct dentry *new;
        struct dentry *parent;

        int result;

        info->oti_str.name = name;
        info->oti_str.len  = strlen(name);

        LASSERT(atomic_read(&dir->i_count) > 0);
        /* stays -ENOMEM unless ->link() is actually reached */
        result = -ENOMEM;
        old = d_alloc(dev->od_obj_area, &info->oti_str);
        if (old != NULL) {
                /* extra reference on @inode, taken after attaching it to @old */
                d_instantiate(old, inode);
                igrab(inode);
                LASSERT(atomic_read(&dir->i_count) > 0);
                parent = d_alloc_root(dir);
                if (parent != NULL) {
                        /* extra reference on @dir, asserted to last until dput(parent) */
                        igrab(dir);
                        LASSERT(atomic_read(&dir->i_count) > 1);
                        new = d_alloc(parent, &info->oti_str);
                        LASSERT(atomic_read(&dir->i_count) > 1);
                        if (new != NULL) {
                                LASSERT(atomic_read(&dir->i_count) > 1);
                                result = dir->i_op->link(old, dir, new);
                                LASSERT(atomic_read(&dir->i_count) > 1);
                                dput(new);
                                LASSERT(atomic_read(&dir->i_count) > 1);
                        }
                        LASSERT(atomic_read(&dir->i_count) > 1);
                        dput(parent);
                        LASSERT(atomic_read(&dir->i_count) > 0);
                }
                dput(old);
        }
        LASSERT(atomic_read(&dir->i_count) > 0);
        return result;
}
2111
2112
2113 /*
2114  * XXX Temporary stuff.
2115  */
2116 static int osd_index_compat_insert(const struct lu_env *env,
2117                                    struct dt_object *dt,
2118                                    const struct dt_rec *rec,
2119                                    const struct dt_key *key, struct thandle *th,
2120                                    struct lustre_capa *capa)
2121 {
2122         struct osd_object     *obj = osd_dt_obj(dt);
2123
2124         const char          *name = (const char *)key;
2125
2126         struct lu_device    *ludev = dt->do_lu.lo_dev;
2127         struct lu_object    *luch;
2128
2129         struct osd_thread_info   *info = osd_oti_get(env);
2130         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2131         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2132
2133         int result;
2134
2135         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2136         LASSERT(osd_invariant(obj));
2137         LASSERT(th != NULL);
2138
2139         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2140                 return -EACCES;
2141
2142         result = fid_unpack(pack, fid);
2143         if (result != 0)
2144                 return result;
2145
2146         luch = lu_object_find(env, ludev->ld_site, fid);
2147         if (!IS_ERR(luch)) {
2148                 if (lu_object_exists(luch)) {
2149                         struct osd_object *child;
2150
2151                         child = osd_obj(lu_object_locate(luch->lo_header,
2152                                                          ludev->ld_type));
2153                         if (child != NULL)
2154                                 result = osd_add_rec(info, osd_obj2dev(obj),
2155                                                      obj->oo_inode,
2156                                                      child->oo_inode, name);
2157                         else {
2158                                 CERROR("No osd slice.\n");
2159                                 result = -ENOENT;
2160                         }
2161                         LASSERT(osd_invariant(obj));
2162                         LASSERT(osd_invariant(child));
2163                 } else {
2164                         CERROR("Sorry.\n");
2165                         result = -ENOENT;
2166                 }
2167                 lu_object_put(env, luch);
2168         } else
2169                 result = PTR_ERR(luch);
2170         LASSERT(osd_invariant(obj));
2171         return result;
2172 }
2173
/*
 * Compatibility index operations; osd_index_try() installs them on the root
 * object. No iterator methods are provided.
 */
static struct dt_index_operations osd_index_compat_ops = {
        .dio_lookup = osd_index_compat_lookup,
        .dio_insert = osd_index_compat_insert,
        .dio_delete = osd_index_compat_delete
};
2179
/* type constructor/destructor: osd_type_init, osd_type_fini */
LU_TYPE_INIT_FINI(osd, &osd_key);

/*
 * Context key for the per-thread osd state (struct osd_thread_info, see
 * osd_key_init()). The key carries the tags LCT_DT_THREAD and LCT_MD_THREAD.
 */
static struct lu_context_key osd_key = {
        .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
        .lct_init = osd_key_init,
        .lct_fini = osd_key_fini,
        .lct_exit = osd_key_exit
};
2189
2190 static void *osd_key_init(const struct lu_context *ctx,
2191                           struct lu_context_key *key)
2192 {
2193         struct osd_thread_info *info;
2194
2195         OBD_ALLOC_PTR(info);
2196         if (info != NULL)
2197                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2198         else
2199                 info = ERR_PTR(-ENOMEM);
2200         return info;
2201 }
2202
/*
 * context key destructor: osd_key_fini, generated by LU_KEY_FINI for values
 * of type struct osd_thread_info.
 * NOTE(review): presumably frees what osd_key_init() allocated -- confirm
 * against the macro definition.
 */
LU_KEY_FINI(osd, struct osd_thread_info);
2205
/*
 * Context key exit hook. When OSD_COUNTERS is enabled, assert that the
 * per-thread read-lock, write-lock and transaction counters are all zero;
 * otherwise do nothing.
 */
static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
#if OSD_COUNTERS
        struct osd_thread_info *oti = data;

        LASSERT(oti->oti_r_locks == 0);
        LASSERT(oti->oti_w_locks == 0);
        LASSERT(oti->oti_txns    == 0);
#endif
}
2217
2218 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2219                            const char *name, struct lu_device *next)
2220 {
2221         int rc;
2222         /* context for commit hooks */
2223         rc = lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
2224                              LCT_MD_THREAD);
2225         if (rc == 0)
2226                 rc = osd_procfs_init(osd_dev(d), name);
2227         return rc;
2228 }
2229
2230 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2231 {
2232         struct osd_thread_info *info = osd_oti_get(env);
2233         ENTRY;
2234         if (o->od_obj_area != NULL) {
2235                 dput(o->od_obj_area);
2236                 o->od_obj_area = NULL;
2237         }
2238         osd_oi_fini(info, &o->od_oi);
2239
2240         RETURN(0);
2241 }
2242
2243 static int osd_mount(const struct lu_env *env,
2244                      struct osd_device *o, struct lustre_cfg *cfg)
2245 {
2246         struct lustre_mount_info *lmi;
2247         const char               *dev  = lustre_cfg_string(cfg, 0);
2248         struct osd_thread_info   *info = osd_oti_get(env);
2249         int result;
2250
2251         ENTRY;
2252
2253         if (o->od_mount != NULL) {
2254                 CERROR("Already mounted (%s)\n", dev);
2255                 RETURN(-EEXIST);
2256         }
2257
2258         /* get mount */
2259         lmi = server_get_mount(dev);
2260         if (lmi == NULL) {
2261                 CERROR("Cannot get mount info for %s!\n", dev);
2262                 RETURN(-EFAULT);
2263         }
2264
2265         LASSERT(lmi != NULL);
2266         /* save lustre_mount_info in dt_device */
2267         o->od_mount = lmi;
2268
2269         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2270         if (result == 0) {
2271                 struct dentry *d;
2272
2273                 d = simple_mkdir(osd_sb(o)->s_root, "*OBJ-TEMP*", 0777, 1);
2274                 if (!IS_ERR(d)) {
2275                         o->od_obj_area = d;
2276                 } else
2277                         result = PTR_ERR(d);
2278         }
2279         if (result != 0)
2280                 osd_shutdown(env, o);
2281         RETURN(result);
2282 }
2283
2284 static struct lu_device *osd_device_fini(const struct lu_env *env,
2285                                          struct lu_device *d)
2286 {
2287         int rc;
2288         ENTRY;
2289
2290         shrink_dcache_sb(osd_sb(osd_dev(d)));
2291         osd_sync(env, lu2dt_dev(d));
2292
2293         rc = osd_procfs_fini(osd_dev(d));
2294         if (rc) {
2295                 CERROR("proc fini error %d \n", rc);
2296                 RETURN (ERR_PTR(rc));
2297         }
2298
2299         if (osd_dev(d)->od_mount)
2300                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2301                                  osd_dev(d)->od_mount->lmi_mnt);
2302         osd_dev(d)->od_mount = NULL;
2303
2304         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2305         RETURN(NULL);
2306 }
2307
2308 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2309                                           struct lu_device_type *t,
2310                                           struct lustre_cfg *cfg)
2311 {
2312         struct lu_device  *l;
2313         struct osd_device *o;
2314
2315         OBD_ALLOC_PTR(o);
2316         if (o != NULL) {
2317                 int result;
2318
2319                 result = dt_device_init(&o->od_dt_dev, t);
2320                 if (result == 0) {
2321                         l = osd2lu_dev(o);
2322                         l->ld_ops = &osd_lu_ops;
2323                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2324                         spin_lock_init(&o->od_osfs_lock);
2325                         o->od_osfs_age = cfs_time_shift_64(-1000);
2326                         o->od_capa_hash = init_capa_hash();
2327                         if (o->od_capa_hash == NULL)
2328                                 l = ERR_PTR(-ENOMEM);
2329                 } else
2330                         l = ERR_PTR(result);
2331         } else
2332                 l = ERR_PTR(-ENOMEM);
2333         return l;
2334 }
2335
2336 static struct lu_device *osd_device_free(const struct lu_env *env,
2337                                          struct lu_device *d)
2338 {
2339         struct osd_device *o = osd_dev(d);
2340         ENTRY;
2341
2342         cleanup_capa_hash(o->od_capa_hash);
2343         dt_device_fini(&o->od_dt_dev);
2344         OBD_FREE_PTR(o);
2345         RETURN(NULL);
2346 }
2347
2348 static int osd_process_config(const struct lu_env *env,
2349                               struct lu_device *d, struct lustre_cfg *cfg)
2350 {
2351         struct osd_device *o = osd_dev(d);
2352         int err;
2353         ENTRY;
2354
2355         switch(cfg->lcfg_command) {
2356         case LCFG_SETUP:
2357                 err = osd_mount(env, o, cfg);
2358                 break;
2359         case LCFG_CLEANUP:
2360                 err = osd_shutdown(env, o);
2361                 break;
2362         default:
2363                 err = -ENOTTY;
2364         }
2365
2366         RETURN(err);
2367 }
2368 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2369                                     struct ldiskfs_super_block * es);
2370
2371 static int osd_recovery_complete(const struct lu_env *env,
2372                                  struct lu_device *d)
2373 {
2374         struct osd_device *o = osd_dev(d);
2375         ENTRY;
2376         /* TODO: orphans handling */
2377         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2378         RETURN(0);
2379 }
2380
2381 static struct inode *osd_iget(struct osd_thread_info *info,
2382                               struct osd_device *dev,
2383                               const struct osd_inode_id *id)
2384 {
2385         struct inode *inode;
2386
2387         inode = iget(osd_sb(dev), id->oii_ino);
2388         if (inode == NULL) {
2389                 CERROR("no inode\n");
2390                 inode = ERR_PTR(-EACCES);
2391         } else if (is_bad_inode(inode)) {
2392                 CERROR("bad inode\n");
2393                 iput(inode);
2394                 inode = ERR_PTR(-ENOENT);
2395         } else if (inode->i_generation != id->oii_gen) {
2396                 CERROR("stale inode\n");
2397                 iput(inode);
2398                 inode = ERR_PTR(-ESTALE);
2399         }
2400
2401         return inode;
2402
2403 }
2404
2405 static int osd_fid_lookup(const struct lu_env *env,
2406                           struct osd_object *obj, const struct lu_fid *fid)
2407 {
2408         struct osd_thread_info *info;
2409         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2410         struct osd_device      *dev;
2411         struct osd_inode_id    *id;
2412         struct osd_oi          *oi;
2413         struct inode           *inode;
2414         int                     result;
2415
2416         LASSERT(osd_invariant(obj));
2417         LASSERT(obj->oo_inode == NULL);
2418         LASSERT(fid_is_sane(fid));
2419         /*
2420          * This assertion checks that osd layer sees only local
2421          * fids. Unfortunately it is somewhat expensive (does a
2422          * cache-lookup). Disabling it for production/acceptance-testing.
2423          */
2424         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2425
2426         ENTRY;
2427
2428         info = osd_oti_get(env);
2429         dev  = osd_dev(ldev);
2430         id   = &info->oti_id;
2431         oi   = &dev->od_oi;
2432
2433         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2434                 RETURN(-ENOENT);
2435
2436         result = osd_oi_lookup(info, oi, fid, id);
2437         if (result == 0) {
2438                 inode = osd_iget(info, dev, id);
2439                 if (!IS_ERR(inode)) {
2440                         obj->oo_inode = inode;
2441                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2442                         result = 0;
2443                 } else
2444                         /*
2445                          * If fid wasn't found in oi, inode-less object is
2446                          * created, for which lu_object_exists() returns
2447                          * false. This is used in a (frequent) case when
2448                          * objects are created as locking anchors or
2449                          * place holders for objects yet to be created.
2450                          */
2451                         result = PTR_ERR(inode);
2452         } else if (result == -ENOENT)
2453                 result = 0;
2454         LASSERT(osd_invariant(obj));
2455         RETURN(result);
2456 }
2457
2458 static void osd_inode_getattr(const struct lu_env *env,
2459                               struct inode *inode, struct lu_attr *attr)
2460 {
2461         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2462                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2463                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2464
2465         attr->la_atime      = LTIME_S(inode->i_atime);
2466         attr->la_mtime      = LTIME_S(inode->i_mtime);
2467         attr->la_ctime      = LTIME_S(inode->i_ctime);
2468         attr->la_mode       = inode->i_mode;
2469         attr->la_size       = i_size_read(inode);
2470         attr->la_blocks     = inode->i_blocks;
2471         attr->la_uid        = inode->i_uid;
2472         attr->la_gid        = inode->i_gid;
2473         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2474         attr->la_nlink      = inode->i_nlink;
2475         attr->la_rdev       = inode->i_rdev;
2476         attr->la_blksize    = ll_inode_blksize(inode);
2477         attr->la_blkbits    = inode->i_blkbits;
2478 }
2479
2480 /*
2481  * Helpers.
2482  */
2483
2484 static int lu_device_is_osd(const struct lu_device *d)
2485 {
2486         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2487 }
2488
2489 static struct osd_object *osd_obj(const struct lu_object *o)
2490 {
2491         LASSERT(lu_device_is_osd(o->lo_dev));
2492         return container_of0(o, struct osd_object, oo_dt.do_lu);
2493 }
2494
2495 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2496 {
2497         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2498         return container_of0(d, struct osd_device, od_dt_dev);
2499 }
2500
2501 static struct osd_device *osd_dev(const struct lu_device *d)
2502 {
2503         LASSERT(lu_device_is_osd(d));
2504         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2505 }
2506
2507 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2508 {
2509         return osd_obj(&d->do_lu);
2510 }
2511
2512 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2513 {
2514         return osd_dev(o->oo_dt.do_lu.lo_dev);
2515 }
2516
2517 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2518 {
2519         return &osd->od_dt_dev.dd_lu_dev;
2520 }
2521
2522 static struct super_block *osd_sb(const struct osd_device *dev)
2523 {
2524         return dev->od_mount->lmi_mnt->mnt_sb;
2525 }
2526
2527 static journal_t *osd_journal(const struct osd_device *dev)
2528 {
2529         return LDISKFS_SB(osd_sb(dev))->s_journal;
2530 }
2531
2532 static int osd_has_index(const struct osd_object *obj)
2533 {
2534         return obj->oo_dt.do_index_ops != NULL;
2535 }
2536
/*
 * lu_object invariant method: apply osd_invariant() to the osd_object behind
 * slice @l.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        struct osd_object *obj = osd_obj(l);

        return osd_invariant(obj);
}
2541
/*
 * lu_object methods of osd objects.
 */
static struct lu_object_operations osd_lu_obj_ops = {
        .loo_object_init      = osd_object_init,
        .loo_object_delete    = osd_object_delete,
        .loo_object_release   = osd_object_release,
        .loo_object_free      = osd_object_free,
        .loo_object_print     = osd_object_print,
        .loo_object_invariant = osd_object_invariant
};
2550
/*
 * lu_device methods of the osd device; installed by osd_device_alloc() and
 * used by lu_device_is_osd() to recognize osd devices.
 */
static struct lu_device_operations osd_lu_ops = {
        .ldo_object_alloc      = osd_object_alloc,
        .ldo_process_config    = osd_process_config,
        .ldo_recovery_complete = osd_recovery_complete
};
2556
/*
 * Device type methods: type constructor/destructor (generated by
 * LU_TYPE_INIT_FINI), device allocation/release and device init/fini.
 */
static struct lu_device_type_operations osd_device_type_ops = {
        .ldto_init = osd_type_init,
        .ldto_fini = osd_type_fini,

        .ldto_device_alloc = osd_device_alloc,
        .ldto_device_free  = osd_device_free,

        .ldto_device_init    = osd_device_init,
        .ldto_device_fini    = osd_device_fini
};
2567
/*
 * The osd device type, registered under LUSTRE_OSD_NAME by osd_mod_init().
 */
static struct lu_device_type osd_device_type = {
        .ldt_tags     = LU_DEVICE_DT,
        .ldt_name     = LUSTRE_OSD_NAME,
        .ldt_ops      = &osd_device_type_ops,
        .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
};
2574
/*
 * lprocfs legacy support.
 *
 * Only the owner field is set; the table is passed to class_register_type()
 * in osd_mod_init().
 */
static struct obd_ops osd_obd_device_ops = {
        .o_owner = THIS_MODULE
};
2581
2582 static int __init osd_mod_init(void)
2583 {
2584         struct lprocfs_static_vars lvars;
2585
2586         lprocfs_osd_init_vars(&lvars);
2587         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2588                                    LUSTRE_OSD_NAME, &osd_device_type);
2589 }
2590
2591 static void __exit osd_mod_exit(void)
2592 {
2593         class_unregister_type(LUSTRE_OSD_NAME);
2594 }
2595
/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
MODULE_LICENSE("GPL");

/* Hook osd_mod_init()/osd_mod_exit() up as the module's init and exit. */
cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);