Whamcloud - gitweb
On a server, a file system object is uniquely identified by a fid, which is
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
58 /*
59  * XXX temporary stuff: direct access to ldiskfs/jdb. Interface between osd
60  * and file system is not yet specified.
61  */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
/*
 * Index state attached to an osd_object through osd_object::oo_dir.
 */
struct osd_directory {
        /*
         * IAM container; osd_index_fini() finalizes it when its ic_object
         * is the owning object's inode.
         */
        struct iam_container od_container;
        struct iam_descr     od_descr;
        struct semaphore     od_sem;
};
90
/*
 * OSD-level object: wraps a dt_object together with the inode backing it.
 */
struct osd_object {
        /** embedded generic object; its do_lu is what osd_object_alloc()
         *  hands back to the caller. */
        struct dt_object       oo_dt;
        /**
         * Inode for file system object represented by this osd_object. This
         * inode is pinned for the whole duration of lu_object life.
         *
         * Not modified concurrently (either setup early during object
         * creation, or assigned by osd_object_create() under write lock).
         */
        struct inode          *oo_inode;
        /** taken by osd_object_{read,write}_lock(). */
        struct rw_semaphore    oo_sem;
        /** index state; NULL when absent (see osd_index_fini()). */
        struct osd_directory  *oo_dir;
        /** protects inode attributes. */
        spinlock_t             oo_guard;
        /** environment holding the write lock: set by
         *  osd_object_write_lock(), reset to NULL on write unlock. */
        const struct lu_env   *oo_owner;
#ifdef CONFIG_LOCKDEP
        struct lockdep_map     oo_dep_map;
#endif
};
110
/*
 * Forward declarations of static functions returning int or void, so that
 * they can be referenced before their definitions.
 */
static int   osd_root_get      (const struct lu_env *env,
                                struct dt_device *dev, struct lu_fid *f);

static int   lu_device_is_osd  (const struct lu_device *d);
static void  osd_mod_exit      (void) __exit;
static int   osd_mod_init      (void) __init;
static int   osd_type_init     (struct lu_device_type *t);
static void  osd_type_fini     (struct lu_device_type *t);
static int   osd_object_init   (const struct lu_env *env,
                                struct lu_object *l,
                                const struct lu_object_conf *_);
static void  osd_object_release(const struct lu_env *env,
                                struct lu_object *l);
static int   osd_object_print  (const struct lu_env *env, void *cookie,
                                lu_printer_t p, const struct lu_object *o);
static struct lu_device *osd_device_free   (const struct lu_env *env,
                                struct lu_device *m);
static void *osd_key_init      (const struct lu_context *ctx,
                                struct lu_context_key *key);
static void  osd_key_fini      (const struct lu_context *ctx,
                                struct lu_context_key *key, void *data);
static void  osd_key_exit      (const struct lu_context *ctx,
                                struct lu_context_key *key, void *data);
static int   osd_has_index     (const struct osd_object *obj);
static void  osd_object_init0  (struct osd_object *obj);
static int   osd_device_init   (const struct lu_env *env,
                                struct lu_device *d, const char *,
                                struct lu_device *);
static int   osd_fid_lookup    (const struct lu_env *env,
                                struct osd_object *obj,
                                const struct lu_fid *fid);
static void  osd_inode_getattr (const struct lu_env *env,
                                struct inode *inode, struct lu_attr *attr);
static void  osd_inode_setattr (const struct lu_env *env,
                                struct inode *inode, const struct lu_attr *attr);
static int   osd_param_is_sane (const struct osd_device *dev,
                                const struct txn_param *param);
/* index operations on a dt_object */
static int   osd_index_lookup  (const struct lu_env *env,
                                struct dt_object *dt,
                                struct dt_rec *rec, const struct dt_key *key,
                                struct lustre_capa *capa);
static int   osd_index_insert  (const struct lu_env *env,
                                struct dt_object *dt,
                                const struct dt_rec *rec,
                                const struct dt_key *key,
                                struct thandle *handle,
                                struct lustre_capa *capa);
static int   osd_index_delete  (const struct lu_env *env,
                                struct dt_object *dt, const struct dt_key *key,
                                struct thandle *handle,
                                struct lustre_capa *capa);
static int   osd_index_probe   (const struct lu_env *env,
                                struct osd_object *o,
                                const struct dt_index_features *feat);
static int   osd_index_try     (const struct lu_env *env,
                                struct dt_object *dt,
                                const struct dt_index_features *feat);
static void  osd_index_fini    (struct osd_object *o);

/* functions operating on a struct dt_it */
static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
static int   osd_it_get        (const struct lu_env *env,
                                struct dt_it *di, const struct dt_key *key);
static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
                                struct thandle *th);
static int   osd_it_key_size   (const struct lu_env *env,
                                const struct dt_it *di);
static void  osd_conf_get      (const struct lu_env *env,
                                const struct dt_device *dev,
                                struct dt_device_param *param);
static void  osd_trans_stop    (const struct lu_env *env,
                                struct thandle *th);
static int   osd_object_is_root(const struct osd_object *obj);
185
/*
 * Forward declarations of static functions returning pointers (type
 * conversion helpers, allocators, iterator accessors, transaction start).
 */
static struct osd_object  *osd_obj          (const struct lu_object *o);
static struct osd_device  *osd_dev          (const struct lu_device *d);
static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
static struct lu_device   *osd_device_fini  (const struct lu_env *env,
                                             struct lu_device *d);
static struct lu_device   *osd_device_alloc (const struct lu_env *env,
                                             struct lu_device_type *t,
                                             struct lustre_cfg *cfg);
static struct lu_object   *osd_object_alloc (const struct lu_env *env,
                                             const struct lu_object_header *hdr,
                                             struct lu_device *d);
static struct inode       *osd_iget         (struct osd_thread_info *info,
                                             struct osd_device *dev,
                                             const struct osd_inode_id *id);
static struct super_block *osd_sb           (const struct osd_device *dev);
static struct dt_it       *osd_it_init      (const struct lu_env *env,
                                             struct dt_object *dt, int wable,
                                             struct lustre_capa *capa);
static struct dt_key      *osd_it_key       (const struct lu_env *env,
                                             const struct dt_it *di);
static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
                                             const struct dt_it *di);
static struct timespec    *osd_inode_time   (const struct lu_env *env,
                                             struct inode *inode,
                                             __u64 seconds);
static struct thandle     *osd_trans_start  (const struct lu_env *env,
                                             struct dt_device *d,
                                             struct txn_param *p);
static journal_t          *osd_journal      (const struct osd_device *dev);
218
/*
 * Tentative declarations of the file-scope operation tables and the context
 * key, so that code above their definitions can take their addresses.
 */
static struct lu_device_type_operations osd_device_type_ops;
static struct lu_device_type            osd_device_type;
static struct lu_object_operations      osd_lu_obj_ops;
static struct obd_ops                   osd_obd_device_ops;
static struct lu_device_operations      osd_lu_ops;
static struct lu_context_key            osd_key;
static struct dt_object_operations      osd_obj_ops;
static struct dt_body_operations        osd_body_ops;
static struct dt_index_operations       osd_index_ops;
static struct dt_index_operations       osd_index_compat_ops;
229
/*
 * OSD transaction handle: the generic thandle plus the journal state
 * behind it.
 */
struct osd_thandle {
        /* generic handle returned to callers; container_of0() maps it back. */
        struct thandle          ot_super;
        /* journal handle from journal_start(); reset to NULL by
         * osd_trans_stop() before the journal handle is stopped. */
        handle_t               *ot_handle;
        /* callback record registered through journal_callback_set(). */
        struct journal_callback ot_jcb;
};
235
236 /*
237  * Invariants, assertions.
238  */
239
/*
 * XXX: do not enable this, until invariant checking code is made thread safe
 * in the face of pdirops locking.
 */
#define OSD_INVARIANT_CHECKS (0)

#if OSD_INVARIANT_CHECKS
/*
 * Consistency predicate for an osd_object (ergo() is taken to be logical
 * implication). Evaluates to non-zero when all of the following hold:
 *
 *   - obj is not NULL;
 *   - if the object has an inode, that inode belongs to the super block of
 *     the object's device and has a positive reference count;
 *   - if the object has index state whose container is bound to an inode,
 *     that inode is the object's own inode.
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_dir != NULL &&
                     obj->oo_dir->od_container.ic_object != NULL,
                     obj->oo_dir->od_container.ic_object == obj->oo_inode);
}
#else
/* Invariant checking disabled: the predicate is trivially true. */
#define osd_invariant(obj) (1)
#endif
261
262 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
263 {
264         return lu_context_key_get(&env->le_ctx, &osd_key);
265 }
266
267 /*
268  * Concurrency: doesn't matter
269  */
270 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
271 {
272         return osd_oti_get(env)->oti_r_locks > 0;
273 }
274
275 /*
276  * Concurrency: doesn't matter
277  */
278 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
279 {
280         struct osd_thread_info *oti = osd_oti_get(env);
281         return oti->oti_w_locks > 0 && o->oo_owner == env;
282 }
283
284 /*
285  * Concurrency: doesn't access mutable data
286  */
287 static int osd_root_get(const struct lu_env *env,
288                         struct dt_device *dev, struct lu_fid *f)
289 {
290         struct inode *inode;
291
292         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
293         lu_igif_build(f, inode->i_ino, inode->i_generation);
294         return 0;
295 }
296
297 /*
298  * OSD object methods.
299  */
300
301 /*
302  * Concurrency: no concurrent access is possible that early in object
303  * life-cycle.
304  */
305 static struct lu_object *osd_object_alloc(const struct lu_env *env,
306                                           const struct lu_object_header *hdr,
307                                           struct lu_device *d)
308 {
309         struct osd_object *mo;
310
311         OBD_ALLOC_PTR(mo);
312         if (mo != NULL) {
313                 struct lu_object *l;
314
315                 l = &mo->oo_dt.do_lu;
316                 dt_object_init(&mo->oo_dt, NULL, d);
317                 mo->oo_dt.do_ops = &osd_obj_ops;
318                 l->lo_ops = &osd_lu_obj_ops;
319                 init_rwsem(&mo->oo_sem);
320                 spin_lock_init(&mo->oo_guard);
321                 return l;
322         } else
323                 return NULL;
324 }
325
326 /*
327  * Concurrency: shouldn't matter.
328  */
329 static void osd_object_init0(struct osd_object *obj)
330 {
331         LASSERT(obj->oo_inode != NULL);
332         obj->oo_dt.do_body_ops = &osd_body_ops;
333         obj->oo_dt.do_lu.lo_header->loh_attr |=
334                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
335 }
336
337 /*
338  * Concurrency: no concurrent access is possible that early in object
339  * life-cycle.
340  */
341 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
342                            const struct lu_object_conf *_)
343 {
344         struct osd_object *obj = osd_obj(l);
345         int result;
346
347         LINVRNT(osd_invariant(obj));
348
349         result = osd_fid_lookup(env, obj, lu_object_fid(l));
350         if (result == 0) {
351                 if (obj->oo_inode != NULL)
352                         osd_object_init0(obj);
353         }
354         LINVRNT(osd_invariant(obj));
355         return result;
356 }
357
358 /*
359  * Concurrency: no concurrent access is possible that late in object
360  * life-cycle.
361  */
362 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
363 {
364         struct osd_object *obj = osd_obj(l);
365
366         LINVRNT(osd_invariant(obj));
367
368         dt_object_fini(&obj->oo_dt);
369         OBD_FREE_PTR(obj);
370 }
371
372 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
373                                           const struct iam_container *bag)
374 {
375         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
376                                                    osd_oti_get(env)->oti_ipd);
377 }
378
379 static void osd_ipd_put(const struct lu_env *env,
380                         const struct iam_container *bag,
381                         struct iam_path_descr *ipd)
382 {
383         bag->ic_descr->id_ops->id_ipd_free(ipd);
384 }
385
386 /*
387  * Concurrency: no concurrent access is possible that late in object
388  * life-cycle.
389  */
390 static void osd_index_fini(struct osd_object *o)
391 {
392         struct iam_container *bag;
393
394         if (o->oo_dir != NULL) {
395                 bag = &o->oo_dir->od_container;
396                 if (o->oo_inode != NULL) {
397                         if (bag->ic_object == o->oo_inode)
398                                 iam_container_fini(bag);
399                 }
400                 OBD_FREE_PTR(o->oo_dir);
401                 o->oo_dir = NULL;
402         }
403 }
404
405 /*
406  * Concurrency: no concurrent access is possible that late in object
407  * life-cycle (for all existing callers, that is. New callers have to provide
408  * their own locking.)
409  */
410 static int osd_inode_unlinked(const struct inode *inode)
411 {
412         return inode->i_nlink == 0;
413 }
414
/*
 * Credit amounts that osd_inode_remove() adds together to size the
 * transaction in which it deletes the object index entry.
 */
enum {
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
419
420 /*
421  * Concurrency: no concurrent access is possible that late in object
422  * life-cycle.
423  */
424 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
425 {
426         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
427         struct osd_device      *osd = osd_obj2dev(obj);
428         struct osd_thread_info *oti = osd_oti_get(env);
429         struct txn_param       *prm = &oti->oti_txn;
430         struct thandle         *th;
431         int result;
432
433         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
434                             OSD_TXN_INODE_DELETE_CREDITS);
435         th = osd_trans_start(env, &osd->od_dt_dev, prm);
436         if (!IS_ERR(th)) {
437                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
438                 osd_trans_stop(env, th);
439         } else
440                 result = PTR_ERR(th);
441         return result;
442 }
443
444 /*
445  * Called just before object is freed. Releases all resources except for
446  * object itself (that is released by osd_object_free()).
447  *
448  * Concurrency: no concurrent access is possible that late in object
449  * life-cycle.
450  */
451 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
452 {
453         struct osd_object *obj   = osd_obj(l);
454         struct inode      *inode = obj->oo_inode;
455
456         LINVRNT(osd_invariant(obj));
457
458         /*
459          * If object is unlinked remove fid->ino mapping from object index.
460          *
461          * File body will be deleted by iput().
462          */
463
464         osd_index_fini(obj);
465         if (inode != NULL) {
466                 int result;
467
468                 if (osd_inode_unlinked(inode)) {
469                         result = osd_inode_remove(env, obj);
470                         if (result != 0)
471                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
472                                                 "Failed to cleanup: %d\n",
473                                                 result);
474                 }
475                 iput(inode);
476                 obj->oo_inode = NULL;
477         }
478 }
479
480 /*
481  * Concurrency: ->loo_object_release() is called under site spin-lock.
482  */
483 static void osd_object_release(const struct lu_env *env,
484                                struct lu_object *l)
485 {
486         struct osd_object *o = osd_obj(l);
487
488         LASSERT(!lu_object_is_dying(l->lo_header));
489         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
490                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
491 }
492
493 /*
494  * Concurrency: shouldn't matter.
495  */
496 static int osd_object_print(const struct lu_env *env, void *cookie,
497                             lu_printer_t p, const struct lu_object *l)
498 {
499         struct osd_object *o = osd_obj(l);
500         struct iam_descr  *d;
501
502         if (o->oo_dir != NULL)
503                 d = o->oo_dir->od_container.ic_descr;
504         else
505                 d = NULL;
506         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
507                     o, o->oo_inode,
508                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
509                     o->oo_inode ? o->oo_inode->i_generation : 0,
510                     d ? d->id_ops->id_name : "plain");
511 }
512
513 /*
514  * Concurrency: shouldn't matter.
515  */
516 int osd_statfs(const struct lu_env *env, struct dt_device *d,
517                struct kstatfs *sfs)
518 {
519         struct osd_device *osd = osd_dt_dev(d);
520         struct super_block *sb = osd_sb(osd);
521         int result = 0;
522
523         spin_lock(&osd->od_osfs_lock);
524         /* cache 1 second */
525         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
526                 result = ll_do_statfs(sb, &osd->od_kstatfs);
527                 if (likely(result == 0)) /* N.B. statfs can't really fail */
528                         osd->od_osfs_age = cfs_time_current_64();
529         }
530
531         if (likely(result == 0))
532                 *sfs = osd->od_kstatfs; 
533         spin_unlock(&osd->od_osfs_lock);
534
535         return result;
536 }
537
538 /*
539  * Concurrency: doesn't access mutable data.
540  */
541 static void osd_conf_get(const struct lu_env *env,
542                          const struct dt_device *dev,
543                          struct dt_device_param *param)
544 {
545         /*
546          * XXX should be taken from not-yet-existing fs abstraction layer.
547          */
548         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
549         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
550         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
551 }
552
553 /*
554  * Journal
555  */
556
557 /*
558  * Concurrency: doesn't access mutable data.
559  */
560 static int osd_param_is_sane(const struct osd_device *dev,
561                              const struct txn_param *param)
562 {
563         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
564 }
565
566 /*
567  * Concurrency: shouldn't matter.
568  */
569 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
570 {
571         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
572         struct thandle     *th = &oh->ot_super;
573         struct dt_device   *dev = th->th_dev;
574
575         LASSERT(dev != NULL);
576         LASSERT(oh->ot_handle == NULL);
577
578         if (error) {
579                 CERROR("transaction @0x%p commit error: %d\n", th, error);
580         } else {
581                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
582                 /*
583                  * This od_env_for_commit is only for commit usage.  see
584                  * "struct dt_device"
585                  */
586                 lu_context_enter(&env->le_ctx);
587                 dt_txn_hook_commit(env, th);
588                 lu_context_exit(&env->le_ctx);
589         }
590
591         lu_device_put(&dev->dd_lu_dev);
592         th->th_dev = NULL;
593
594         lu_context_exit(&th->th_ctx);
595         lu_context_fini(&th->th_ctx);
596         OBD_FREE_PTR(oh);
597 }
598
599 /*
600  * Concurrency: shouldn't matter.
601  */
602 static struct thandle *osd_trans_start(const struct lu_env *env,
603                                        struct dt_device *d,
604                                        struct txn_param *p)
605 {
606         struct osd_device  *dev = osd_dt_dev(d);
607         handle_t           *jh;
608         struct osd_thandle *oh;
609         struct thandle     *th;
610         int hook_res;
611
612         ENTRY;
613
614         hook_res = dt_txn_hook_start(env, d, p);
615         if (hook_res != 0)
616                 RETURN(ERR_PTR(hook_res));
617
618         if (osd_param_is_sane(dev, p)) {
619                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
620                 if (oh != NULL) {
621                         /*
622                          * XXX temporary stuff. Some abstraction layer should
623                          * be used.
624                          */
625
626                         jh = journal_start(osd_journal(dev), p->tp_credits);
627                         if (!IS_ERR(jh)) {
628                                 oh->ot_handle = jh;
629                                 th = &oh->ot_super;
630                                 th->th_dev = d;
631                                 th->th_result = 0;
632                                 jh->h_sync = p->tp_sync;
633                                 lu_device_get(&d->dd_lu_dev);
634                                 /* add commit callback */
635                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
636                                 lu_context_enter(&th->th_ctx);
637                                 journal_callback_set(jh, osd_trans_commit_cb,
638                                                      (struct journal_callback *)&oh->ot_jcb);
639 #if OSD_COUNTERS
640                                 {
641                                         struct osd_thread_info *oti =
642                                                 osd_oti_get(env);
643
644                                         LASSERT(oti->oti_txns == 0);
645                                         LASSERT(oti->oti_r_locks == 0);
646                                         LASSERT(oti->oti_w_locks == 0);
647                                         oti->oti_txns++;
648                                 }
649 #endif
650                         } else {
651                                 OBD_FREE_PTR(oh);
652                                 th = (void *)jh;
653                         }
654                 } else
655                         th = ERR_PTR(-ENOMEM);
656         } else {
657                 CERROR("Invalid transaction parameters\n");
658                 th = ERR_PTR(-EINVAL);
659         }
660
661         RETURN(th);
662 }
663
664 /*
665  * Concurrency: shouldn't matter.
666  */
667 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
668 {
669         int result;
670         struct osd_thandle *oh;
671         struct osd_thread_info *oti = osd_oti_get(env);
672
673         ENTRY;
674
675         oh = container_of0(th, struct osd_thandle, ot_super);
676         if (oh->ot_handle != NULL) {
677                 handle_t *hdl = oh->ot_handle;
678
679                 LASSERT(oti->oti_txns == 1);
680                 oti->oti_txns--;
681                 LASSERT(oti->oti_r_locks == 0);
682                 LASSERT(oti->oti_w_locks == 0);
683                 result = dt_txn_hook_stop(env, th);
684                 if (result != 0)
685                         CERROR("Failure in transaction hook: %d\n", result);
686                 oh->ot_handle = NULL;
687                 result = journal_stop(hdl);
688                 if (result != 0)
689                         CERROR("Failure to stop transaction: %d\n", result);
690         }
691         EXIT;
692 }
693
694 /*
695  * Concurrency: shouldn't matter.
696  */
697 static int osd_sync(const struct lu_env *env, struct dt_device *d)
698 {
699         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
700         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
701 }
702
703 /*
704  * Concurrency: shouldn't matter.
705  */
706 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
707
708 static void osd_ro(const struct lu_env *env, struct dt_device *d)
709 {
710         ENTRY;
711
712         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
713
714         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
715                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
716         EXIT;
717 }
718
719 /*
720  * Concurrency: serialization provided by callers.
721  */
722 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
723                               int mode, unsigned long timeout, __u32 alg,
724                               struct lustre_capa_key *keys)
725 {
726         struct osd_device *dev = osd_dt_dev(d);
727         ENTRY;
728
729         dev->od_fl_capa = mode;
730         dev->od_capa_timeout = timeout;
731         dev->od_capa_alg = alg;
732         dev->od_capa_keys = keys;
733         RETURN(0);
734 }
735
/*
 * Journal credits per transaction operation, indexed by enum dt_txn_op and
 * returned by osd_credit_get().
 *
 * Note: quota is not accounted for here. If the file system is mounted with
 * --data_journal, more credits may be needed.
 */
static const int osd_dto_credits[DTO_NR] = {
        /*
         * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
         * EXT3_SINGLEDATA_TRANS_BLOCKS(8). XXX Note: IAM may need more,
         * since IAM has more levels than the Ext3 htree.
         */
        [DTO_INDEX_INSERT]  = 16,
        [DTO_INDEX_DELETE]  = 16,
        [DTO_IDNEX_UPDATE]  = 16,
        /*
         * Create an object. Same as creating an object in the Ext3 file
         * system, but without counting quota: EXT3_DATA_TRANS_BLOCKS(12) +
         * INDEX_EXTRA_BLOCKS(8) + 3 (inode bits, groups, GDT).
         */
        [DTO_OBJECT_CREATE] = 23,
        [DTO_OBJECT_DELETE] = 23,
        /*
         * Attr set credits: 3 (inode, group, GDT).
         */
        [DTO_ATTR_SET]      = 3,
        /*
         * XATTR_SET. Same as xattr of EXT3: EXT3_DATA_TRANS_BLOCKS. XXX Note:
         * in the original MDS implementation EXT3_INDEX_EXTRA_TRANS_BLOCKS
         * are also counted in. Do not know why?
         */
        [DTO_XATTR_SET]     = 16,
        [DTO_LOG_REC]       = 16,
        /* credits for inode change during write */
        [DTO_WRITE_BASE]    = 3,
        /* credits for single block write */
        [DTO_WRITE_BLOCK]   = 12 
};
770
771 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
772                           enum dt_txn_op op)
773 {
774         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
775         return osd_dto_credits[op];
776 }
777
/*
 * Device-level dt operations table, binding each dt_device method to its
 * osd implementation defined above.
 */
static struct dt_device_operations osd_dt_ops = {
        .dt_root_get       = osd_root_get,
        .dt_statfs         = osd_statfs,
        .dt_trans_start    = osd_trans_start,
        .dt_trans_stop     = osd_trans_stop,
        .dt_conf_get       = osd_conf_get,
        .dt_sync           = osd_sync,
        .dt_ro             = osd_ro,
        .dt_credit_get     = osd_credit_get,
        .dt_init_capa_ctxt = osd_init_capa_ctxt,
};
789
790 static void osd_object_read_lock(const struct lu_env *env,
791                                  struct dt_object *dt, unsigned role)
792 {
793         struct osd_object *obj = osd_dt_obj(dt);
794         struct osd_thread_info *oti = osd_oti_get(env);
795
796         LINVRNT(osd_invariant(obj));
797
798         LASSERT(obj->oo_owner != env);
799         down_read_nested(&obj->oo_sem, role);
800
801                 LASSERT(obj->oo_owner == NULL);
802                 oti->oti_r_locks++;
803 }
804
805 static void osd_object_write_lock(const struct lu_env *env,
806                                   struct dt_object *dt, unsigned role)
807 {
808         struct osd_object *obj = osd_dt_obj(dt);
809         struct osd_thread_info *oti = osd_oti_get(env);
810
811         LINVRNT(osd_invariant(obj));
812
813         LASSERT(obj->oo_owner != env);
814         down_write_nested(&obj->oo_sem, role);
815
816                 LASSERT(obj->oo_owner == NULL);
817                 obj->oo_owner = env;
818                 oti->oti_w_locks++;
819 }
820
821 static void osd_object_read_unlock(const struct lu_env *env,
822                                    struct dt_object *dt)
823 {
824         struct osd_object *obj = osd_dt_obj(dt);
825                 struct osd_thread_info *oti = osd_oti_get(env);
826
827         LINVRNT(osd_invariant(obj));
828
829                 LASSERT(oti->oti_r_locks > 0);
830                 oti->oti_r_locks--;
831         up_read(&obj->oo_sem);
832 }
833
834 static void osd_object_write_unlock(const struct lu_env *env,
835                                     struct dt_object *dt)
836 {
837         struct osd_object *obj = osd_dt_obj(dt);
838                 struct osd_thread_info *oti = osd_oti_get(env);
839
840         LINVRNT(osd_invariant(obj));
841
842                 LASSERT(obj->oo_owner == env);
843                 LASSERT(oti->oti_w_locks > 0);
844                 oti->oti_w_locks--;
845                 obj->oo_owner = NULL;
846         up_write(&obj->oo_sem);
847 }
848
849 static int capa_is_sane(const struct lu_env *env,
850                         struct osd_device *dev,
851                         struct lustre_capa *capa,
852                         struct lustre_capa_key *keys)
853 {
854         struct osd_thread_info *oti = osd_oti_get(env);
855         struct obd_capa *oc;
856         int i, rc = 0;
857         ENTRY;
858
859         oc = capa_lookup(dev->od_capa_hash, capa, 0);
860         if (oc) {
861                 if (capa_is_expired(oc)) {
862                         DEBUG_CAPA(D_ERROR, capa, "expired");
863                         rc = -ESTALE;
864                 }
865                 capa_put(oc);
866                 RETURN(rc);
867         }
868
869         spin_lock(&capa_lock);
870         for (i = 0; i < 2; i++) {
871                 if (keys[i].lk_keyid == capa->lc_keyid) {
872                         oti->oti_capa_key = keys[i];
873                         break;
874                 }
875         }
876         spin_unlock(&capa_lock);
877
878         if (i == 2) {
879                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
880                 RETURN(-ESTALE);
881         }
882
883         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
884         if (rc)
885                 RETURN(rc);
886         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
887         {
888                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
889                 RETURN(-EACCES);
890         }
891
892         oc = capa_add(dev->od_capa_hash, capa);
893         capa_put(oc);
894
895         RETURN(0);
896 }
897
898 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
899                            struct lustre_capa *capa, __u64 opc)
900 {
901         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
902         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
903         int rc;
904
905         if (!dev->od_fl_capa)
906                 return 0;
907
908         if (capa == BYPASS_CAPA)
909                 return 0;
910
911         if (!capa) {
912                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
913                 return -EACCES;
914         }
915
916         if (!lu_fid_eq(fid, &capa->lc_fid)) {
917                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
918                            PFID(fid));
919                 return -EACCES;
920         }
921
922         if (!capa_opc_supported(capa, opc)) {
923                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
924                 return -EACCES;
925         }
926
927         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
928                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
929                 return -EACCES;
930         }
931
932         return 0;
933 }
934
935 static int osd_attr_get(const struct lu_env *env,
936                         struct dt_object *dt,
937                         struct lu_attr *attr,
938                         struct lustre_capa *capa)
939 {
940         struct osd_object *obj = osd_dt_obj(dt);
941
942         LASSERT(dt_object_exists(dt));
943         LINVRNT(osd_invariant(obj));
944
945         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
946                 return -EACCES;
947
948         spin_lock(&obj->oo_guard);
949         osd_inode_getattr(env, obj->oo_inode, attr);
950         spin_unlock(&obj->oo_guard);
951         return 0;
952 }
953
954 static int osd_attr_set(const struct lu_env *env,
955                         struct dt_object *dt,
956                         const struct lu_attr *attr,
957                         struct thandle *handle,
958                         struct lustre_capa *capa)
959 {
960         struct osd_object *obj = osd_dt_obj(dt);
961
962         LASSERT(handle != NULL);
963         LASSERT(dt_object_exists(dt));
964         LASSERT(osd_invariant(obj));
965
966         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
967                 return -EACCES;
968
969         spin_lock(&obj->oo_guard);
970         osd_inode_setattr(env, obj->oo_inode, attr);
971         spin_unlock(&obj->oo_guard);
972
973         mark_inode_dirty(obj->oo_inode);
974         return 0;
975 }
976
977 static struct timespec *osd_inode_time(const struct lu_env *env,
978                                        struct inode *inode, __u64 seconds)
979 {
980         struct osd_thread_info *oti = osd_oti_get(env);
981         struct timespec        *t   = &oti->oti_time;
982
983         t->tv_sec  = seconds;
984         t->tv_nsec = 0;
985         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
986         return t;
987 }
988
989 static void osd_inode_setattr(const struct lu_env *env,
990                               struct inode *inode, const struct lu_attr *attr)
991 {
992         __u64 bits;
993
994         bits = attr->la_valid;
995
996         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
997
998         if (bits & LA_ATIME)
999                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1000         if (bits & LA_CTIME)
1001                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1002         if (bits & LA_MTIME)
1003                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1004         if (bits & LA_SIZE) {
1005                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1006                 i_size_write(inode, attr->la_size);
1007         }
1008         if (bits & LA_BLOCKS)
1009                 inode->i_blocks = attr->la_blocks;
1010         if (bits & LA_MODE)
1011                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1012                         (attr->la_mode & ~S_IFMT);
1013         if (bits & LA_UID)
1014                 inode->i_uid    = attr->la_uid;
1015         if (bits & LA_GID)
1016                 inode->i_gid    = attr->la_gid;
1017         if (bits & LA_NLINK)
1018                 inode->i_nlink  = attr->la_nlink;
1019         if (bits & LA_RDEV)
1020                 inode->i_rdev   = attr->la_rdev;
1021
1022         if (bits & LA_FLAGS) {
1023                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1024
1025                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1026                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1027         }
1028 }
1029
1030 /*
1031  * Object creation.
1032  *
1033  * XXX temporary solution.
1034  */
1035
/*
 * Hook run by osd_object_create() before the type-specific creation
 * step.  Currently does nothing and always reports success (0).
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        return 0;
}
1041
1042 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1043                            struct lu_attr *attr, struct thandle *th)
1044 {
1045         LASSERT(obj->oo_inode != NULL);
1046
1047         osd_object_init0(obj);
1048         return 0;
1049 }
1050
1051 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1052                                           struct inode * dir, int mode);
1053
1054 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1055                       umode_t mode,
1056                       struct dt_allocation_hint *hint,
1057                       struct thandle *th)
1058 {
1059         int result;
1060         struct osd_device  *osd = osd_obj2dev(obj);
1061         struct osd_thandle *oth;
1062         struct inode       *parent;
1063         struct inode       *inode;
1064
1065         LINVRNT(osd_invariant(obj));
1066         LASSERT(obj->oo_inode == NULL);
1067         LASSERT(osd->od_obj_area != NULL);
1068
1069         oth = container_of(th, struct osd_thandle, ot_super);
1070         LASSERT(oth->ot_handle->h_transaction != NULL);
1071
1072         if (hint && hint->dah_parent)
1073                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1074         else
1075                 parent = osd->od_obj_area->d_inode;
1076         LASSERT(parent->i_op != NULL);
1077
1078         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1079         if (!IS_ERR(inode)) {
1080                 obj->oo_inode = inode;
1081                 result = 0;
1082         } else
1083                 result = PTR_ERR(inode);
1084         LINVRNT(osd_invariant(obj));
1085         return result;
1086 }
1087
1088
1089 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1090                            int recsize, handle_t *handle);
1091
1092 enum {
1093         OSD_NAME_LEN = 255
1094 };
1095
1096 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1097                      struct lu_attr *attr,
1098                      struct dt_allocation_hint *hint,
1099                      struct thandle *th)
1100 {
1101         int result;
1102         struct osd_thandle *oth;
1103
1104         LASSERT(S_ISDIR(attr->la_mode));
1105
1106         oth = container_of(th, struct osd_thandle, ot_super);
1107         LASSERT(oth->ot_handle->h_transaction != NULL);
1108         result = osd_mkfile(info, obj, (attr->la_mode &
1109                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1110         if (result == 0) {
1111                 LASSERT(obj->oo_inode != NULL);
1112                 /*
1113                  * XXX uh-oh... call low-level iam function directly.
1114                  */
1115                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1116                                          sizeof (struct lu_fid_pack),
1117                                          oth->ot_handle);
1118         }
1119         return result;
1120 }
1121
1122 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1123                      struct lu_attr *attr,
1124                      struct dt_allocation_hint *hint,
1125                      struct thandle *th)
1126 {
1127         LASSERT(S_ISREG(attr->la_mode));
1128         return osd_mkfile(info, obj, (attr->la_mode &
1129                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1130 }
1131
1132 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1133                      struct lu_attr *attr,
1134                      struct dt_allocation_hint *hint,
1135                      struct thandle *th)
1136 {
1137         LASSERT(S_ISLNK(attr->la_mode));
1138         return osd_mkfile(info, obj, (attr->la_mode &
1139                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1140 }
1141
1142 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1143                      struct lu_attr *attr,
1144                      struct dt_allocation_hint *hint,
1145                      struct thandle *th)
1146 {
1147         int result;
1148         struct osd_device *osd = osd_obj2dev(obj);
1149         struct inode      *dir;
1150         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1151
1152         LINVRNT(osd_invariant(obj));
1153         LASSERT(obj->oo_inode == NULL);
1154         LASSERT(osd->od_obj_area != NULL);
1155         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1156                 S_ISFIFO(mode) || S_ISSOCK(mode));
1157
1158         dir = osd->od_obj_area->d_inode;
1159         LASSERT(dir->i_op != NULL);
1160
1161         result = osd_mkfile(info, obj, mode, hint, th);
1162         if (result == 0) {
1163                 LASSERT(obj->oo_inode != NULL);
1164                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1165         }
1166         LINVRNT(osd_invariant(obj));
1167         return result;
1168 }
1169
1170 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1171                               struct lu_attr *,
1172                               struct dt_allocation_hint *hint,
1173                               struct thandle *);
1174
1175 static osd_obj_type_f osd_create_type_f(__u32 mode)
1176 {
1177         osd_obj_type_f result;
1178
1179         switch (mode) {
1180         case S_IFDIR:
1181                 result = osd_mkdir;
1182                 break;
1183         case S_IFREG:
1184                 result = osd_mkreg;
1185                 break;
1186         case S_IFLNK:
1187                 result = osd_mksym;
1188                 break;
1189         case S_IFCHR:
1190         case S_IFBLK:
1191         case S_IFIFO:
1192         case S_IFSOCK:
1193                 result = osd_mknod;
1194                 break;
1195         default:
1196                 LBUG();
1197                 break;
1198         }
1199         return result;
1200 }
1201
1202
1203 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1204                         struct dt_object *parent, umode_t child_mode)
1205 {
1206         LASSERT(ah);
1207
1208         memset(ah, 0, sizeof(*ah));
1209         ah->dah_parent = parent;
1210         ah->dah_mode = child_mode;
1211 }
1212
1213
1214 /*
1215  * Concurrency: @dt is write locked.
1216  */
1217 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1218                              struct lu_attr *attr, 
1219                              struct dt_allocation_hint *hint,
1220                              struct thandle *th)
1221 {
1222         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1223         struct osd_object      *obj  = osd_dt_obj(dt);
1224         struct osd_device      *osd  = osd_obj2dev(obj);
1225         struct osd_thread_info *info = osd_oti_get(env);
1226         int result;
1227
1228         ENTRY;
1229
1230         LINVRNT(osd_invariant(obj));
1231         LASSERT(!dt_object_exists(dt));
1232         LASSERT(osd_write_locked(env, obj));
1233         LASSERT(th != NULL);
1234
1235         /*
1236          * XXX missing: Quote handling.
1237          */
1238
1239         result = osd_create_pre(info, obj, attr, th);
1240         if (result == 0) {
1241                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1242                                                                 attr, hint, th);
1243                 if (result == 0)
1244                         result = osd_create_post(info, obj, attr, th);
1245         }
1246         if (result == 0) {
1247                 struct osd_inode_id *id = &info->oti_id;
1248
1249                 LASSERT(obj->oo_inode != NULL);
1250
1251                 id->oii_ino = obj->oo_inode->i_ino;
1252                 id->oii_gen = obj->oo_inode->i_generation;
1253
1254                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1255         }
1256
1257         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1258         LINVRNT(osd_invariant(obj));
1259         RETURN(result);
1260 }
1261
1262 /*
1263  * Concurrency: @dt is write locked.
1264  */
1265 static void osd_object_ref_add(const struct lu_env *env,
1266                                struct dt_object *dt,
1267                                struct thandle *th)
1268 {
1269         struct osd_object *obj = osd_dt_obj(dt);
1270         struct inode *inode = obj->oo_inode;
1271
1272         LINVRNT(osd_invariant(obj));
1273         LASSERT(dt_object_exists(dt));
1274         LASSERT(osd_write_locked(env, obj));
1275         LASSERT(th != NULL);
1276
1277         spin_lock(&obj->oo_guard);
1278         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1279         inode->i_nlink++;
1280         spin_unlock(&obj->oo_guard);
1281         mark_inode_dirty(inode);
1282         LINVRNT(osd_invariant(obj));
1283 }
1284
1285 /*
1286  * Concurrency: @dt is write locked.
1287  */
1288 static void osd_object_ref_del(const struct lu_env *env,
1289                                struct dt_object *dt,
1290                                struct thandle *th)
1291 {
1292         struct osd_object *obj = osd_dt_obj(dt);
1293         struct inode *inode = obj->oo_inode;
1294
1295         LINVRNT(osd_invariant(obj));
1296         LASSERT(dt_object_exists(dt));
1297         LASSERT(osd_write_locked(env, obj));
1298         LASSERT(th != NULL);
1299
1300         spin_lock(&obj->oo_guard);
1301         LASSERT(inode->i_nlink > 0);
1302         inode->i_nlink--;
1303         spin_unlock(&obj->oo_guard);
1304         mark_inode_dirty(inode);
1305         LINVRNT(osd_invariant(obj));
1306 }
1307
1308 /*
1309  * Concurrency: @dt is read locked.
1310  */
1311 static int osd_xattr_get(const struct lu_env *env,
1312                          struct dt_object *dt,
1313                          struct lu_buf *buf,
1314                          const char *name,
1315                          struct lustre_capa *capa)
1316 {
1317         struct osd_object      *obj    = osd_dt_obj(dt);
1318         struct inode           *inode  = obj->oo_inode;
1319         struct osd_thread_info *info   = osd_oti_get(env);
1320         struct dentry          *dentry = &info->oti_dentry;
1321
1322         LASSERT(dt_object_exists(dt));
1323         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1324         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1325
1326         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1327                 return -EACCES;
1328
1329         dentry->d_inode = inode;
1330         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1331 }
1332
1333 /*
1334  * Concurrency: @dt is write locked.
1335  */
1336 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1337                          const struct lu_buf *buf, const char *name, int fl,
1338                          struct thandle *handle, struct lustre_capa *capa)
1339 {
1340         struct osd_object      *obj    = osd_dt_obj(dt);
1341         struct inode           *inode  = obj->oo_inode;
1342         struct osd_thread_info *info   = osd_oti_get(env);
1343         struct dentry          *dentry = &info->oti_dentry;
1344         struct timespec        *t      = &info->oti_time;
1345         int                     fs_flags = 0, rc;
1346
1347         LASSERT(dt_object_exists(dt));
1348         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1349         LASSERT(osd_write_locked(env, obj));
1350         LASSERT(handle != NULL);
1351
1352         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1353                 return -EACCES;
1354
1355         if (fl & LU_XATTR_REPLACE)
1356                 fs_flags |= XATTR_REPLACE;
1357
1358         if (fl & LU_XATTR_CREATE)
1359                 fs_flags |= XATTR_CREATE;
1360
1361         dentry->d_inode = inode;
1362         *t = inode->i_ctime;
1363         rc = inode->i_op->setxattr(dentry, name,
1364                                    buf->lb_buf, buf->lb_len, fs_flags);
1365         if (likely(rc == 0)) {
1366                 /* ctime should not be updated with server-side time. */
1367                 spin_lock(&obj->oo_guard);
1368                 inode->i_ctime = *t;
1369                 spin_unlock(&obj->oo_guard);
1370                 mark_inode_dirty(inode);
1371         }
1372         return rc;
1373 }
1374
1375 /*
1376  * Concurrency: @dt is read locked.
1377  */
1378 static int osd_xattr_list(const struct lu_env *env,
1379                           struct dt_object *dt,
1380                           struct lu_buf *buf,
1381                           struct lustre_capa *capa)
1382 {
1383         struct osd_object      *obj    = osd_dt_obj(dt);
1384         struct inode           *inode  = obj->oo_inode;
1385         struct osd_thread_info *info   = osd_oti_get(env);
1386         struct dentry          *dentry = &info->oti_dentry;
1387
1388         LASSERT(dt_object_exists(dt));
1389         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1390         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1391
1392         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1393                 return -EACCES;
1394
1395         dentry->d_inode = inode;
1396         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1397 }
1398
1399 /*
1400  * Concurrency: @dt is write locked.
1401  */
1402 static int osd_xattr_del(const struct lu_env *env,
1403                          struct dt_object *dt,
1404                          const char *name,
1405                          struct thandle *handle,
1406                          struct lustre_capa *capa)
1407 {
1408         struct osd_object      *obj    = osd_dt_obj(dt);
1409         struct inode           *inode  = obj->oo_inode;
1410         struct osd_thread_info *info   = osd_oti_get(env);
1411         struct dentry          *dentry = &info->oti_dentry;
1412         struct timespec        *t      = &info->oti_time;
1413         int                     rc;
1414
1415         LASSERT(dt_object_exists(dt));
1416         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1417         LASSERT(osd_write_locked(env, obj));
1418         LASSERT(handle != NULL);
1419
1420         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1421                 return -EACCES;
1422
1423         dentry->d_inode = inode;
1424         *t = inode->i_ctime;
1425         rc = inode->i_op->removexattr(dentry, name);
1426         if (likely(rc == 0)) {
1427                 /* ctime should not be updated with server-side time. */
1428                 spin_lock(&obj->oo_guard);
1429                 inode->i_ctime = *t;
1430                 spin_unlock(&obj->oo_guard);
1431                 mark_inode_dirty(inode);
1432         }
1433         return rc;
1434 }
1435
1436 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1437                                      struct dt_object *dt,
1438                                      struct lustre_capa *old,
1439                                      __u64 opc)
1440 {
1441         struct osd_thread_info *info = osd_oti_get(env);
1442         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1443         struct osd_object *obj = osd_dt_obj(dt);
1444         struct osd_device *dev = osd_obj2dev(obj);
1445         struct lustre_capa_key *key = &info->oti_capa_key;
1446         struct lustre_capa *capa = &info->oti_capa;
1447         struct obd_capa *oc;
1448         int rc;
1449         ENTRY;
1450
1451         if (!dev->od_fl_capa)
1452                 RETURN(ERR_PTR(-ENOENT));
1453
1454         LASSERT(dt_object_exists(dt));
1455         LINVRNT(osd_invariant(obj));
1456
1457         /* renewal sanity check */
1458         if (old && osd_object_auth(env, dt, old, opc))
1459                 RETURN(ERR_PTR(-EACCES));
1460
1461         capa->lc_fid = *fid;
1462         capa->lc_opc = opc;
1463         capa->lc_uid = 0;
1464         capa->lc_flags = dev->od_capa_alg << 24;
1465         capa->lc_timeout = dev->od_capa_timeout;
1466         capa->lc_expiry = 0;
1467
1468         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1469         if (oc) {
1470                 LASSERT(!capa_is_expired(oc));
1471                 RETURN(oc);
1472         }
1473
1474         spin_lock(&capa_lock);
1475         *key = dev->od_capa_keys[1];
1476         spin_unlock(&capa_lock);
1477
1478         capa->lc_keyid = key->lk_keyid;
1479         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1480
1481         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1482         if (rc) {
1483                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1484                 RETURN(ERR_PTR(rc));
1485         }
1486
1487         oc = capa_add(dev->od_capa_hash, capa);
1488         RETURN(oc);
1489 }
1490
1491 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
1492 {
1493         int rc;
1494         struct osd_object      *obj    = osd_dt_obj(dt);
1495         struct inode           *inode  = obj->oo_inode;
1496         struct osd_thread_info *info   = osd_oti_get(env);
1497         struct dentry          *dentry = &info->oti_dentry;
1498         struct file            *file   = &info->oti_file;
1499         ENTRY;
1500
1501         dentry->d_inode = inode;
1502         file->f_dentry = dentry;
1503         file->f_mapping = inode->i_mapping;
1504         file->f_op = inode->i_fop;
1505         LOCK_INODE_MUTEX(inode);
1506         rc = file->f_op->fsync(file, dentry, 0);
1507         UNLOCK_INODE_MUTEX(inode);
1508         RETURN(rc);
1509 }
1510
1511 static struct dt_object_operations osd_obj_ops = {
1512         .do_read_lock    = osd_object_read_lock,
1513         .do_write_lock   = osd_object_write_lock,
1514         .do_read_unlock  = osd_object_read_unlock,
1515         .do_write_unlock = osd_object_write_unlock,
1516         .do_attr_get     = osd_attr_get,
1517         .do_attr_set     = osd_attr_set,
1518         .do_ah_init      = osd_ah_init,
1519         .do_create       = osd_object_create,
1520         .do_index_try    = osd_index_try,
1521         .do_ref_add      = osd_object_ref_add,
1522         .do_ref_del      = osd_object_ref_del,
1523         .do_xattr_get    = osd_xattr_get,
1524         .do_xattr_set    = osd_xattr_set,
1525         .do_xattr_del    = osd_xattr_del,
1526         .do_xattr_list   = osd_xattr_list,
1527         .do_capa_get     = osd_capa_get,
1528         .do_object_sync  = osd_object_sync,
1529 };
1530
1531 /*
1532  * Body operations.
1533  */
1534
1535 /*
1536  * XXX: Another layering violation for now.
1537  *
1538  * We don't want to use ->f_op->read methods, because generic file write
1539  *
1540  *         - serializes on ->i_sem, and
1541  *
1542  *         - does a lot of extra work like balance_dirty_pages(),
1543  *
1544  * which doesn't work for globally shared files like /last-received.
1545  */
1546 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
1547 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
1548                                 loff_t *offs, handle_t *handle);
1549
1550 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1551                         struct lu_buf *buf, loff_t *pos,
1552                         struct lustre_capa *capa)
1553 {
1554         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1555
1556         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1557                 RETURN(-EACCES);
1558
1559         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1560 }
1561
1562 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1563                          const struct lu_buf *buf, loff_t *pos,
1564                          struct thandle *handle, struct lustre_capa *capa)
1565 {
1566         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1567         struct osd_thandle *oh;
1568         ssize_t             result;
1569
1570         LASSERT(handle != NULL);
1571
1572         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1573                 RETURN(-EACCES);
1574
1575         oh = container_of(handle, struct osd_thandle, ot_super);
1576         LASSERT(oh->ot_handle->h_transaction != NULL);
1577         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1578                                              pos, oh->ot_handle);
1579         if (result == 0)
1580                 result = buf->lb_len;
1581         return result;
1582 }
1583
1584 static struct dt_body_operations osd_body_ops = {
1585         .dbo_read  = osd_read,
1586         .dbo_write = osd_write
1587 };
1588
1589 /*
1590  * Index operations.
1591  */
1592
1593 static int osd_object_is_root(const struct osd_object *obj)
1594 {
1595         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1596 }
1597
1598 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1599                            const struct dt_index_features *feat)
1600 {
1601         struct iam_descr *descr;
1602
1603         if (osd_object_is_root(o))
1604                 return feat == &dt_directory_features;
1605
1606         LASSERT(o->oo_dir != NULL);
1607
1608         descr = o->oo_dir->od_container.ic_descr;
1609         if (feat == &dt_directory_features)
1610                 return descr == &iam_htree_compat_param ||
1611                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1612                          1 /*
1613                             * XXX check that index looks like directory.
1614                             */
1615                                 );
1616         else
1617                 return
1618                         feat->dif_keysize_min <= descr->id_key_size &&
1619                         descr->id_key_size <= feat->dif_keysize_max &&
1620                         feat->dif_recsize_min <= descr->id_rec_size &&
1621                         descr->id_rec_size <= feat->dif_recsize_max &&
1622                         !(feat->dif_flags & (DT_IND_VARKEY |
1623                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1624                         ergo(feat->dif_flags & DT_IND_UPDATE,
1625                              1 /* XXX check that object (and file system) is
1626                                 * writable */);
1627 }
1628
1629 static int osd_container_init(const struct lu_env *env,
1630                               struct osd_object *obj,
1631                               struct osd_directory *dir)
1632 {
1633         int result;
1634         struct iam_container *bag;
1635
1636         bag    = &dir->od_container;
1637         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1638         if (result == 0) {
1639                 result = iam_container_setup(bag);
1640                 if (result == 0)
1641                         obj->oo_dt.do_index_ops = &osd_index_ops;
1642                 else
1643                         iam_container_fini(bag);
1644         }
1645         return result;
1646 }
1647
1648 /*
1649  * Concurrency: no external locking is necessary.
1650  */
1651 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1652                          const struct dt_index_features *feat)
1653 {
1654         int result;
1655         struct osd_object *obj = osd_dt_obj(dt);
1656
1657         LINVRNT(osd_invariant(obj));
1658         LASSERT(dt_object_exists(dt));
1659
1660         if (osd_object_is_root(obj)) {
1661                 dt->do_index_ops = &osd_index_compat_ops;
1662                 result = 0;
1663         } else if (!osd_has_index(obj)) {
1664                 struct osd_directory *dir;
1665
1666                 OBD_ALLOC_PTR(dir);
1667                 if (dir != NULL) {
1668                         sema_init(&dir->od_sem, 1);
1669
1670                         spin_lock(&obj->oo_guard);
1671                         if (obj->oo_dir == NULL)
1672                                 obj->oo_dir = dir;
1673                         else
1674                                 /*
1675                                  * Concurrent thread allocated container data.
1676                                  */
1677                                 OBD_FREE_PTR(dir);
1678                         spin_unlock(&obj->oo_guard);
1679                         /*
1680                          * Now, that we have container data, serialize its
1681                          * initialization.
1682                          */
1683                         down(&obj->oo_dir->od_sem);
1684                         /*
1685                          * recheck under lock.
1686                          */
1687                         if (!osd_has_index(obj))
1688                                 result = osd_container_init(env, obj, dir);
1689                         else
1690                                 result = 0;
1691                         up(&obj->oo_dir->od_sem);
1692                 } else
1693                         result = -ENOMEM;
1694         } else
1695                 result = 0;
1696
1697         if (result == 0) {
1698                 if (!osd_index_probe(env, obj, feat))
1699                         result = -ENOTDIR;
1700         }
1701         LINVRNT(osd_invariant(obj));
1702
1703         return result;
1704 }
1705
1706 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1707                             const struct dt_key *key, struct thandle *handle,
1708                             struct lustre_capa *capa)
1709 {
1710         struct osd_object     *obj = osd_dt_obj(dt);
1711         struct osd_thandle    *oh;
1712         struct iam_path_descr *ipd;
1713         struct iam_container  *bag = &obj->oo_dir->od_container;
1714         int rc;
1715
1716         ENTRY;
1717
1718         LINVRNT(osd_invariant(obj));
1719         LASSERT(dt_object_exists(dt));
1720         LASSERT(bag->ic_object == obj->oo_inode);
1721         LASSERT(handle != NULL);
1722
1723         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1724                 RETURN(-EACCES);
1725
1726         ipd = osd_ipd_get(env, bag);
1727         if (unlikely(ipd == NULL))
1728                 RETURN(-ENOMEM);
1729
1730         oh = container_of0(handle, struct osd_thandle, ot_super);
1731         LASSERT(oh->ot_handle != NULL);
1732         LASSERT(oh->ot_handle->h_transaction != NULL);
1733
1734         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1735         osd_ipd_put(env, bag, ipd);
1736         LINVRNT(osd_invariant(obj));
1737         RETURN(rc);
1738 }
1739
1740 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1741                             struct dt_rec *rec, const struct dt_key *key,
1742                             struct lustre_capa *capa)
1743 {
1744         struct osd_object     *obj = osd_dt_obj(dt);
1745         struct iam_path_descr *ipd;
1746         struct iam_container  *bag = &obj->oo_dir->od_container;
1747         int rc;
1748
1749         ENTRY;
1750
1751         LINVRNT(osd_invariant(obj));
1752         LASSERT(dt_object_exists(dt));
1753         LASSERT(bag->ic_object == obj->oo_inode);
1754
1755         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1756                 return -EACCES;
1757
1758         ipd = osd_ipd_get(env, bag);
1759         if (unlikely(ipd == NULL))
1760                 RETURN(-ENOMEM);
1761
1762         rc = iam_lookup(bag, (const struct iam_key *)key,
1763                         (struct iam_rec *)rec, ipd);
1764         osd_ipd_put(env, bag, ipd);
1765         LINVRNT(osd_invariant(obj));
1766
1767         RETURN(rc);
1768 }
1769
1770 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1771                             const struct dt_rec *rec, const struct dt_key *key,
1772                             struct thandle *th, struct lustre_capa *capa)
1773 {
1774         struct osd_object     *obj = osd_dt_obj(dt);
1775         struct iam_path_descr *ipd;
1776         struct osd_thandle    *oh;
1777         struct iam_container  *bag = &obj->oo_dir->od_container;
1778         int rc;
1779
1780         ENTRY;
1781
1782         LINVRNT(osd_invariant(obj));
1783         LASSERT(dt_object_exists(dt));
1784         LASSERT(bag->ic_object == obj->oo_inode);
1785         LASSERT(th != NULL);
1786
1787         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1788                 return -EACCES;
1789
1790         ipd = osd_ipd_get(env, bag);
1791         if (unlikely(ipd == NULL))
1792                 RETURN(-ENOMEM);
1793
1794         oh = container_of0(th, struct osd_thandle, ot_super);
1795         LASSERT(oh->ot_handle != NULL);
1796         LASSERT(oh->ot_handle->h_transaction != NULL);
1797         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1798                         (struct iam_rec *)rec, ipd);
1799         osd_ipd_put(env, bag, ipd);
1800         LINVRNT(osd_invariant(obj));
1801         RETURN(rc);
1802 }
1803
1804 /*
1805  * Iterator operations.
1806  */
1807 struct osd_it {
1808         struct osd_object     *oi_obj;
1809         struct iam_path_descr *oi_ipd;
1810         struct iam_iterator    oi_it;
1811 };
1812
1813 static struct dt_it *osd_it_init(const struct lu_env *env,
1814                                  struct dt_object *dt, int writable,
1815                                  struct lustre_capa *capa)
1816 {
1817         struct osd_it         *it;
1818         struct osd_object     *obj = osd_dt_obj(dt);
1819         struct lu_object      *lo  = &dt->do_lu;
1820         struct iam_path_descr *ipd;
1821         struct iam_container  *bag = &obj->oo_dir->od_container;
1822         __u32                  flags;
1823
1824         LASSERT(lu_object_exists(lo));
1825
1826         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1827                             CAPA_OPC_BODY_READ))
1828                 return ERR_PTR(-EACCES);
1829
1830         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1831         OBD_ALLOC_PTR(it);
1832         if (it != NULL) {
1833                 /*
1834                  * XXX: as ipd is allocated within osd_thread_info, assignment
1835                  * below implies that iterator usage is confined within single
1836                  * environment.
1837                  */
1838                 ipd = osd_ipd_get(env, bag);
1839                 if (likely(ipd != NULL)) {
1840                         it->oi_obj = obj;
1841                         it->oi_ipd = ipd;
1842                         lu_object_get(lo);
1843                         iam_it_init(&it->oi_it, bag, flags, ipd);
1844                         return (struct dt_it *)it;
1845                 } else
1846                         OBD_FREE_PTR(it);
1847         }
1848         return ERR_PTR(-ENOMEM);
1849 }
1850
1851 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1852 {
1853         struct osd_it     *it = (struct osd_it *)di;
1854         struct osd_object *obj = it->oi_obj;
1855
1856         iam_it_fini(&it->oi_it);
1857         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1858         lu_object_put(env, &obj->oo_dt.do_lu);
1859         OBD_FREE_PTR(it);
1860 }
1861
1862 static int osd_it_get(const struct lu_env *env,
1863                       struct dt_it *di, const struct dt_key *key)
1864 {
1865         struct osd_it *it = (struct osd_it *)di;
1866
1867         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1868 }
1869
1870 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1871 {
1872         struct osd_it *it = (struct osd_it *)di;
1873
1874         iam_it_put(&it->oi_it);
1875 }
1876
1877 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1878 {
1879         struct osd_it *it = (struct osd_it *)di;
1880
1881         return iam_it_next(&it->oi_it);
1882 }
1883
1884 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1885                       struct thandle *th)
1886 {
1887         struct osd_it      *it = (struct osd_it *)di;
1888         struct osd_thandle *oh;
1889
1890         LASSERT(th != NULL);
1891
1892         oh = container_of0(th, struct osd_thandle, ot_super);
1893         LASSERT(oh->ot_handle != NULL);
1894         LASSERT(oh->ot_handle->h_transaction != NULL);
1895
1896         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1897 }
1898
1899 static struct dt_key *osd_it_key(const struct lu_env *env,
1900                                  const struct dt_it *di)
1901 {
1902         struct osd_it *it = (struct osd_it *)di;
1903
1904         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1905 }
1906
1907 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1908 {
1909         struct osd_it *it = (struct osd_it *)di;
1910
1911         return iam_it_key_size(&it->oi_it);
1912 }
1913
1914 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1915                                  const struct dt_it *di)
1916 {
1917         struct osd_it *it = (struct osd_it *)di;
1918
1919         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1920 }
1921
1922 static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1923 {
1924         struct osd_it *it = (struct osd_it *)di;
1925
1926         return iam_it_store(&it->oi_it);
1927 }
1928
1929 static int osd_it_load(const struct lu_env *env,
1930                        const struct dt_it *di, __u64 hash)
1931 {
1932         struct osd_it *it = (struct osd_it *)di;
1933
1934         return iam_it_load(&it->oi_it, hash);
1935 }
1936
1937 static struct dt_index_operations osd_index_ops = {
1938         .dio_lookup = osd_index_lookup,
1939         .dio_insert = osd_index_insert,
1940         .dio_delete = osd_index_delete,
1941         .dio_it     = {
1942                 .init     = osd_it_init,
1943                 .fini     = osd_it_fini,
1944                 .get      = osd_it_get,
1945                 .put      = osd_it_put,
1946                 .del      = osd_it_del,
1947                 .next     = osd_it_next,
1948                 .key      = osd_it_key,
1949                 .key_size = osd_it_key_size,
1950                 .rec      = osd_it_rec,
1951                 .store    = osd_it_store,
1952                 .load     = osd_it_load
1953         }
1954 };
1955
1956 static int osd_index_compat_delete(const struct lu_env *env,
1957                                    struct dt_object *dt,
1958                                    const struct dt_key *key,
1959                                    struct thandle *handle,
1960                                    struct lustre_capa *capa)
1961 {
1962         struct osd_object *obj = osd_dt_obj(dt);
1963
1964         LASSERT(handle != NULL);
1965         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1966         ENTRY;
1967
1968 #if 0
1969         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1970                 RETURN(-EACCES);
1971 #endif
1972
1973         RETURN(-EOPNOTSUPP);
1974 }
1975
1976 /*
1977  * Compatibility index operations.
1978  */
1979
1980
1981 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
1982                            struct dentry *dentry, struct lu_fid_pack *pack)
1983 {
1984         struct inode  *inode = dentry->d_inode;
1985         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
1986
1987         lu_igif_build(fid, inode->i_ino, inode->i_generation);
1988         fid_cpu_to_be(fid, fid);
1989         pack->fp_len = sizeof *fid + 1;
1990         memcpy(pack->fp_area, fid, sizeof *fid);
1991 }
1992
1993 static int osd_index_compat_lookup(const struct lu_env *env,
1994                                    struct dt_object *dt,
1995                                    struct dt_rec *rec, const struct dt_key *key,
1996                                    struct lustre_capa *capa)
1997 {
1998         struct osd_object *obj = osd_dt_obj(dt);
1999
2000         struct osd_device      *osd  = osd_obj2dev(obj);
2001         struct osd_thread_info *info = osd_oti_get(env);
2002         struct inode           *dir;
2003
2004         int result;
2005
2006         /*
2007          * XXX temporary solution.
2008          */
2009         struct dentry *dentry;
2010         struct dentry *parent;
2011
2012         LINVRNT(osd_invariant(obj));
2013         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2014         LASSERT(osd_has_index(obj));
2015
2016         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2017                 return -EACCES;
2018
2019         info->oti_str.name = (const char *)key;
2020         info->oti_str.len  = strlen((const char *)key);
2021
2022         dir = obj->oo_inode;
2023         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2024
2025         parent = d_alloc_root(dir);
2026         if (parent == NULL)
2027                 return -ENOMEM;
2028         igrab(dir);
2029         dentry = d_alloc(parent, &info->oti_str);
2030         if (dentry != NULL) {
2031                 struct dentry *d;
2032
2033                 /*
2034                  * XXX passing NULL for nameidata should work for
2035                  * ext3/ldiskfs.
2036                  */
2037                 d = dir->i_op->lookup(dir, dentry, NULL);
2038                 if (d == NULL) {
2039                         /*
2040                          * normal case, result is in @dentry.
2041                          */
2042                         if (dentry->d_inode != NULL) {
2043                                 osd_build_pack(env, osd, dentry,
2044                                                (struct lu_fid_pack *)rec);
2045                                 result = 0;
2046                         } else
2047                                 result = -ENOENT;
2048                  } else {
2049                         /* What? Disconnected alias? Ppheeeww... */
2050                         CERROR("Aliasing where not expected\n");
2051                         result = -EIO;
2052                         dput(d);
2053                 }
2054                 dput(dentry);
2055         } else
2056                 result = -ENOMEM;
2057         dput(parent);
2058         LINVRNT(osd_invariant(obj));
2059         return result;
2060 }
2061
2062 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
2063                        struct inode *dir, struct inode *inode, const char *name)
2064 {
2065         struct dentry *old;
2066         struct dentry *new;
2067         struct dentry *parent;
2068
2069         int result;
2070
2071         info->oti_str.name = name;
2072         info->oti_str.len  = strlen(name);
2073
2074         LASSERT(atomic_read(&dir->i_count) > 0);
2075         result = -ENOMEM;
2076         old = d_alloc(dev->od_obj_area, &info->oti_str);
2077         if (old != NULL) {
2078                 d_instantiate(old, inode);
2079                 igrab(inode);
2080                 LASSERT(atomic_read(&dir->i_count) > 0);
2081                 parent = d_alloc_root(dir);
2082                 if (parent != NULL) {
2083                         igrab(dir);
2084                         LASSERT(atomic_read(&dir->i_count) > 1);
2085                         new = d_alloc(parent, &info->oti_str);
2086                         LASSERT(atomic_read(&dir->i_count) > 1);
2087                         if (new != NULL) {
2088                                 LASSERT(atomic_read(&dir->i_count) > 1);
2089                                 result = dir->i_op->link(old, dir, new);
2090                                 LASSERT(atomic_read(&dir->i_count) > 1);
2091                                 dput(new);
2092                                 LASSERT(atomic_read(&dir->i_count) > 1);
2093                         }
2094                         LASSERT(atomic_read(&dir->i_count) > 1);
2095                         dput(parent);
2096                         LASSERT(atomic_read(&dir->i_count) > 0);
2097                 }
2098                 dput(old);
2099         }
2100         LASSERT(atomic_read(&dir->i_count) > 0);
2101         return result;
2102 }
2103
2104
2105 /*
2106  * XXX Temporary stuff.
2107  */
2108 static int osd_index_compat_insert(const struct lu_env *env,
2109                                    struct dt_object *dt,
2110                                    const struct dt_rec *rec,
2111                                    const struct dt_key *key, struct thandle *th,
2112                                    struct lustre_capa *capa)
2113 {
2114         struct osd_object     *obj = osd_dt_obj(dt);
2115
2116         const char          *name = (const char *)key;
2117
2118         struct lu_device    *ludev = dt->do_lu.lo_dev;
2119         struct lu_object    *luch;
2120
2121         struct osd_thread_info   *info = osd_oti_get(env);
2122         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2123         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2124
2125         int result;
2126
2127         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2128         LINVRNT(osd_invariant(obj));
2129         LASSERT(th != NULL);
2130
2131         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2132                 return -EACCES;
2133
2134         result = fid_unpack(pack, fid);
2135         if (result != 0)
2136                 return result;
2137
2138         luch = lu_object_find(env, ludev, fid, NULL);
2139         if (!IS_ERR(luch)) {
2140                 if (lu_object_exists(luch)) {
2141                         struct osd_object *child;
2142
2143                         child = osd_obj(lu_object_locate(luch->lo_header,
2144                                                          ludev->ld_type));
2145                         if (child != NULL)
2146                                 result = osd_add_rec(info, osd_obj2dev(obj),
2147                                                      obj->oo_inode,
2148                                                      child->oo_inode, name);
2149                         else {
2150                                 CERROR("No osd slice.\n");
2151                                 result = -ENOENT;
2152                         }
2153                         LINVRNT(osd_invariant(obj));
2154                         LINVRNT(osd_invariant(child));
2155                 } else {
2156                         CERROR("Sorry.\n");
2157                         result = -ENOENT;
2158                 }
2159                 lu_object_put(env, luch);
2160         } else
2161                 result = PTR_ERR(luch);
2162         LINVRNT(osd_invariant(obj));
2163         return result;
2164 }
2165
2166 static const struct dt_index_operations osd_index_compat_ops = {
2167         .dio_lookup = osd_index_compat_lookup,
2168         .dio_insert = osd_index_compat_insert,
2169         .dio_delete = osd_index_compat_delete
2170 };
2171
2172 /* type constructor/destructor: osd_type_init, osd_type_fini */
2173 LU_TYPE_INIT_FINI(osd, &osd_key);
2174
2175 static struct lu_context_key osd_key = {
2176         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
2177         .lct_init = osd_key_init,
2178         .lct_fini = osd_key_fini,
2179         .lct_exit = osd_key_exit
2180 };
2181
2182 static void *osd_key_init(const struct lu_context *ctx,
2183                           struct lu_context_key *key)
2184 {
2185         struct osd_thread_info *info;
2186
2187         OBD_ALLOC_PTR(info);
2188         if (info != NULL)
2189                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2190         else
2191                 info = ERR_PTR(-ENOMEM);
2192         return info;
2193 }
2194
2195 /* context key destructor: osd_key_fini */
2196 LU_KEY_FINI(osd, struct osd_thread_info);
2197
2198 static void osd_key_exit(const struct lu_context *ctx,
2199                          struct lu_context_key *key, void *data)
2200 {
2201         struct osd_thread_info *info = data;
2202
2203         LASSERT(info->oti_r_locks == 0);
2204         LASSERT(info->oti_w_locks == 0);
2205         LASSERT(info->oti_txns    == 0);
2206 }
2207
2208 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2209                            const char *name, struct lu_device *next)
2210 {
2211         int rc;
2212         /* context for commit hooks */
2213         rc = lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
2214                              LCT_MD_THREAD);
2215         if (rc == 0)
2216                 rc = osd_procfs_init(osd_dev(d), name);
2217         return rc;
2218 }
2219
2220 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2221 {
2222         struct osd_thread_info *info = osd_oti_get(env);
2223         ENTRY;
2224         if (o->od_obj_area != NULL) {
2225                 dput(o->od_obj_area);
2226                 o->od_obj_area = NULL;
2227         }
2228         osd_oi_fini(info, &o->od_oi);
2229
2230         RETURN(0);
2231 }
2232
2233 static int osd_mount(const struct lu_env *env,
2234                      struct osd_device *o, struct lustre_cfg *cfg)
2235 {
2236         struct lustre_mount_info *lmi;
2237         const char               *dev  = lustre_cfg_string(cfg, 0);
2238         struct osd_thread_info   *info = osd_oti_get(env);
2239         int result;
2240
2241         ENTRY;
2242
2243         if (o->od_mount != NULL) {
2244                 CERROR("Already mounted (%s)\n", dev);
2245                 RETURN(-EEXIST);
2246         }
2247
2248         /* get mount */
2249         lmi = server_get_mount(dev);
2250         if (lmi == NULL) {
2251                 CERROR("Cannot get mount info for %s!\n", dev);
2252                 RETURN(-EFAULT);
2253         }
2254
2255         LASSERT(lmi != NULL);
2256         /* save lustre_mount_info in dt_device */
2257         o->od_mount = lmi;
2258
2259         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2260         if (result == 0) {
2261                 struct dentry *d;
2262
2263                 d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
2264                                  0777, 1);
2265                 if (!IS_ERR(d)) {
2266                         o->od_obj_area = d;
2267                 } else
2268                         result = PTR_ERR(d);
2269         }
2270         if (result != 0)
2271                 osd_shutdown(env, o);
2272         RETURN(result);
2273 }
2274
2275 static struct lu_device *osd_device_fini(const struct lu_env *env,
2276                                          struct lu_device *d)
2277 {
2278         int rc;
2279         ENTRY;
2280
2281         shrink_dcache_sb(osd_sb(osd_dev(d)));
2282         osd_sync(env, lu2dt_dev(d));
2283
2284         rc = osd_procfs_fini(osd_dev(d));
2285         if (rc) {
2286                 CERROR("proc fini error %d \n", rc);
2287                 RETURN (ERR_PTR(rc));
2288         }
2289
2290         if (osd_dev(d)->od_mount)
2291                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2292                                  osd_dev(d)->od_mount->lmi_mnt);
2293         osd_dev(d)->od_mount = NULL;
2294
2295         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2296         RETURN(NULL);
2297 }
2298
2299 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2300                                           struct lu_device_type *t,
2301                                           struct lustre_cfg *cfg)
2302 {
2303         struct lu_device  *l;
2304         struct osd_device *o;
2305
2306         OBD_ALLOC_PTR(o);
2307         if (o != NULL) {
2308                 int result;
2309
2310                 result = dt_device_init(&o->od_dt_dev, t);
2311                 if (result == 0) {
2312                         l = osd2lu_dev(o);
2313                         l->ld_ops = &osd_lu_ops;
2314                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2315                         spin_lock_init(&o->od_osfs_lock);
2316                         o->od_osfs_age = cfs_time_shift_64(-1000);
2317                         o->od_capa_hash = init_capa_hash();
2318                         if (o->od_capa_hash == NULL) {
2319                                 dt_device_fini(&o->od_dt_dev);
2320                                 l = ERR_PTR(-ENOMEM);
2321                         }
2322                 } else
2323                         l = ERR_PTR(result);
2324
2325                 if (IS_ERR(l))
2326                         OBD_FREE_PTR(o);
2327         } else
2328                 l = ERR_PTR(-ENOMEM);
2329         return l;
2330 }
2331
2332 static struct lu_device *osd_device_free(const struct lu_env *env,
2333                                          struct lu_device *d)
2334 {
2335         struct osd_device *o = osd_dev(d);
2336         ENTRY;
2337
2338         cleanup_capa_hash(o->od_capa_hash);
2339         dt_device_fini(&o->od_dt_dev);
2340         OBD_FREE_PTR(o);
2341         RETURN(NULL);
2342 }
2343
2344 static int osd_process_config(const struct lu_env *env,
2345                               struct lu_device *d, struct lustre_cfg *cfg)
2346 {
2347         struct osd_device *o = osd_dev(d);
2348         int err;
2349         ENTRY;
2350
2351         switch(cfg->lcfg_command) {
2352         case LCFG_SETUP:
2353                 err = osd_mount(env, o, cfg);
2354                 break;
2355         case LCFG_CLEANUP:
2356                 err = osd_shutdown(env, o);
2357                 break;
2358         default:
2359                 err = -ENOTTY;
2360         }
2361
2362         RETURN(err);
2363 }
2364 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2365                                     struct ldiskfs_super_block * es);
2366
2367 static int osd_recovery_complete(const struct lu_env *env,
2368                                  struct lu_device *d)
2369 {
2370         struct osd_device *o = osd_dev(d);
2371         ENTRY;
2372         /* TODO: orphans handling */
2373         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2374         RETURN(0);
2375 }
2376
2377 static struct inode *osd_iget(struct osd_thread_info *info,
2378                               struct osd_device *dev,
2379                               const struct osd_inode_id *id)
2380 {
2381         struct inode *inode;
2382
2383         inode = iget(osd_sb(dev), id->oii_ino);
2384         if (inode == NULL) {
2385                 CERROR("no inode\n");
2386                 inode = ERR_PTR(-EACCES);
2387         } else if (is_bad_inode(inode)) {
2388                 CERROR("bad inode\n");
2389                 iput(inode);
2390                 inode = ERR_PTR(-ENOENT);
2391         } else if (inode->i_generation != id->oii_gen) {
2392                 CERROR("stale inode\n");
2393                 iput(inode);
2394                 inode = ERR_PTR(-ESTALE);
2395         }
2396
2397         return inode;
2398
2399 }
2400
2401 static int osd_fid_lookup(const struct lu_env *env,
2402                           struct osd_object *obj, const struct lu_fid *fid)
2403 {
2404         struct osd_thread_info *info;
2405         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2406         struct osd_device      *dev;
2407         struct osd_inode_id    *id;
2408         struct osd_oi          *oi;
2409         struct inode           *inode;
2410         int                     result;
2411
2412         LINVRNT(osd_invariant(obj));
2413         LASSERT(obj->oo_inode == NULL);
2414         LASSERT(fid_is_sane(fid));
2415         /*
2416          * This assertion checks that osd layer sees only local
2417          * fids. Unfortunately it is somewhat expensive (does a
2418          * cache-lookup). Disabling it for production/acceptance-testing.
2419          */
2420         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2421
2422         ENTRY;
2423
2424         info = osd_oti_get(env);
2425         dev  = osd_dev(ldev);
2426         id   = &info->oti_id;
2427         oi   = &dev->od_oi;
2428
2429         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2430                 RETURN(-ENOENT);
2431
2432         result = osd_oi_lookup(info, oi, fid, id);
2433         if (result == 0) {
2434                 inode = osd_iget(info, dev, id);
2435                 if (!IS_ERR(inode)) {
2436                         obj->oo_inode = inode;
2437                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2438                         result = 0;
2439                 } else
2440                         /*
2441                          * If fid wasn't found in oi, inode-less object is
2442                          * created, for which lu_object_exists() returns
2443                          * false. This is used in a (frequent) case when
2444                          * objects are created as locking anchors or
2445                          * place holders for objects yet to be created.
2446                          */
2447                         result = PTR_ERR(inode);
2448         } else if (result == -ENOENT)
2449                 result = 0;
2450         LINVRNT(osd_invariant(obj));
2451         RETURN(result);
2452 }
2453
2454 static void osd_inode_getattr(const struct lu_env *env,
2455                               struct inode *inode, struct lu_attr *attr)
2456 {
2457         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2458                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2459                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2460
2461         attr->la_atime      = LTIME_S(inode->i_atime);
2462         attr->la_mtime      = LTIME_S(inode->i_mtime);
2463         attr->la_ctime      = LTIME_S(inode->i_ctime);
2464         attr->la_mode       = inode->i_mode;
2465         attr->la_size       = i_size_read(inode);
2466         attr->la_blocks     = inode->i_blocks;
2467         attr->la_uid        = inode->i_uid;
2468         attr->la_gid        = inode->i_gid;
2469         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2470         attr->la_nlink      = inode->i_nlink;
2471         attr->la_rdev       = inode->i_rdev;
2472         attr->la_blksize    = ll_inode_blksize(inode);
2473         attr->la_blkbits    = inode->i_blkbits;
2474 }
2475
2476 /*
2477  * Helpers.
2478  */
2479
2480 static int lu_device_is_osd(const struct lu_device *d)
2481 {
2482         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2483 }
2484
2485 static struct osd_object *osd_obj(const struct lu_object *o)
2486 {
2487         LASSERT(lu_device_is_osd(o->lo_dev));
2488         return container_of0(o, struct osd_object, oo_dt.do_lu);
2489 }
2490
2491 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2492 {
2493         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2494         return container_of0(d, struct osd_device, od_dt_dev);
2495 }
2496
2497 static struct osd_device *osd_dev(const struct lu_device *d)
2498 {
2499         LASSERT(lu_device_is_osd(d));
2500         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2501 }
2502
2503 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2504 {
2505         return osd_obj(&d->do_lu);
2506 }
2507
2508 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2509 {
2510         return osd_dev(o->oo_dt.do_lu.lo_dev);
2511 }
2512
2513 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2514 {
2515         return &osd->od_dt_dev.dd_lu_dev;
2516 }
2517
2518 static struct super_block *osd_sb(const struct osd_device *dev)
2519 {
2520         return dev->od_mount->lmi_mnt->mnt_sb;
2521 }
2522
2523 static journal_t *osd_journal(const struct osd_device *dev)
2524 {
2525         return LDISKFS_SB(osd_sb(dev))->s_journal;
2526 }
2527
2528 static int osd_has_index(const struct osd_object *obj)
2529 {
2530         return obj->oo_dt.do_index_ops != NULL;
2531 }
2532
/*
 * lu_object_operations::loo_object_invariant method: evaluate
 * osd_invariant() on the osd object that embeds @l.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int holds;

        /*
         * The conversion is deliberately left as the direct argument:
         * if osd_invariant() happens to be a macro that ignores its
         * argument, osd_obj() must not be evaluated here either.
         */
        holds = osd_invariant(osd_obj(l));
        return holds;
}
2537
2538 static struct lu_object_operations osd_lu_obj_ops = {
2539         .loo_object_init      = osd_object_init,
2540         .loo_object_delete    = osd_object_delete,
2541         .loo_object_release   = osd_object_release,
2542         .loo_object_free      = osd_object_free,
2543         .loo_object_print     = osd_object_print,
2544         .loo_object_invariant = osd_object_invariant
2545 };
2546
2547 static struct lu_device_operations osd_lu_ops = {
2548         .ldo_object_alloc      = osd_object_alloc,
2549         .ldo_process_config    = osd_process_config,
2550         .ldo_recovery_complete = osd_recovery_complete
2551 };
2552
2553 static struct lu_device_type_operations osd_device_type_ops = {
2554         .ldto_init = osd_type_init,
2555         .ldto_fini = osd_type_fini,
2556
2557         .ldto_device_alloc = osd_device_alloc,
2558         .ldto_device_free  = osd_device_free,
2559
2560         .ldto_device_init    = osd_device_init,
2561         .ldto_device_fini    = osd_device_fini
2562 };
2563
2564 static struct lu_device_type osd_device_type = {
2565         .ldt_tags     = LU_DEVICE_DT,
2566         .ldt_name     = LUSTRE_OSD_NAME,
2567         .ldt_ops      = &osd_device_type_ops,
2568         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2569 };
2570
2571 /*
2572  * lprocfs legacy support.
2573  */
2574 static struct obd_ops osd_obd_device_ops = {
2575         .o_owner = THIS_MODULE
2576 };
2577
2578 static int __init osd_mod_init(void)
2579 {
2580         struct lprocfs_static_vars lvars;
2581
2582         lprocfs_osd_init_vars(&lvars);
2583         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2584                                    LUSTRE_OSD_NAME, &osd_device_type);
2585 }
2586
2587 static void __exit osd_mod_exit(void)
2588 {
2589         class_unregister_type(LUSTRE_OSD_NAME);
2590 }
2591
2592 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2593 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
2594 MODULE_LICENSE("GPL");
2595
2596 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);