Whamcloud - gitweb
Land b_head_quota onto HEAD (20081116_0105)
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
58 /*
59  * XXX temporary stuff: direct access to ldiskfs/jbd. Interface between osd
60  * and file system is not yet specified.
61  */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
85 struct osd_directory {
86         struct iam_container od_container;
87         struct iam_descr     od_descr;
88         struct semaphore     od_sem;
89 };
90
91 struct osd_object {
92         struct dt_object       oo_dt;
93         /**
94          * Inode for file system object represented by this osd_object. This
95          * inode is pinned for the whole duration of lu_object life.
96          *
97          * Not modified concurrently (either setup early during object
98          * creation, or assigned by osd_object_create() under write lock).
99          */
100         struct inode          *oo_inode;
101         struct rw_semaphore    oo_sem;
102         struct osd_directory  *oo_dir;
103         /** protects inode attributes. */
104         spinlock_t             oo_guard;
105         const struct lu_env   *oo_owner;
106 #ifdef CONFIG_LOCKDEP
107         struct lockdep_map     oo_dep_map;
108 #endif
109 };
110
/* Forward declaration; the definition appears later in this file. */
static int osd_root_get(const struct lu_env *env,
                        struct dt_device *dev,
                        struct lu_fid *f);
113
114 static int   lu_device_is_osd  (const struct lu_device *d);
115 static void  osd_mod_exit      (void) __exit;
116 static int   osd_mod_init      (void) __init;
117 static int   osd_type_init     (struct lu_device_type *t);
118 static void  osd_type_fini     (struct lu_device_type *t);
119 static int   osd_object_init   (const struct lu_env *env,
120                                 struct lu_object *l,
121                                 const struct lu_object_conf *_);
122 static void  osd_object_release(const struct lu_env *env,
123                                 struct lu_object *l);
124 static int   osd_object_print  (const struct lu_env *env, void *cookie,
125                                 lu_printer_t p, const struct lu_object *o);
126 static struct lu_device *osd_device_free   (const struct lu_env *env,
127                                 struct lu_device *m);
128 static void *osd_key_init      (const struct lu_context *ctx,
129                                 struct lu_context_key *key);
130 static void  osd_key_fini      (const struct lu_context *ctx,
131                                 struct lu_context_key *key, void *data);
132 static void  osd_key_exit      (const struct lu_context *ctx,
133                                 struct lu_context_key *key, void *data);
134 static int   osd_has_index     (const struct osd_object *obj);
135 static void  osd_object_init0  (struct osd_object *obj);
136 static int   osd_device_init   (const struct lu_env *env,
137                                 struct lu_device *d, const char *,
138                                 struct lu_device *);
139 static int   osd_fid_lookup    (const struct lu_env *env,
140                                 struct osd_object *obj,
141                                 const struct lu_fid *fid);
142 static void  osd_inode_getattr (const struct lu_env *env,
143                                 struct inode *inode, struct lu_attr *attr);
144 static int   osd_inode_setattr (const struct lu_env *env,
145                                 struct inode *inode, const struct lu_attr *attr);
146 static int   osd_param_is_sane (const struct osd_device *dev,
147                                 const struct txn_param *param);
148 static int   osd_index_lookup  (const struct lu_env *env,
149                                 struct dt_object *dt,
150                                 struct dt_rec *rec, const struct dt_key *key,
151                                 struct lustre_capa *capa);
/*
 * Forward declaration of the index insert method.
 *
 * The last parameter was spelled "ingore_quota"; the name in a prototype has
 * no effect on callers, so it is corrected here for readability.
 */
static int osd_index_insert(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_rec *rec,
                            const struct dt_key *key,
                            struct thandle *handle,
                            struct lustre_capa *capa,
                            int ignore_quota);
/* Forward declarations: remaining index entry points. */
static int osd_index_delete(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_key *key,
                            struct thandle *handle,
                            struct lustre_capa *capa);
static int osd_index_probe(const struct lu_env *env,
                           struct osd_object *o,
                           const struct dt_index_features *feat);
static int osd_index_try(const struct lu_env *env,
                         struct dt_object *dt,
                         const struct dt_index_features *feat);
static void osd_index_fini(struct osd_object *o);
170
/*
 * Forward declarations: iterator methods, device configuration query,
 * transaction stop and the root-object predicate.
 */
static void osd_it_fini(const struct lu_env *env, struct dt_it *di);
static int osd_it_get(const struct lu_env *env,
                      struct dt_it *di, const struct dt_key *key);
static void osd_it_put(const struct lu_env *env, struct dt_it *di);
static int osd_it_next(const struct lu_env *env, struct dt_it *di);
static int osd_it_del(const struct lu_env *env, struct dt_it *di,
                      struct thandle *th);
static int osd_it_key_size(const struct lu_env *env,
                           const struct dt_it *di);
static void osd_conf_get(const struct lu_env *env,
                         const struct dt_device *dev,
                         struct dt_device_param *param);
static void osd_trans_stop(const struct lu_env *env,
                           struct thandle *th);
static int osd_object_is_root(const struct osd_object *obj);
186
187 static struct osd_object  *osd_obj          (const struct lu_object *o);
188 static struct osd_device  *osd_dev          (const struct lu_device *d);
189 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
190 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
191 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
192 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
193 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
194                                              struct lu_device *d);
195 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
196                                              struct lu_device_type *t,
197                                              struct lustre_cfg *cfg);
198 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
199                                              const struct lu_object_header *hdr,
200                                              struct lu_device *d);
201 static struct inode       *osd_iget         (struct osd_thread_info *info,
202                                              struct osd_device *dev,
203                                              const struct osd_inode_id *id);
204 static struct super_block *osd_sb           (const struct osd_device *dev);
205 static struct dt_it       *osd_it_init      (const struct lu_env *env,
206                                              struct dt_object *dt, int wable,
207                                              struct lustre_capa *capa);
208 static struct dt_key      *osd_it_key       (const struct lu_env *env,
209                                              const struct dt_it *di);
210 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
211                                              const struct dt_it *di);
212 static struct timespec    *osd_inode_time   (const struct lu_env *env,
213                                              struct inode *inode,
214                                              __u64 seconds);
215 static struct thandle     *osd_trans_start  (const struct lu_env *env,
216                                              struct dt_device *d,
217                                              struct txn_param *p);
218 static journal_t          *osd_journal      (const struct osd_device *dev);
219
220 static const struct lu_device_type_operations osd_device_type_ops;
221 static       struct lu_device_type            osd_device_type;
222 static const struct lu_object_operations      osd_lu_obj_ops;
223 static       struct obd_ops                   osd_obd_device_ops;
224 static const struct lu_device_operations      osd_lu_ops;
225 static       struct lu_context_key            osd_key;
226 static const struct dt_object_operations      osd_obj_ops;
227 static const struct dt_body_operations        osd_body_ops;
228 static const struct dt_index_operations       osd_index_ops;
229 static const struct dt_index_operations       osd_index_compat_ops;
230
231 struct osd_thandle {
232         struct thandle          ot_super;
233         handle_t               *ot_handle;
234         struct journal_callback ot_jcb;
235         /* Link to the device, for debugging. */
236         struct lu_ref_link     *ot_dev_link;
237
238 };
239
240 #ifdef HAVE_QUOTA_SUPPORT
241 static inline void
242 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
243 {
244         struct md_ucred    *uc = md_ucred(env);
245
246         LASSERT(uc != NULL);
247
248         save->oc_uid = current->fsuid;
249         save->oc_gid = current->fsgid;
250         save->oc_cap = current->cap_effective;
251         current->fsuid         = uc->mu_fsuid;
252         current->fsgid         = uc->mu_fsgid;
253         current->cap_effective = uc->mu_cap;
254 }
255
256 static inline void
257 osd_pop_ctxt(struct osd_ctxt *save)
258 {
259         current->fsuid         = save->oc_uid;
260         current->fsgid         = save->oc_gid;
261         current->cap_effective = save->oc_cap;
262 }
263 #endif
264
265 /*
266  * Invariants, assertions.
267  */
268
269 /*
270  * XXX: do not enable this, until invariant checking code is made thread safe
271  * in the face of pdirops locking.
272  */
273 #define OSD_INVARIANT_CHECKS (0)
274
#if OSD_INVARIANT_CHECKS
/*
 * Object invariant, evaluated through LINVRNT() at the entry points.
 *
 * Holds when \a obj is not NULL and
 *  - if an inode is attached, it belongs to the super block of the object's
 *    device and has a positive reference count;
 *  - if index state is attached and its container is bound to an inode, that
 *    inode is the object's inode.
 *
 * The container field is od_container (see struct osd_directory); the
 * previous spelling "od_conationer" could not compile with checks enabled.
 *
 * \retval non-zero  invariant holds
 * \retval 0         invariant is violated
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_dir != NULL &&
                     obj->oo_dir->od_container.ic_object != NULL,
                     obj->oo_dir->od_container.ic_object == obj->oo_inode);
}
#else
/* Checks are compiled out: the invariant trivially holds. */
#define osd_invariant(obj) (1)
#endif
290
291 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
292 {
293         return lu_context_key_get(&env->le_ctx, &osd_key);
294 }
295
296 /*
297  * Concurrency: doesn't matter
298  */
299 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
300 {
301         return osd_oti_get(env)->oti_r_locks > 0;
302 }
303
304 /*
305  * Concurrency: doesn't matter
306  */
307 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
308 {
309         struct osd_thread_info *oti = osd_oti_get(env);
310         return oti->oti_w_locks > 0 && o->oo_owner == env;
311 }
312
313 /*
314  * Concurrency: doesn't access mutable data
315  */
316 static int osd_root_get(const struct lu_env *env,
317                         struct dt_device *dev, struct lu_fid *f)
318 {
319         struct inode *inode;
320
321         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
322         lu_igif_build(f, inode->i_ino, inode->i_generation);
323         return 0;
324 }
325
326 /*
327  * OSD object methods.
328  */
329
330 /*
331  * Concurrency: no concurrent access is possible that early in object
332  * life-cycle.
333  */
334 static struct lu_object *osd_object_alloc(const struct lu_env *env,
335                                           const struct lu_object_header *hdr,
336                                           struct lu_device *d)
337 {
338         struct osd_object *mo;
339
340         OBD_ALLOC_PTR(mo);
341         if (mo != NULL) {
342                 struct lu_object *l;
343
344                 l = &mo->oo_dt.do_lu;
345                 dt_object_init(&mo->oo_dt, NULL, d);
346                 mo->oo_dt.do_ops = &osd_obj_ops;
347                 l->lo_ops = &osd_lu_obj_ops;
348                 init_rwsem(&mo->oo_sem);
349                 spin_lock_init(&mo->oo_guard);
350                 return l;
351         } else
352                 return NULL;
353 }
354
355 /*
356  * Concurrency: shouldn't matter.
357  */
358 static void osd_object_init0(struct osd_object *obj)
359 {
360         LASSERT(obj->oo_inode != NULL);
361         obj->oo_dt.do_body_ops = &osd_body_ops;
362         obj->oo_dt.do_lu.lo_header->loh_attr |=
363                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
364 }
365
366 /*
367  * Concurrency: no concurrent access is possible that early in object
368  * life-cycle.
369  */
370 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
371                            const struct lu_object_conf *_)
372 {
373         struct osd_object *obj = osd_obj(l);
374         int result;
375
376         LINVRNT(osd_invariant(obj));
377
378         result = osd_fid_lookup(env, obj, lu_object_fid(l));
379         if (result == 0) {
380                 if (obj->oo_inode != NULL)
381                         osd_object_init0(obj);
382         }
383         LINVRNT(osd_invariant(obj));
384         return result;
385 }
386
387 /*
388  * Concurrency: no concurrent access is possible that late in object
389  * life-cycle.
390  */
391 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
392 {
393         struct osd_object *obj = osd_obj(l);
394
395         LINVRNT(osd_invariant(obj));
396
397         dt_object_fini(&obj->oo_dt);
398         OBD_FREE_PTR(obj);
399 }
400
401 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
402                                           const struct iam_container *bag)
403 {
404         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
405                                                    osd_oti_get(env)->oti_ipd);
406 }
407
408 static void osd_ipd_put(const struct lu_env *env,
409                         const struct iam_container *bag,
410                         struct iam_path_descr *ipd)
411 {
412         bag->ic_descr->id_ops->id_ipd_free(ipd);
413 }
414
415 /*
416  * Concurrency: no concurrent access is possible that late in object
417  * life-cycle.
418  */
419 static void osd_index_fini(struct osd_object *o)
420 {
421         struct iam_container *bag;
422
423         if (o->oo_dir != NULL) {
424                 bag = &o->oo_dir->od_container;
425                 if (o->oo_inode != NULL) {
426                         if (bag->ic_object == o->oo_inode)
427                                 iam_container_fini(bag);
428                 }
429                 OBD_FREE_PTR(o->oo_dir);
430                 o->oo_dir = NULL;
431         }
432 }
433
434 /*
435  * Concurrency: no concurrent access is possible that late in object
436  * life-cycle (for all existing callers, that is. New callers have to provide
437  * their own locking.)
438  */
439 static int osd_inode_unlinked(const struct inode *inode)
440 {
441         return inode->i_nlink == 0;
442 }
443
/*
 * Journal credits that osd_inode_remove() reserves; the two values are
 * summed for its single transaction.
 */
enum {
        /* credits for removing the entry from the object index */
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        /* credits for the inode deletion */
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
448
449 /*
450  * Concurrency: no concurrent access is possible that late in object
451  * life-cycle.
452  */
453 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
454 {
455         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
456         struct osd_device      *osd = osd_obj2dev(obj);
457         struct osd_thread_info *oti = osd_oti_get(env);
458         struct txn_param       *prm = &oti->oti_txn;
459         struct thandle         *th;
460         int result;
461
462         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
463                             OSD_TXN_INODE_DELETE_CREDITS);
464         th = osd_trans_start(env, &osd->od_dt_dev, prm);
465         if (!IS_ERR(th)) {
466                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
467                 osd_trans_stop(env, th);
468         } else
469                 result = PTR_ERR(th);
470         return result;
471 }
472
473 /*
474  * Called just before object is freed. Releases all resources except for
475  * object itself (that is released by osd_object_free()).
476  *
477  * Concurrency: no concurrent access is possible that late in object
478  * life-cycle.
479  */
480 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
481 {
482         struct osd_object *obj   = osd_obj(l);
483         struct inode      *inode = obj->oo_inode;
484
485         LINVRNT(osd_invariant(obj));
486
487         /*
488          * If object is unlinked remove fid->ino mapping from object index.
489          *
490          * File body will be deleted by iput().
491          */
492
493         osd_index_fini(obj);
494         if (inode != NULL) {
495                 int result;
496
497                 if (osd_inode_unlinked(inode)) {
498                         result = osd_inode_remove(env, obj);
499                         if (result != 0)
500                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
501                                                 "Failed to cleanup: %d\n",
502                                                 result);
503                 }
504                 iput(inode);
505                 obj->oo_inode = NULL;
506         }
507 }
508
509 /*
510  * Concurrency: ->loo_object_release() is called under site spin-lock.
511  */
512 static void osd_object_release(const struct lu_env *env,
513                                struct lu_object *l)
514 {
515         struct osd_object *o = osd_obj(l);
516
517         LASSERT(!lu_object_is_dying(l->lo_header));
518         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
519                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
520 }
521
522 /*
523  * Concurrency: shouldn't matter.
524  */
525 static int osd_object_print(const struct lu_env *env, void *cookie,
526                             lu_printer_t p, const struct lu_object *l)
527 {
528         struct osd_object *o = osd_obj(l);
529         struct iam_descr  *d;
530
531         if (o->oo_dir != NULL)
532                 d = o->oo_dir->od_container.ic_descr;
533         else
534                 d = NULL;
535         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
536                     o, o->oo_inode,
537                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
538                     o->oo_inode ? o->oo_inode->i_generation : 0,
539                     d ? d->id_ops->id_name : "plain");
540 }
541
542 /*
543  * Concurrency: shouldn't matter.
544  */
545 int osd_statfs(const struct lu_env *env, struct dt_device *d,
546                struct kstatfs *sfs)
547 {
548         struct osd_device *osd = osd_dt_dev(d);
549         struct super_block *sb = osd_sb(osd);
550         int result = 0;
551
552         spin_lock(&osd->od_osfs_lock);
553         /* cache 1 second */
554         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
555                 result = ll_do_statfs(sb, &osd->od_kstatfs);
556                 if (likely(result == 0)) /* N.B. statfs can't really fail */
557                         osd->od_osfs_age = cfs_time_current_64();
558         }
559
560         if (likely(result == 0))
561                 *sfs = osd->od_kstatfs;
562         spin_unlock(&osd->od_osfs_lock);
563
564         return result;
565 }
566
567 /*
568  * Concurrency: doesn't access mutable data.
569  */
570 static void osd_conf_get(const struct lu_env *env,
571                          const struct dt_device *dev,
572                          struct dt_device_param *param)
573 {
574         /*
575          * XXX should be taken from not-yet-existing fs abstraction layer.
576          */
577         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
578         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
579         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
580 }
581
582 /*
583  * Journal
584  */
585
586 /*
587  * Concurrency: doesn't access mutable data.
588  */
589 static int osd_param_is_sane(const struct osd_device *dev,
590                              const struct txn_param *param)
591 {
592         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
593 }
594
595 /*
596  * Concurrency: shouldn't matter.
597  */
598 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
599 {
600         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
601         struct thandle     *th  = &oh->ot_super;
602         struct dt_device   *dev = th->th_dev;
603         struct lu_device   *lud = &dev->dd_lu_dev;
604
605         LASSERT(dev != NULL);
606         LASSERT(oh->ot_handle == NULL);
607
608         if (error) {
609                 CERROR("transaction @0x%p commit error: %d\n", th, error);
610         } else {
611                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
612                 /*
613                  * This od_env_for_commit is only for commit usage.  see
614                  * "struct dt_device"
615                  */
616                 lu_context_enter(&env->le_ctx);
617                 dt_txn_hook_commit(env, th);
618                 lu_context_exit(&env->le_ctx);
619         }
620
621         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
622         lu_device_put(lud);
623         th->th_dev = NULL;
624
625         lu_context_exit(&th->th_ctx);
626         lu_context_fini(&th->th_ctx);
627         OBD_FREE_PTR(oh);
628 }
629
630 /*
631  * Concurrency: shouldn't matter.
632  */
633 static struct thandle *osd_trans_start(const struct lu_env *env,
634                                        struct dt_device *d,
635                                        struct txn_param *p)
636 {
637         struct osd_device  *dev = osd_dt_dev(d);
638         handle_t           *jh;
639         struct osd_thandle *oh;
640         struct thandle     *th;
641         int hook_res;
642
643         ENTRY;
644
645         hook_res = dt_txn_hook_start(env, d, p);
646         if (hook_res != 0)
647                 RETURN(ERR_PTR(hook_res));
648
649         if (osd_param_is_sane(dev, p)) {
650                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
651                 if (oh != NULL) {
652                         struct osd_thread_info *oti = osd_oti_get(env);
653
654                         /*
655                          * XXX temporary stuff. Some abstraction layer should
656                          * be used.
657                          */
658
659                         jh = journal_start(osd_journal(dev), p->tp_credits);
660                         if (!IS_ERR(jh)) {
661                                 oh->ot_handle = jh;
662                                 th = &oh->ot_super;
663                                 th->th_dev = d;
664                                 th->th_result = 0;
665                                 jh->h_sync = p->tp_sync;
666                                 lu_device_get(&d->dd_lu_dev);
667                                 oh->ot_dev_link = lu_ref_add
668                                         (&d->dd_lu_dev.ld_reference,
669                                          "osd-tx", th);
670                                 /* add commit callback */
671                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
672                                 lu_context_enter(&th->th_ctx);
673                                 journal_callback_set(jh, osd_trans_commit_cb,
674                                                      (struct journal_callback *)&oh->ot_jcb);
675                                         LASSERT(oti->oti_txns == 0);
676                                         LASSERT(oti->oti_r_locks == 0);
677                                         LASSERT(oti->oti_w_locks == 0);
678                                         oti->oti_txns++;
679                         } else {
680                                 OBD_FREE_PTR(oh);
681                                 th = (void *)jh;
682                         }
683                 } else
684                         th = ERR_PTR(-ENOMEM);
685         } else {
686                 CERROR("Invalid transaction parameters\n");
687                 th = ERR_PTR(-EINVAL);
688         }
689
690         RETURN(th);
691 }
692
693 /*
694  * Concurrency: shouldn't matter.
695  */
696 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
697 {
698         int result;
699         struct osd_thandle *oh;
700         struct osd_thread_info *oti = osd_oti_get(env);
701
702         ENTRY;
703
704         oh = container_of0(th, struct osd_thandle, ot_super);
705         if (oh->ot_handle != NULL) {
706                 handle_t *hdl = oh->ot_handle;
707
708                 LASSERT(oti->oti_txns == 1);
709                 oti->oti_txns--;
710                 LASSERT(oti->oti_r_locks == 0);
711                 LASSERT(oti->oti_w_locks == 0);
712                 result = dt_txn_hook_stop(env, th);
713                 if (result != 0)
714                         CERROR("Failure in transaction hook: %d\n", result);
715                 oh->ot_handle = NULL;
716                 result = journal_stop(hdl);
717                 if (result != 0)
718                         CERROR("Failure to stop transaction: %d\n", result);
719         }
720         EXIT;
721 }
722
723 /*
724  * Concurrency: shouldn't matter.
725  */
726 static int osd_sync(const struct lu_env *env, struct dt_device *d)
727 {
728         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
729         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
730 }
731
732 /**
733  * Start commit for OSD device.
734  *
735  * An implementation of dt_commit_async method for OSD device.
736  * Asychronously starts underlayng fs sync and thereby a transaction
737  * commit.
738  *
739  * \param env environment
740  * \param d dt device
741  *
742  * \see dt_device_operations
743  */
744 static int osd_commit_async(const struct lu_env *env,
745                             struct dt_device *d)
746 {
747         struct super_block *s = osd_sb(osd_dt_dev(d));
748         ENTRY;
749
750         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
751         RETURN(s->s_op->sync_fs(s, 0));
752 }
753
754 /*
755  * Concurrency: shouldn't matter.
756  */
757 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
758
759 static void osd_ro(const struct lu_env *env, struct dt_device *d)
760 {
761         ENTRY;
762
763         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
764
765         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
766                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
767         EXIT;
768 }
769
770 /*
771  * Concurrency: serialization provided by callers.
772  */
773 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
774                               int mode, unsigned long timeout, __u32 alg,
775                               struct lustre_capa_key *keys)
776 {
777         struct osd_device *dev = osd_dt_dev(d);
778         ENTRY;
779
780         dev->od_fl_capa = mode;
781         dev->od_capa_timeout = timeout;
782         dev->od_capa_alg = alg;
783         dev->od_capa_keys = keys;
784         RETURN(0);
785 }
786
787 /**
788  * Concurrency: serialization provided by callers.
789  */
790 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
791                                struct dt_quota_ctxt *ctxt, void *data)
792 {
793         struct obd_device *obd = (void *)ctxt;
794         struct vfsmount *mnt = (struct vfsmount *)data;
795         ENTRY;
796
797         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
798         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
799         obd->obd_lvfs_ctxt.pwdmnt = mnt;
800         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
801         obd->obd_lvfs_ctxt.fs = get_ds();
802
803         EXIT;
804 }
805
806 /**
807  * Note: we do not count into QUOTA here.
808  * If we mount with --data_journal we may need more.
809  */
810 static const int osd_dto_credits_noquota[DTO_NR] = {
811         /**
812          * Insert/Delete.
813          * INDEX_EXTRA_TRANS_BLOCKS(8) +
814          * SINGLEDATA_TRANS_BLOCKS(8)
815          * XXX Note: maybe iam need more, since iam have more level than
816          *           EXT3 htree.
817          */
818         [DTO_INDEX_INSERT]  = 16,
819         [DTO_INDEX_DELETE]  = 16,
820         /**
821          * Unused now
822          */
823         [DTO_IDNEX_UPDATE]  = 16,
824         /**
825          * Create a object. The same as create object in EXT3.
826          * DATA_TRANS_BLOCKS(14) +
827          * INDEX_EXTRA_BLOCKS(8) +
828          * 3(inode bits, groups, GDT)
829          */
830         [DTO_OBJECT_CREATE] = 25,
831         /**
832          * Unused now
833          */
834         [DTO_OBJECT_DELETE] = 25,
835         /**
836          * Attr set credits.
837          * 3(inode bits, group, GDT)
838          */
839         [DTO_ATTR_SET_BASE] = 3,
840         /**
841          * Xattr set. The same as xattr of EXT3.
842          * DATA_TRANS_BLOCKS(14)
843          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
844          *           also counted in. Do not know why?
845          */
846         [DTO_XATTR_SET]     = 14,
847         [DTO_LOG_REC]       = 14,
848         /**
849          * creadits for inode change during write.
850          */
851         [DTO_WRITE_BASE]    = 3,
852         /**
853          * credits for single block write.
854          */
855         [DTO_WRITE_BLOCK]   = 14,
856         /**
857          * Attr set credits for chown.
858          * 3 (inode bit, group, GDT)
859          */
860         [DTO_ATTR_SET_CHOWN]= 3
861 };
862
863 /**
864  * Note: we count into QUOTA here.
865  * If we mount with --data_journal we may need more.
866  */
867 static const int osd_dto_credits_quota[DTO_NR] = {
868         /**
869          * INDEX_EXTRA_TRANS_BLOCKS(8) +
870          * SINGLEDATA_TRANS_BLOCKS(8) +
871          * 2 * QUOTA_TRANS_BLOCKS(2)
872          */
873         [DTO_INDEX_INSERT]  = 20,
874         /**
875          * INDEX_EXTRA_TRANS_BLOCKS(8) +
876          * SINGLEDATA_TRANS_BLOCKS(8) +
877          * 2 * QUOTA_TRANS_BLOCKS(2)
878          */
879         [DTO_INDEX_DELETE]  = 20,
880         /**
881          * Unused now.
882          */ 
883         [DTO_IDNEX_UPDATE]  = 16,
884         /*
885          * Create a object. Same as create object in EXT3 filesystem.
886          * DATA_TRANS_BLOCKS(16) +
887          * INDEX_EXTRA_BLOCKS(8) +
888          * 3(inode bits, groups, GDT) +
889          * 2 * QUOTA_INIT_BLOCKS(25)
890          */
891         [DTO_OBJECT_CREATE] = 77,
892         /*
893          * Unused now.
894          * DATA_TRANS_BLOCKS(16) +
895          * INDEX_EXTRA_BLOCKS(8) +
896          * 3(inode bits, groups, GDT) +
897          * QUOTA(?)
898          */ 
899         [DTO_OBJECT_DELETE] = 27,
900         /**
901          * Attr set credits.
902          * 3 (inode bit, group, GDT) +
903          */
904         [DTO_ATTR_SET_BASE] = 3,
905         /**
906          * Xattr set. The same as xattr of EXT3.
907          * DATA_TRANS_BLOCKS(16)
908          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
909          *           also counted in. Do not know why?
910          */
911         [DTO_XATTR_SET]     = 16,
912         [DTO_LOG_REC]       = 16,
913         /**
914          * creadits for inode change during write.
915          */
916         [DTO_WRITE_BASE]    = 3,
917         /**
918          * credits for single block write.
919          */
920         [DTO_WRITE_BLOCK]   = 16,
921         /**
922          * Attr set credits for chown.
923          * 3 (inode bit, group, GDT) +
924          * 2 * QUOTA_INIT_BLOCKS(25) +
925          * 2 * QUOTA_DEL_BLOCKS(9)
926          */
927         [DTO_ATTR_SET_CHOWN]= 71
928 };
929
930 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
931                           enum dt_txn_op op)
932 {
933         LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
934                 ARRAY_SIZE(osd_dto_credits_quota));
935         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
936 #ifdef HAVE_QUOTA_SUPPORT
937         if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
938                 return osd_dto_credits_quota[op];
939         else
940 #endif
941                 return osd_dto_credits_noquota[op];
942 }
943
944 static const struct dt_device_operations osd_dt_ops = {
945         .dt_root_get       = osd_root_get,
946         .dt_statfs         = osd_statfs,
947         .dt_trans_start    = osd_trans_start,
948         .dt_trans_stop     = osd_trans_stop,
949         .dt_conf_get       = osd_conf_get,
950         .dt_sync           = osd_sync,
951         .dt_ro             = osd_ro,
952         .dt_commit_async   = osd_commit_async,
953         .dt_credit_get     = osd_credit_get,
954         .dt_init_capa_ctxt = osd_init_capa_ctxt,
955         .dt_init_quota_ctxt= osd_init_quota_ctxt,
956 };
957
958 static void osd_object_read_lock(const struct lu_env *env,
959                                  struct dt_object *dt, unsigned role)
960 {
961         struct osd_object *obj = osd_dt_obj(dt);
962         struct osd_thread_info *oti = osd_oti_get(env);
963
964         LINVRNT(osd_invariant(obj));
965
966         LASSERT(obj->oo_owner != env);
967         down_read_nested(&obj->oo_sem, role);
968
969         LASSERT(obj->oo_owner == NULL);
970         oti->oti_r_locks++;
971 }
972
973 static void osd_object_write_lock(const struct lu_env *env,
974                                   struct dt_object *dt, unsigned role)
975 {
976         struct osd_object *obj = osd_dt_obj(dt);
977         struct osd_thread_info *oti = osd_oti_get(env);
978
979         LINVRNT(osd_invariant(obj));
980
981         LASSERT(obj->oo_owner != env);
982         down_write_nested(&obj->oo_sem, role);
983
984         LASSERT(obj->oo_owner == NULL);
985         obj->oo_owner = env;
986         oti->oti_w_locks++;
987 }
988
989 static void osd_object_read_unlock(const struct lu_env *env,
990                                    struct dt_object *dt)
991 {
992         struct osd_object *obj = osd_dt_obj(dt);
993         struct osd_thread_info *oti = osd_oti_get(env);
994
995         LINVRNT(osd_invariant(obj));
996
997         LASSERT(oti->oti_r_locks > 0);
998         oti->oti_r_locks--;
999         up_read(&obj->oo_sem);
1000 }
1001
1002 static void osd_object_write_unlock(const struct lu_env *env,
1003                                     struct dt_object *dt)
1004 {
1005         struct osd_object *obj = osd_dt_obj(dt);
1006         struct osd_thread_info *oti = osd_oti_get(env);
1007
1008         LINVRNT(osd_invariant(obj));
1009
1010         LASSERT(obj->oo_owner == env);
1011         LASSERT(oti->oti_w_locks > 0);
1012         oti->oti_w_locks--;
1013         obj->oo_owner = NULL;
1014         up_write(&obj->oo_sem);
1015 }
1016
1017 static int capa_is_sane(const struct lu_env *env,
1018                         struct osd_device *dev,
1019                         struct lustre_capa *capa,
1020                         struct lustre_capa_key *keys)
1021 {
1022         struct osd_thread_info *oti = osd_oti_get(env);
1023         struct lustre_capa *tcapa = &oti->oti_capa;
1024         struct obd_capa *oc;
1025         int i, rc = 0;
1026         ENTRY;
1027
1028         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1029         if (oc) {
1030                 if (capa_is_expired(oc)) {
1031                         DEBUG_CAPA(D_ERROR, capa, "expired");
1032                         rc = -ESTALE;
1033                 }
1034                 capa_put(oc);
1035                 RETURN(rc);
1036         }
1037
1038         if (capa_is_expired_sec(capa)) {
1039                 DEBUG_CAPA(D_ERROR, capa, "expired");
1040                 RETURN(-ESTALE);
1041         }
1042
1043         spin_lock(&capa_lock);
1044         for (i = 0; i < 2; i++) {
1045                 if (keys[i].lk_keyid == capa->lc_keyid) {
1046                         oti->oti_capa_key = keys[i];
1047                         break;
1048                 }
1049         }
1050         spin_unlock(&capa_lock);
1051
1052         if (i == 2) {
1053                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1054                 RETURN(-ESTALE);
1055         }
1056
1057         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1058         if (rc)
1059                 RETURN(rc);
1060
1061         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1062                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1063                 RETURN(-EACCES);
1064         }
1065
1066         oc = capa_add(dev->od_capa_hash, capa);
1067         capa_put(oc);
1068
1069         RETURN(0);
1070 }
1071
1072 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1073                            struct lustre_capa *capa, __u64 opc)
1074 {
1075         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1076         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1077         struct md_capainfo *ci;
1078         int rc;
1079
1080         if (!dev->od_fl_capa)
1081                 return 0;
1082
1083         if (capa == BYPASS_CAPA)
1084                 return 0;
1085
1086         ci = md_capainfo(env);
1087         if (unlikely(!ci))
1088                 return 0;
1089
1090         if (ci->mc_auth == LC_ID_NONE)
1091                 return 0;
1092
1093         if (!capa) {
1094                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1095                 return -EACCES;
1096         }
1097
1098         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1099                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1100                            PFID(fid));
1101                 return -EACCES;
1102         }
1103
1104         if (!capa_opc_supported(capa, opc)) {
1105                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1106                 return -EACCES;
1107         }
1108
1109         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1110                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1111                 return -EACCES;
1112         }
1113
1114         return 0;
1115 }
1116
1117 static int osd_attr_get(const struct lu_env *env,
1118                         struct dt_object *dt,
1119                         struct lu_attr *attr,
1120                         struct lustre_capa *capa)
1121 {
1122         struct osd_object *obj = osd_dt_obj(dt);
1123
1124         LASSERT(dt_object_exists(dt));
1125         LINVRNT(osd_invariant(obj));
1126
1127         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1128                 return -EACCES;
1129
1130         spin_lock(&obj->oo_guard);
1131         osd_inode_getattr(env, obj->oo_inode, attr);
1132         spin_unlock(&obj->oo_guard);
1133         return 0;
1134 }
1135
1136 static int osd_attr_set(const struct lu_env *env,
1137                         struct dt_object *dt,
1138                         const struct lu_attr *attr,
1139                         struct thandle *handle,
1140                         struct lustre_capa *capa)
1141 {
1142         struct osd_object *obj = osd_dt_obj(dt);
1143         int rc;
1144
1145         LASSERT(handle != NULL);
1146         LASSERT(dt_object_exists(dt));
1147         LASSERT(osd_invariant(obj));
1148
1149         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1150                 return -EACCES;
1151
1152         spin_lock(&obj->oo_guard);
1153         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1154         spin_unlock(&obj->oo_guard);
1155
1156         if (!rc)
1157                 mark_inode_dirty(obj->oo_inode);
1158         return rc;
1159 }
1160
1161 static struct timespec *osd_inode_time(const struct lu_env *env,
1162                                        struct inode *inode, __u64 seconds)
1163 {
1164         struct osd_thread_info *oti = osd_oti_get(env);
1165         struct timespec        *t   = &oti->oti_time;
1166
1167         t->tv_sec  = seconds;
1168         t->tv_nsec = 0;
1169         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1170         return t;
1171 }
1172
1173 static int osd_inode_setattr(const struct lu_env *env,
1174                              struct inode *inode, const struct lu_attr *attr)
1175 {
1176         __u64 bits;
1177
1178         bits = attr->la_valid;
1179
1180         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1181
1182 #ifdef HAVE_QUOTA_SUPPORT
1183         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1184             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1185                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1186                 struct iattr iattr;
1187                 int rc;
1188
1189                 iattr.ia_valid = bits & (LA_UID | LA_GID);
1190                 iattr.ia_uid = attr->la_uid;
1191                 iattr.ia_gid = attr->la_gid;
1192                 osd_push_ctxt(env, save);
1193                 rc = DQUOT_TRANSFER(inode, &iattr) ? -EDQUOT : 0;
1194                 osd_pop_ctxt(save);
1195                 if (rc != 0)
1196                         return rc;
1197         }
1198 #endif
1199
1200         if (bits & LA_ATIME)
1201                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1202         if (bits & LA_CTIME)
1203                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1204         if (bits & LA_MTIME)
1205                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1206         if (bits & LA_SIZE) {
1207                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1208                 i_size_write(inode, attr->la_size);
1209         }
1210 # if 0
1211         /*
1212          * OSD should not change "i_blocks" which is used by quota.
1213          * "i_blocks" should be changed by ldiskfs only.
1214          * Disable this assignment until SOM to fix some EA field. */
1215         if (bits & LA_BLOCKS)
1216                 inode->i_blocks = attr->la_blocks;
1217 #endif
1218         if (bits & LA_MODE)
1219                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1220                         (attr->la_mode & ~S_IFMT);
1221         if (bits & LA_UID)
1222                 inode->i_uid    = attr->la_uid;
1223         if (bits & LA_GID)
1224                 inode->i_gid    = attr->la_gid;
1225         if (bits & LA_NLINK)
1226                 inode->i_nlink  = attr->la_nlink;
1227         if (bits & LA_RDEV)
1228                 inode->i_rdev   = attr->la_rdev;
1229
1230         if (bits & LA_FLAGS) {
1231                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1232
1233                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1234                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1235         }
1236         return 0;
1237 }
1238
1239 /*
1240  * Object creation.
1241  *
1242  * XXX temporary solution.
1243  */
1244
/**
 * Hook run before an object is created; currently does nothing.
 *
 * \retval 0 always
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        int rc = 0;

        return rc;
}
1250
1251 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1252                            struct lu_attr *attr, struct thandle *th)
1253 {
1254         LASSERT(obj->oo_inode != NULL);
1255
1256         osd_object_init0(obj);
1257         return 0;
1258 }
1259
1260 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1261                                           struct inode * dir, int mode);
1262
1263 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1264                       umode_t mode,
1265                       struct dt_allocation_hint *hint,
1266                       struct thandle *th)
1267 {
1268         int result;
1269         struct osd_device  *osd = osd_obj2dev(obj);
1270         struct osd_thandle *oth;
1271         struct inode       *parent;
1272         struct inode       *inode;
1273 #ifdef HAVE_QUOTA_SUPPORT
1274         struct osd_ctxt    *save = &info->oti_ctxt;
1275 #endif
1276
1277         LINVRNT(osd_invariant(obj));
1278         LASSERT(obj->oo_inode == NULL);
1279         LASSERT(osd->od_obj_area != NULL);
1280
1281         oth = container_of(th, struct osd_thandle, ot_super);
1282         LASSERT(oth->ot_handle->h_transaction != NULL);
1283
1284         if (hint && hint->dah_parent)
1285                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1286         else
1287                 parent = osd->od_obj_area->d_inode;
1288         LASSERT(parent->i_op != NULL);
1289
1290 #ifdef HAVE_QUOTA_SUPPORT
1291         osd_push_ctxt(info->oti_env, save);
1292 #endif
1293         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1294 #ifdef HAVE_QUOTA_SUPPORT
1295         osd_pop_ctxt(save);
1296 #endif
1297         if (!IS_ERR(inode)) {
1298                 obj->oo_inode = inode;
1299                 result = 0;
1300         } else
1301                 result = PTR_ERR(inode);
1302         LINVRNT(osd_invariant(obj));
1303         return result;
1304 }
1305
1306
1307 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1308                            int recsize, handle_t *handle);
1309
1310 enum {
1311         OSD_NAME_LEN = 255
1312 };
1313
1314 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1315                      struct lu_attr *attr,
1316                      struct dt_allocation_hint *hint,
1317                      struct thandle *th)
1318 {
1319         int result;
1320         struct osd_thandle *oth;
1321
1322         LASSERT(S_ISDIR(attr->la_mode));
1323
1324         oth = container_of(th, struct osd_thandle, ot_super);
1325         LASSERT(oth->ot_handle->h_transaction != NULL);
1326         result = osd_mkfile(info, obj, (attr->la_mode &
1327                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1328         if (result == 0) {
1329                 LASSERT(obj->oo_inode != NULL);
1330                 /*
1331                  * XXX uh-oh... call low-level iam function directly.
1332                  */
1333                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1334                                          sizeof (struct lu_fid_pack),
1335                                          oth->ot_handle);
1336         }
1337         return result;
1338 }
1339
1340 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1341                      struct lu_attr *attr,
1342                      struct dt_allocation_hint *hint,
1343                      struct thandle *th)
1344 {
1345         LASSERT(S_ISREG(attr->la_mode));
1346         return osd_mkfile(info, obj, (attr->la_mode &
1347                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1348 }
1349
1350 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1351                      struct lu_attr *attr,
1352                      struct dt_allocation_hint *hint,
1353                      struct thandle *th)
1354 {
1355         LASSERT(S_ISLNK(attr->la_mode));
1356         return osd_mkfile(info, obj, (attr->la_mode &
1357                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1358 }
1359
1360 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1361                      struct lu_attr *attr,
1362                      struct dt_allocation_hint *hint,
1363                      struct thandle *th)
1364 {
1365         int result;
1366         struct osd_device *osd = osd_obj2dev(obj);
1367         struct inode      *dir;
1368         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1369
1370         LINVRNT(osd_invariant(obj));
1371         LASSERT(obj->oo_inode == NULL);
1372         LASSERT(osd->od_obj_area != NULL);
1373         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1374                 S_ISFIFO(mode) || S_ISSOCK(mode));
1375
1376         dir = osd->od_obj_area->d_inode;
1377         LASSERT(dir->i_op != NULL);
1378
1379         result = osd_mkfile(info, obj, mode, hint, th);
1380         if (result == 0) {
1381                 LASSERT(obj->oo_inode != NULL);
1382                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1383         }
1384         LINVRNT(osd_invariant(obj));
1385         return result;
1386 }
1387
1388 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1389                               struct lu_attr *,
1390                               struct dt_allocation_hint *hint,
1391                               struct thandle *);
1392
1393 static osd_obj_type_f osd_create_type_f(__u32 mode)
1394 {
1395         osd_obj_type_f result;
1396
1397         switch (mode) {
1398         case S_IFDIR:
1399                 result = osd_mkdir;
1400                 break;
1401         case S_IFREG:
1402                 result = osd_mkreg;
1403                 break;
1404         case S_IFLNK:
1405                 result = osd_mksym;
1406                 break;
1407         case S_IFCHR:
1408         case S_IFBLK:
1409         case S_IFIFO:
1410         case S_IFSOCK:
1411                 result = osd_mknod;
1412                 break;
1413         default:
1414                 LBUG();
1415                 break;
1416         }
1417         return result;
1418 }
1419
1420
1421 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1422                         struct dt_object *parent, umode_t child_mode)
1423 {
1424         LASSERT(ah);
1425
1426         memset(ah, 0, sizeof(*ah));
1427         ah->dah_parent = parent;
1428         ah->dah_mode = child_mode;
1429 }
1430
1431
1432 /*
1433  * Concurrency: @dt is write locked.
1434  */
1435 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1436                              struct lu_attr *attr,
1437                              struct dt_allocation_hint *hint,
1438                              struct thandle *th)
1439 {
1440         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1441         struct osd_object      *obj  = osd_dt_obj(dt);
1442         struct osd_device      *osd  = osd_obj2dev(obj);
1443         struct osd_thread_info *info = osd_oti_get(env);
1444         int result;
1445
1446         ENTRY;
1447
1448         LINVRNT(osd_invariant(obj));
1449         LASSERT(!dt_object_exists(dt));
1450         LASSERT(osd_write_locked(env, obj));
1451         LASSERT(th != NULL);
1452
1453         /*
1454          * XXX missing: Quote handling.
1455          */
1456
1457         result = osd_create_pre(info, obj, attr, th);
1458         if (result == 0) {
1459                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1460                                                                 attr, hint, th);
1461                 if (result == 0)
1462                         result = osd_create_post(info, obj, attr, th);
1463         }
1464         if (result == 0) {
1465                 struct osd_inode_id *id = &info->oti_id;
1466                 struct md_ucred     *uc = md_ucred(env);
1467
1468                 LASSERT(obj->oo_inode != NULL);
1469                 LASSERT(uc != NULL);
1470
1471                 id->oii_ino = obj->oo_inode->i_ino;
1472                 id->oii_gen = obj->oo_inode->i_generation;
1473
1474                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th,
1475                                        uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1476         }
1477
1478         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1479         LINVRNT(osd_invariant(obj));
1480         RETURN(result);
1481 }
1482
1483 /*
1484  * Concurrency: @dt is write locked.
1485  */
1486 static void osd_object_ref_add(const struct lu_env *env,
1487                                struct dt_object *dt,
1488                                struct thandle *th)
1489 {
1490         struct osd_object *obj = osd_dt_obj(dt);
1491         struct inode *inode = obj->oo_inode;
1492
1493         LINVRNT(osd_invariant(obj));
1494         LASSERT(dt_object_exists(dt));
1495         LASSERT(osd_write_locked(env, obj));
1496         LASSERT(th != NULL);
1497
1498         spin_lock(&obj->oo_guard);
1499         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1500         inode->i_nlink++;
1501         spin_unlock(&obj->oo_guard);
1502         mark_inode_dirty(inode);
1503         LINVRNT(osd_invariant(obj));
1504 }
1505
1506 /*
1507  * Concurrency: @dt is write locked.
1508  */
1509 static void osd_object_ref_del(const struct lu_env *env,
1510                                struct dt_object *dt,
1511                                struct thandle *th)
1512 {
1513         struct osd_object *obj = osd_dt_obj(dt);
1514         struct inode *inode = obj->oo_inode;
1515
1516         LINVRNT(osd_invariant(obj));
1517         LASSERT(dt_object_exists(dt));
1518         LASSERT(osd_write_locked(env, obj));
1519         LASSERT(th != NULL);
1520
1521         spin_lock(&obj->oo_guard);
1522         LASSERT(inode->i_nlink > 0);
1523         inode->i_nlink--;
1524         spin_unlock(&obj->oo_guard);
1525         mark_inode_dirty(inode);
1526         LINVRNT(osd_invariant(obj));
1527 }
1528
1529 /*
1530  * Concurrency: @dt is read locked.
1531  */
1532 static int osd_xattr_get(const struct lu_env *env,
1533                          struct dt_object *dt,
1534                          struct lu_buf *buf,
1535                          const char *name,
1536                          struct lustre_capa *capa)
1537 {
1538         struct osd_object      *obj    = osd_dt_obj(dt);
1539         struct inode           *inode  = obj->oo_inode;
1540         struct osd_thread_info *info   = osd_oti_get(env);
1541         struct dentry          *dentry = &info->oti_dentry;
1542
1543         LASSERT(dt_object_exists(dt));
1544         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1545         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1546
1547         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1548                 return -EACCES;
1549
1550         dentry->d_inode = inode;
1551         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1552 }
1553
1554 /*
1555  * Concurrency: @dt is write locked.
1556  */
1557 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1558                          const struct lu_buf *buf, const char *name, int fl,
1559                          struct thandle *handle, struct lustre_capa *capa)
1560 {
1561         struct osd_object      *obj    = osd_dt_obj(dt);
1562         struct inode           *inode  = obj->oo_inode;
1563         struct osd_thread_info *info   = osd_oti_get(env);
1564         struct dentry          *dentry = &info->oti_dentry;
1565         struct timespec        *t      = &info->oti_time;
1566         int                     fs_flags = 0, rc;
1567
1568         LASSERT(dt_object_exists(dt));
1569         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1570         LASSERT(osd_write_locked(env, obj));
1571         LASSERT(handle != NULL);
1572
1573         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1574                 return -EACCES;
1575
1576         if (fl & LU_XATTR_REPLACE)
1577                 fs_flags |= XATTR_REPLACE;
1578
1579         if (fl & LU_XATTR_CREATE)
1580                 fs_flags |= XATTR_CREATE;
1581
1582         dentry->d_inode = inode;
1583         *t = inode->i_ctime;
1584         rc = inode->i_op->setxattr(dentry, name,
1585                                    buf->lb_buf, buf->lb_len, fs_flags);
1586         if (likely(rc == 0)) {
1587                 /* ctime should not be updated with server-side time. */
1588                 spin_lock(&obj->oo_guard);
1589                 inode->i_ctime = *t;
1590                 spin_unlock(&obj->oo_guard);
1591                 mark_inode_dirty(inode);
1592         }
1593         return rc;
1594 }
1595
1596 /*
1597  * Concurrency: @dt is read locked.
1598  */
1599 static int osd_xattr_list(const struct lu_env *env,
1600                           struct dt_object *dt,
1601                           struct lu_buf *buf,
1602                           struct lustre_capa *capa)
1603 {
1604         struct osd_object      *obj    = osd_dt_obj(dt);
1605         struct inode           *inode  = obj->oo_inode;
1606         struct osd_thread_info *info   = osd_oti_get(env);
1607         struct dentry          *dentry = &info->oti_dentry;
1608
1609         LASSERT(dt_object_exists(dt));
1610         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1611         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1612
1613         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1614                 return -EACCES;
1615
1616         dentry->d_inode = inode;
1617         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1618 }
1619
1620 /*
1621  * Concurrency: @dt is write locked.
1622  */
1623 static int osd_xattr_del(const struct lu_env *env,
1624                          struct dt_object *dt,
1625                          const char *name,
1626                          struct thandle *handle,
1627                          struct lustre_capa *capa)
1628 {
1629         struct osd_object      *obj    = osd_dt_obj(dt);
1630         struct inode           *inode  = obj->oo_inode;
1631         struct osd_thread_info *info   = osd_oti_get(env);
1632         struct dentry          *dentry = &info->oti_dentry;
1633         struct timespec        *t      = &info->oti_time;
1634         int                     rc;
1635
1636         LASSERT(dt_object_exists(dt));
1637         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1638         LASSERT(osd_write_locked(env, obj));
1639         LASSERT(handle != NULL);
1640
1641         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1642                 return -EACCES;
1643
1644         dentry->d_inode = inode;
1645         *t = inode->i_ctime;
1646         rc = inode->i_op->removexattr(dentry, name);
1647         if (likely(rc == 0)) {
1648                 /* ctime should not be updated with server-side time. */
1649                 spin_lock(&obj->oo_guard);
1650                 inode->i_ctime = *t;
1651                 spin_unlock(&obj->oo_guard);
1652                 mark_inode_dirty(inode);
1653         }
1654         return rc;
1655 }
1656
1657 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1658                                      struct dt_object *dt,
1659                                      struct lustre_capa *old,
1660                                      __u64 opc)
1661 {
1662         struct osd_thread_info *info = osd_oti_get(env);
1663         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1664         struct osd_object *obj = osd_dt_obj(dt);
1665         struct osd_device *dev = osd_obj2dev(obj);
1666         struct lustre_capa_key *key = &info->oti_capa_key;
1667         struct lustre_capa *capa = &info->oti_capa;
1668         struct obd_capa *oc;
1669         struct md_capainfo *ci;
1670         int rc;
1671         ENTRY;
1672
1673         if (!dev->od_fl_capa)
1674                 RETURN(ERR_PTR(-ENOENT));
1675
1676         LASSERT(dt_object_exists(dt));
1677         LINVRNT(osd_invariant(obj));
1678
1679         /* renewal sanity check */
1680         if (old && osd_object_auth(env, dt, old, opc))
1681                 RETURN(ERR_PTR(-EACCES));
1682
1683         ci = md_capainfo(env);
1684         if (unlikely(!ci))
1685                 RETURN(ERR_PTR(-ENOENT));
1686
1687         switch (ci->mc_auth) {
1688         case LC_ID_NONE:
1689                 RETURN(NULL);
1690         case LC_ID_PLAIN:
1691                 capa->lc_uid = obj->oo_inode->i_uid;
1692                 capa->lc_gid = obj->oo_inode->i_gid;
1693                 capa->lc_flags = LC_ID_PLAIN;
1694                 break;
1695         case LC_ID_CONVERT: {
1696                 __u32 d[4], s[4];
1697
1698                 s[0] = obj->oo_inode->i_uid;
1699                 get_random_bytes(&(s[1]), sizeof(__u32));
1700                 s[2] = obj->oo_inode->i_gid;
1701                 get_random_bytes(&(s[3]), sizeof(__u32));
1702                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
1703                 if (unlikely(rc))
1704                         RETURN(ERR_PTR(rc));
1705
1706                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
1707                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
1708                 capa->lc_flags = LC_ID_CONVERT;
1709                 break;
1710         }
1711         default:
1712                 RETURN(ERR_PTR(-EINVAL));
1713         }
1714
1715         capa->lc_fid = *fid;
1716         capa->lc_opc = opc;
1717         capa->lc_flags |= dev->od_capa_alg << 24;
1718         capa->lc_timeout = dev->od_capa_timeout;
1719         capa->lc_expiry = 0;
1720
1721         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1722         if (oc) {
1723                 LASSERT(!capa_is_expired(oc));
1724                 RETURN(oc);
1725         }
1726
1727         spin_lock(&capa_lock);
1728         *key = dev->od_capa_keys[1];
1729         spin_unlock(&capa_lock);
1730
1731         capa->lc_keyid = key->lk_keyid;
1732         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1733
1734         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1735         if (rc) {
1736                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1737                 RETURN(ERR_PTR(rc));
1738         }
1739
1740         oc = capa_add(dev->od_capa_hash, capa);
1741         RETURN(oc);
1742 }
1743
1744 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
1745 {
1746         int rc;
1747         struct osd_object      *obj    = osd_dt_obj(dt);
1748         struct inode           *inode  = obj->oo_inode;
1749         struct osd_thread_info *info   = osd_oti_get(env);
1750         struct dentry          *dentry = &info->oti_dentry;
1751         struct file            *file   = &info->oti_file;
1752         ENTRY;
1753
1754         dentry->d_inode = inode;
1755         file->f_dentry = dentry;
1756         file->f_mapping = inode->i_mapping;
1757         file->f_op = inode->i_fop;
1758         LOCK_INODE_MUTEX(inode);
1759         rc = file->f_op->fsync(file, dentry, 0);
1760         UNLOCK_INODE_MUTEX(inode);
1761         RETURN(rc);
1762 }
1763
1764 static const struct dt_object_operations osd_obj_ops = {
1765         .do_read_lock    = osd_object_read_lock,
1766         .do_write_lock   = osd_object_write_lock,
1767         .do_read_unlock  = osd_object_read_unlock,
1768         .do_write_unlock = osd_object_write_unlock,
1769         .do_attr_get     = osd_attr_get,
1770         .do_attr_set     = osd_attr_set,
1771         .do_ah_init      = osd_ah_init,
1772         .do_create       = osd_object_create,
1773         .do_index_try    = osd_index_try,
1774         .do_ref_add      = osd_object_ref_add,
1775         .do_ref_del      = osd_object_ref_del,
1776         .do_xattr_get    = osd_xattr_get,
1777         .do_xattr_set    = osd_xattr_set,
1778         .do_xattr_del    = osd_xattr_del,
1779         .do_xattr_list   = osd_xattr_list,
1780         .do_capa_get     = osd_capa_get,
1781         .do_object_sync  = osd_object_sync,
1782 };
1783
1784 /*
1785  * Body operations.
1786  */
1787
/*
 * XXX: Another layering violation for now.
 *
 * We don't want to go through the ->f_op read/write methods, because the
 * generic file write path
 *
 *         - serializes on ->i_sem, and
 *
 *         - does a lot of extra work like balance_dirty_pages(),
 *
 * which doesn't work for globally shared files like /last-received.
 *
 * The two helpers below are therefore declared here by hand and called
 * directly by osd_read() and osd_write(); their definitions are not part of
 * this file section.
 */
int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
                                loff_t *offs, handle_t *handle);
1802
1803 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1804                         struct lu_buf *buf, loff_t *pos,
1805                         struct lustre_capa *capa)
1806 {
1807         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1808
1809         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1810                 RETURN(-EACCES);
1811
1812         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1813 }
1814
1815 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1816                          const struct lu_buf *buf, loff_t *pos,
1817                          struct thandle *handle, struct lustre_capa *capa,
1818                          int ignore_quota)
1819 {
1820         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1821         struct osd_thandle *oh;
1822         ssize_t             result;
1823 #ifdef HAVE_QUOTA_SUPPORT
1824         cfs_cap_t           save = current->cap_effective;
1825 #endif
1826
1827         LASSERT(handle != NULL);
1828
1829         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1830                 RETURN(-EACCES);
1831
1832         oh = container_of(handle, struct osd_thandle, ot_super);
1833         LASSERT(oh->ot_handle->h_transaction != NULL);
1834 #ifdef HAVE_QUOTA_SUPPORT
1835         if (ignore_quota)
1836                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
1837         else
1838                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
1839 #endif
1840         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1841                                              pos, oh->ot_handle);
1842 #ifdef HAVE_QUOTA_SUPPORT
1843         current->cap_effective = save;
1844 #endif
1845         if (result == 0)
1846                 result = buf->lb_len;
1847         return result;
1848 }
1849
1850 static const struct dt_body_operations osd_body_ops = {
1851         .dbo_read  = osd_read,
1852         .dbo_write = osd_write
1853 };
1854
1855 /*
1856  * Index operations.
1857  */
1858
1859 static int osd_object_is_root(const struct osd_object *obj)
1860 {
1861         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1862 }
1863
1864 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1865                            const struct dt_index_features *feat)
1866 {
1867         struct iam_descr *descr;
1868
1869         if (osd_object_is_root(o))
1870                 return feat == &dt_directory_features;
1871
1872         LASSERT(o->oo_dir != NULL);
1873
1874         descr = o->oo_dir->od_container.ic_descr;
1875         if (feat == &dt_directory_features)
1876                 return descr == &iam_htree_compat_param ||
1877                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1878                          1 /*
1879                             * XXX check that index looks like directory.
1880                             */
1881                                 );
1882         else
1883                 return
1884                         feat->dif_keysize_min <= descr->id_key_size &&
1885                         descr->id_key_size <= feat->dif_keysize_max &&
1886                         feat->dif_recsize_min <= descr->id_rec_size &&
1887                         descr->id_rec_size <= feat->dif_recsize_max &&
1888                         !(feat->dif_flags & (DT_IND_VARKEY |
1889                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1890                         ergo(feat->dif_flags & DT_IND_UPDATE,
1891                              1 /* XXX check that object (and file system) is
1892                                 * writable */);
1893 }
1894
1895 static int osd_container_init(const struct lu_env *env,
1896                               struct osd_object *obj,
1897                               struct osd_directory *dir)
1898 {
1899         int result;
1900         struct iam_container *bag;
1901
1902         bag    = &dir->od_container;
1903         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1904         if (result == 0) {
1905                 result = iam_container_setup(bag);
1906                 if (result == 0)
1907                         obj->oo_dt.do_index_ops = &osd_index_ops;
1908                 else
1909                         iam_container_fini(bag);
1910         }
1911         return result;
1912 }
1913
1914 /*
1915  * Concurrency: no external locking is necessary.
1916  */
1917 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1918                          const struct dt_index_features *feat)
1919 {
1920         int result;
1921         struct osd_object *obj = osd_dt_obj(dt);
1922
1923         LINVRNT(osd_invariant(obj));
1924         LASSERT(dt_object_exists(dt));
1925
1926         if (osd_object_is_root(obj)) {
1927                 dt->do_index_ops = &osd_index_compat_ops;
1928                 result = 0;
1929         } else if (!osd_has_index(obj)) {
1930                 struct osd_directory *dir;
1931
1932                 OBD_ALLOC_PTR(dir);
1933                 if (dir != NULL) {
1934                         sema_init(&dir->od_sem, 1);
1935
1936                         spin_lock(&obj->oo_guard);
1937                         if (obj->oo_dir == NULL)
1938                                 obj->oo_dir = dir;
1939                         else
1940                                 /*
1941                                  * Concurrent thread allocated container data.
1942                                  */
1943                                 OBD_FREE_PTR(dir);
1944                         spin_unlock(&obj->oo_guard);
1945                         /*
1946                          * Now, that we have container data, serialize its
1947                          * initialization.
1948                          */
1949                         down(&obj->oo_dir->od_sem);
1950                         /*
1951                          * recheck under lock.
1952                          */
1953                         if (!osd_has_index(obj))
1954                                 result = osd_container_init(env, obj, dir);
1955                         else
1956                                 result = 0;
1957                         up(&obj->oo_dir->od_sem);
1958                 } else
1959                         result = -ENOMEM;
1960         } else
1961                 result = 0;
1962
1963         if (result == 0) {
1964                 if (!osd_index_probe(env, obj, feat))
1965                         result = -ENOTDIR;
1966         }
1967         LINVRNT(osd_invariant(obj));
1968
1969         return result;
1970 }
1971
1972 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1973                             const struct dt_key *key, struct thandle *handle,
1974                             struct lustre_capa *capa)
1975 {
1976         struct osd_object     *obj = osd_dt_obj(dt);
1977         struct osd_thandle    *oh;
1978         struct iam_path_descr *ipd;
1979         struct iam_container  *bag = &obj->oo_dir->od_container;
1980         int rc;
1981
1982         ENTRY;
1983
1984         LINVRNT(osd_invariant(obj));
1985         LASSERT(dt_object_exists(dt));
1986         LASSERT(bag->ic_object == obj->oo_inode);
1987         LASSERT(handle != NULL);
1988
1989         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1990                 RETURN(-EACCES);
1991
1992         ipd = osd_ipd_get(env, bag);
1993         if (unlikely(ipd == NULL))
1994                 RETURN(-ENOMEM);
1995
1996         oh = container_of0(handle, struct osd_thandle, ot_super);
1997         LASSERT(oh->ot_handle != NULL);
1998         LASSERT(oh->ot_handle->h_transaction != NULL);
1999
2000         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2001         osd_ipd_put(env, bag, ipd);
2002         LINVRNT(osd_invariant(obj));
2003         RETURN(rc);
2004 }
2005
2006 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
2007                             struct dt_rec *rec, const struct dt_key *key,
2008                             struct lustre_capa *capa)
2009 {
2010         struct osd_object     *obj = osd_dt_obj(dt);
2011         struct iam_path_descr *ipd;
2012         struct iam_container  *bag = &obj->oo_dir->od_container;
2013         int rc;
2014
2015         ENTRY;
2016
2017         LINVRNT(osd_invariant(obj));
2018         LASSERT(dt_object_exists(dt));
2019         LASSERT(bag->ic_object == obj->oo_inode);
2020
2021         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2022                 return -EACCES;
2023
2024         ipd = osd_ipd_get(env, bag);
2025         if (unlikely(ipd == NULL))
2026                 RETURN(-ENOMEM);
2027
2028         rc = iam_lookup(bag, (const struct iam_key *)key,
2029                         (struct iam_rec *)rec, ipd);
2030         osd_ipd_put(env, bag, ipd);
2031         LINVRNT(osd_invariant(obj));
2032
2033         RETURN(rc);
2034 }
2035
2036 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
2037                             const struct dt_rec *rec, const struct dt_key *key,
2038                             struct thandle *th, struct lustre_capa *capa,
2039                             int ignore_quota)
2040 {
2041         struct osd_object     *obj = osd_dt_obj(dt);
2042         struct iam_path_descr *ipd;
2043         struct osd_thandle    *oh;
2044         struct iam_container  *bag = &obj->oo_dir->od_container;
2045 #ifdef HAVE_QUOTA_SUPPORT
2046         cfs_cap_t              save = current->cap_effective;
2047 #endif
2048         int rc;
2049
2050         ENTRY;
2051
2052         LINVRNT(osd_invariant(obj));
2053         LASSERT(dt_object_exists(dt));
2054         LASSERT(bag->ic_object == obj->oo_inode);
2055         LASSERT(th != NULL);
2056
2057         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2058                 return -EACCES;
2059
2060         ipd = osd_ipd_get(env, bag);
2061         if (unlikely(ipd == NULL))
2062                 RETURN(-ENOMEM);
2063
2064         oh = container_of0(th, struct osd_thandle, ot_super);
2065         LASSERT(oh->ot_handle != NULL);
2066         LASSERT(oh->ot_handle->h_transaction != NULL);
2067 #ifdef HAVE_QUOTA_SUPPORT
2068         if (ignore_quota)
2069                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2070         else
2071                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2072 #endif
2073         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2074                         (struct iam_rec *)rec, ipd);
2075 #ifdef HAVE_QUOTA_SUPPORT
2076         current->cap_effective = save;
2077 #endif
2078         osd_ipd_put(env, bag, ipd);
2079         LINVRNT(osd_invariant(obj));
2080         RETURN(rc);
2081 }
2082
/*
 * Iterator operations.
 */

/*
 * Iterator state handed out by osd_it_init() as an opaque struct dt_it
 * pointer and cast back by the other osd_it_*() functions.
 */
struct osd_it {
        /* object being iterated; referenced with lu_object_get() in
         * osd_it_init() and released with lu_object_put() in osd_it_fini() */
        struct osd_object     *oi_obj;
        /* path descriptor from osd_ipd_get(), returned by osd_ipd_put() in
         * osd_it_fini() */
        struct iam_path_descr *oi_ipd;
        /* underlying iam iterator that all osd_it_*() calls operate on */
        struct iam_iterator    oi_it;
};
2091
2092 static struct dt_it *osd_it_init(const struct lu_env *env,
2093                                  struct dt_object *dt, int writable,
2094                                  struct lustre_capa *capa)
2095 {
2096         struct osd_it         *it;
2097         struct osd_object     *obj = osd_dt_obj(dt);
2098         struct lu_object      *lo  = &dt->do_lu;
2099         struct iam_path_descr *ipd;
2100         struct iam_container  *bag = &obj->oo_dir->od_container;
2101         __u32                  flags;
2102
2103         LASSERT(lu_object_exists(lo));
2104
2105         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
2106                             CAPA_OPC_BODY_READ))
2107                 return ERR_PTR(-EACCES);
2108
2109         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
2110         OBD_ALLOC_PTR(it);
2111         if (it != NULL) {
2112                 /*
2113                  * XXX: as ipd is allocated within osd_thread_info, assignment
2114                  * below implies that iterator usage is confined within single
2115                  * environment.
2116                  */
2117                 ipd = osd_ipd_get(env, bag);
2118                 if (likely(ipd != NULL)) {
2119                         it->oi_obj = obj;
2120                         it->oi_ipd = ipd;
2121                         lu_object_get(lo);
2122                         iam_it_init(&it->oi_it, bag, flags, ipd);
2123                         return (struct dt_it *)it;
2124                 } else
2125                         OBD_FREE_PTR(it);
2126         }
2127         return ERR_PTR(-ENOMEM);
2128 }
2129
2130 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
2131 {
2132         struct osd_it     *it = (struct osd_it *)di;
2133         struct osd_object *obj = it->oi_obj;
2134
2135         iam_it_fini(&it->oi_it);
2136         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
2137         lu_object_put(env, &obj->oo_dt.do_lu);
2138         OBD_FREE_PTR(it);
2139 }
2140
2141 static int osd_it_get(const struct lu_env *env,
2142                       struct dt_it *di, const struct dt_key *key)
2143 {
2144         struct osd_it *it = (struct osd_it *)di;
2145
2146         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
2147 }
2148
2149 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
2150 {
2151         struct osd_it *it = (struct osd_it *)di;
2152
2153         iam_it_put(&it->oi_it);
2154 }
2155
2156 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
2157 {
2158         struct osd_it *it = (struct osd_it *)di;
2159
2160         return iam_it_next(&it->oi_it);
2161 }
2162
2163 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
2164                       struct thandle *th)
2165 {
2166         struct osd_it      *it = (struct osd_it *)di;
2167         struct osd_thandle *oh;
2168
2169         LASSERT(th != NULL);
2170
2171         oh = container_of0(th, struct osd_thandle, ot_super);
2172         LASSERT(oh->ot_handle != NULL);
2173         LASSERT(oh->ot_handle->h_transaction != NULL);
2174
2175         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
2176 }
2177
2178 static struct dt_key *osd_it_key(const struct lu_env *env,
2179                                  const struct dt_it *di)
2180 {
2181         struct osd_it *it = (struct osd_it *)di;
2182
2183         return (struct dt_key *)iam_it_key_get(&it->oi_it);
2184 }
2185
2186 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
2187 {
2188         struct osd_it *it = (struct osd_it *)di;
2189
2190         return iam_it_key_size(&it->oi_it);
2191 }
2192
2193 static struct dt_rec *osd_it_rec(const struct lu_env *env,
2194                                  const struct dt_it *di)
2195 {
2196         struct osd_it *it = (struct osd_it *)di;
2197
2198         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
2199 }
2200
2201 static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
2202 {
2203         struct osd_it *it = (struct osd_it *)di;
2204
2205         return iam_it_store(&it->oi_it);
2206 }
2207
2208 static int osd_it_load(const struct lu_env *env,
2209                        const struct dt_it *di, __u64 hash)
2210 {
2211         struct osd_it *it = (struct osd_it *)di;
2212
2213         return iam_it_load(&it->oi_it, hash);
2214 }
2215
2216 static const struct dt_index_operations osd_index_ops = {
2217         .dio_lookup = osd_index_lookup,
2218         .dio_insert = osd_index_insert,
2219         .dio_delete = osd_index_delete,
2220         .dio_it     = {
2221                 .init     = osd_it_init,
2222                 .fini     = osd_it_fini,
2223                 .get      = osd_it_get,
2224                 .put      = osd_it_put,
2225                 .del      = osd_it_del,
2226                 .next     = osd_it_next,
2227                 .key      = osd_it_key,
2228                 .key_size = osd_it_key_size,
2229                 .rec      = osd_it_rec,
2230                 .store    = osd_it_store,
2231                 .load     = osd_it_load
2232         }
2233 };
2234
2235 static int osd_index_compat_delete(const struct lu_env *env,
2236                                    struct dt_object *dt,
2237                                    const struct dt_key *key,
2238                                    struct thandle *handle,
2239                                    struct lustre_capa *capa)
2240 {
2241         struct osd_object *obj = osd_dt_obj(dt);
2242
2243         LASSERT(handle != NULL);
2244         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2245         ENTRY;
2246
2247 #if 0
2248         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2249                 RETURN(-EACCES);
2250 #endif
2251
2252         RETURN(-EOPNOTSUPP);
2253 }
2254
2255 /*
2256  * Compatibility index operations.
2257  */
2258
2259
2260 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
2261                            struct dentry *dentry, struct lu_fid_pack *pack)
2262 {
2263         struct inode  *inode = dentry->d_inode;
2264         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
2265
2266         lu_igif_build(fid, inode->i_ino, inode->i_generation);
2267         fid_cpu_to_be(fid, fid);
2268         pack->fp_len = sizeof *fid + 1;
2269         memcpy(pack->fp_area, fid, sizeof *fid);
2270 }
2271
2272 static int osd_index_compat_lookup(const struct lu_env *env,
2273                                    struct dt_object *dt,
2274                                    struct dt_rec *rec, const struct dt_key *key,
2275                                    struct lustre_capa *capa)
2276 {
2277         struct osd_object *obj = osd_dt_obj(dt);
2278
2279         struct osd_device      *osd  = osd_obj2dev(obj);
2280         struct osd_thread_info *info = osd_oti_get(env);
2281         struct inode           *dir;
2282
2283         int result;
2284
2285         /*
2286          * XXX temporary solution.
2287          */
2288         struct dentry *dentry;
2289         struct dentry *parent;
2290
2291         LINVRNT(osd_invariant(obj));
2292         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2293         LASSERT(osd_has_index(obj));
2294
2295         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2296                 return -EACCES;
2297
2298         info->oti_str.name = (const char *)key;
2299         info->oti_str.len  = strlen((const char *)key);
2300
2301         dir = obj->oo_inode;
2302         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2303
2304         parent = d_alloc_root(dir);
2305         if (parent == NULL)
2306                 return -ENOMEM;
2307         igrab(dir);
2308         dentry = d_alloc(parent, &info->oti_str);
2309         if (dentry != NULL) {
2310                 struct dentry *d;
2311
2312                 /*
2313                  * XXX passing NULL for nameidata should work for
2314                  * ext3/ldiskfs.
2315                  */
2316                 d = dir->i_op->lookup(dir, dentry, NULL);
2317                 if (d == NULL) {
2318                         /*
2319                          * normal case, result is in @dentry.
2320                          */
2321                         if (dentry->d_inode != NULL) {
2322                                 osd_build_pack(env, osd, dentry,
2323                                                (struct lu_fid_pack *)rec);
2324                                 result = 0;
2325                         } else
2326                                 result = -ENOENT;
2327                  } else {
2328                         /* What? Disconnected alias? Ppheeeww... */
2329                         CERROR("Aliasing where not expected\n");
2330                         result = -EIO;
2331                         dput(d);
2332                 }
2333                 dput(dentry);
2334         } else
2335                 result = -ENOMEM;
2336         dput(parent);
2337         LINVRNT(osd_invariant(obj));
2338         return result;
2339 }
2340
/*
 * Add an entry called @name for @inode to directory @dir by calling the
 * directory's ->link() method on temporary dentries.
 *
 * Three dentries are built: "old" (named @name under dev->od_obj_area and
 * instantiated with @inode), "parent" (a root dentry for @dir) and "new"
 * (named @name under "parent").  All of them are dropped again before
 * returning.
 *
 * Returns -ENOMEM if any of the dentries cannot be allocated, otherwise the
 * result of dir->i_op->link().
 *
 * The LASSERTs sprinkled through the body check that @dir's reference count
 * stays above the expected floor at each step.
 */
static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
                       struct inode *dir, struct inode *inode, const char *name)
{
        struct dentry *old;
        struct dentry *new;
        struct dentry *parent;

        int result;

        /* the name is passed to d_alloc() through the thread-info scratch
         * string */
        info->oti_str.name = name;
        info->oti_str.len  = strlen(name);

        LASSERT(atomic_read(&dir->i_count) > 0);
        /* stays -ENOMEM unless ->link() is actually reached */
        result = -ENOMEM;
        old = d_alloc(dev->od_obj_area, &info->oti_str);
        if (old != NULL) {
                d_instantiate(old, inode);
                /* NOTE(review): presumably balances the inode reference that
                 * dput(old) releases -- confirm */
                igrab(inode);
                LASSERT(atomic_read(&dir->i_count) > 0);
                parent = d_alloc_root(dir);
                if (parent != NULL) {
                        /* NOTE(review): presumably balances the reference
                         * that dput(parent) releases -- confirm */
                        igrab(dir);
                        LASSERT(atomic_read(&dir->i_count) > 1);
                        new = d_alloc(parent, &info->oti_str);
                        LASSERT(atomic_read(&dir->i_count) > 1);
                        if (new != NULL) {
                                LASSERT(atomic_read(&dir->i_count) > 1);
                                result = dir->i_op->link(old, dir, new);
                                LASSERT(atomic_read(&dir->i_count) > 1);
                                dput(new);
                                LASSERT(atomic_read(&dir->i_count) > 1);
                        }
                        LASSERT(atomic_read(&dir->i_count) > 1);
                        dput(parent);
                        LASSERT(atomic_read(&dir->i_count) > 0);
                }
                dput(old);
        }
        LASSERT(atomic_read(&dir->i_count) > 0);
        return result;
}
2382
2383
2384 /*
2385  * XXX Temporary stuff.
2386  */
2387 static int osd_index_compat_insert(const struct lu_env *env,
2388                                    struct dt_object *dt,
2389                                    const struct dt_rec *rec,
2390                                    const struct dt_key *key, struct thandle *th,
2391                                    struct lustre_capa *capa,
2392                                    int ignore_quota)
2393 {
2394         struct osd_object     *obj = osd_dt_obj(dt);
2395
2396         const char          *name = (const char *)key;
2397
2398         struct lu_device    *ludev = dt->do_lu.lo_dev;
2399         struct lu_object    *luch;
2400
2401         struct osd_thread_info   *info = osd_oti_get(env);
2402         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2403         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2404
2405         int result;
2406
2407         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2408         LINVRNT(osd_invariant(obj));
2409         LASSERT(th != NULL);
2410
2411         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2412                 return -EACCES;
2413
2414         result = fid_unpack(pack, fid);
2415         if (result != 0)
2416                 return result;
2417
2418         luch = lu_object_find(env, ludev, fid, NULL);
2419         if (!IS_ERR(luch)) {
2420                 if (lu_object_exists(luch)) {
2421                         struct osd_object *child;
2422
2423                         child = osd_obj(lu_object_locate(luch->lo_header,
2424                                                          ludev->ld_type));
2425                         if (child != NULL)
2426                                 result = osd_add_rec(info, osd_obj2dev(obj),
2427                                                      obj->oo_inode,
2428                                                      child->oo_inode, name);
2429                         else {
2430                                 CERROR("No osd slice.\n");
2431                                 result = -ENOENT;
2432                         }
2433                         LINVRNT(osd_invariant(obj));
2434                         LINVRNT(osd_invariant(child));
2435                 } else {
2436                         CERROR("Sorry.\n");
2437                         result = -ENOENT;
2438                 }
2439                 lu_object_put(env, luch);
2440         } else
2441                 result = PTR_ERR(luch);
2442         LINVRNT(osd_invariant(obj));
2443         return result;
2444 }
2445
2446 static const struct dt_index_operations osd_index_compat_ops = {
2447         .dio_lookup = osd_index_compat_lookup,
2448         .dio_insert = osd_index_compat_insert,
2449         .dio_delete = osd_index_compat_delete
2450 };
2451
/*
 * type constructor/destructor: osd_type_init, osd_type_fini
 *
 * Both functions are generated by the macro below and are tied to the
 * osd_key context key (presumably registering and unregistering it -- see
 * the macro definition to confirm).
 */
LU_TYPE_INIT_FINI(osd, &osd_key);
2454
2455 static struct lu_context_key osd_key = {
2456         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
2457         .lct_init = osd_key_init,
2458         .lct_fini = osd_key_fini,
2459         .lct_exit = osd_key_exit
2460 };
2461
2462 static void *osd_key_init(const struct lu_context *ctx,
2463                           struct lu_context_key *key)
2464 {
2465         struct osd_thread_info *info;
2466
2467         OBD_ALLOC_PTR(info);
2468         if (info != NULL)
2469                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2470         else
2471                 info = ERR_PTR(-ENOMEM);
2472         return info;
2473 }
2474
/*
 * context key destructor: osd_key_fini
 *
 * Generated by the macro below for values of type struct osd_thread_info and
 * installed as the ->lct_fini method of osd_key.
 */
LU_KEY_FINI(osd, struct osd_thread_info);
2477
2478 static void osd_key_exit(const struct lu_context *ctx,
2479                          struct lu_context_key *key, void *data)
2480 {
2481         struct osd_thread_info *info = data;
2482
2483         LASSERT(info->oti_r_locks == 0);
2484         LASSERT(info->oti_w_locks == 0);
2485         LASSERT(info->oti_txns    == 0);
2486 }
2487
2488 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2489                            const char *name, struct lu_device *next)
2490 {
2491         int rc;
2492         struct lu_context *ctx;
2493
2494         /* context for commit hooks */
2495         ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
2496         rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
2497         if (rc == 0) {
2498                 rc = osd_procfs_init(osd_dev(d), name);
2499                 ctx->lc_cookie = 0x3;
2500         }
2501         return rc;
2502 }
2503
2504 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2505 {
2506         struct osd_thread_info *info = osd_oti_get(env);
2507         ENTRY;
2508         if (o->od_obj_area != NULL) {
2509                 dput(o->od_obj_area);
2510                 o->od_obj_area = NULL;
2511         }
2512         osd_oi_fini(info, &o->od_oi);
2513
2514         RETURN(0);
2515 }
2516
2517 static int osd_mount(const struct lu_env *env,
2518                      struct osd_device *o, struct lustre_cfg *cfg)
2519 {
2520         struct lustre_mount_info *lmi;
2521         const char               *dev  = lustre_cfg_string(cfg, 0);
2522         struct osd_thread_info   *info = osd_oti_get(env);
2523         int result;
2524
2525         ENTRY;
2526
2527         if (o->od_mount != NULL) {
2528                 CERROR("Already mounted (%s)\n", dev);
2529                 RETURN(-EEXIST);
2530         }
2531
2532         /* get mount */
2533         lmi = server_get_mount(dev);
2534         if (lmi == NULL) {
2535                 CERROR("Cannot get mount info for %s!\n", dev);
2536                 RETURN(-EFAULT);
2537         }
2538
2539         LASSERT(lmi != NULL);
2540         /* save lustre_mount_info in dt_device */
2541         o->od_mount = lmi;
2542
2543         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2544         if (result == 0) {
2545                 struct dentry *d;
2546
2547                 d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
2548                                  0777, 1);
2549                 if (!IS_ERR(d)) {
2550                         o->od_obj_area = d;
2551                 } else
2552                         result = PTR_ERR(d);
2553         }
2554         if (result != 0)
2555                 osd_shutdown(env, o);
2556         RETURN(result);
2557 }
2558
2559 static struct lu_device *osd_device_fini(const struct lu_env *env,
2560                                          struct lu_device *d)
2561 {
2562         int rc;
2563         ENTRY;
2564
2565         shrink_dcache_sb(osd_sb(osd_dev(d)));
2566         osd_sync(env, lu2dt_dev(d));
2567
2568         rc = osd_procfs_fini(osd_dev(d));
2569         if (rc) {
2570                 CERROR("proc fini error %d \n", rc);
2571                 RETURN (ERR_PTR(rc));
2572         }
2573
2574         if (osd_dev(d)->od_mount)
2575                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2576                                  osd_dev(d)->od_mount->lmi_mnt);
2577         osd_dev(d)->od_mount = NULL;
2578
2579         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2580         RETURN(NULL);
2581 }
2582
2583 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2584                                           struct lu_device_type *t,
2585                                           struct lustre_cfg *cfg)
2586 {
2587         struct lu_device  *l;
2588         struct osd_device *o;
2589
2590         OBD_ALLOC_PTR(o);
2591         if (o != NULL) {
2592                 int result;
2593
2594                 result = dt_device_init(&o->od_dt_dev, t);
2595                 if (result == 0) {
2596                         l = osd2lu_dev(o);
2597                         l->ld_ops = &osd_lu_ops;
2598                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2599                         spin_lock_init(&o->od_osfs_lock);
2600                         o->od_osfs_age = cfs_time_shift_64(-1000);
2601                         o->od_capa_hash = init_capa_hash();
2602                         if (o->od_capa_hash == NULL) {
2603                                 dt_device_fini(&o->od_dt_dev);
2604                                 l = ERR_PTR(-ENOMEM);
2605                         }
2606                 } else
2607                         l = ERR_PTR(result);
2608
2609                 if (IS_ERR(l))
2610                         OBD_FREE_PTR(o);
2611         } else
2612                 l = ERR_PTR(-ENOMEM);
2613         return l;
2614 }
2615
2616 static struct lu_device *osd_device_free(const struct lu_env *env,
2617                                          struct lu_device *d)
2618 {
2619         struct osd_device *o = osd_dev(d);
2620         ENTRY;
2621
2622         cleanup_capa_hash(o->od_capa_hash);
2623         dt_device_fini(&o->od_dt_dev);
2624         OBD_FREE_PTR(o);
2625         RETURN(NULL);
2626 }
2627
2628 static int osd_process_config(const struct lu_env *env,
2629                               struct lu_device *d, struct lustre_cfg *cfg)
2630 {
2631         struct osd_device *o = osd_dev(d);
2632         int err;
2633         ENTRY;
2634
2635         switch(cfg->lcfg_command) {
2636         case LCFG_SETUP:
2637                 err = osd_mount(env, o, cfg);
2638                 break;
2639         case LCFG_CLEANUP:
2640                 err = osd_shutdown(env, o);
2641                 break;
2642         default:
2643                 err = -ENOTTY;
2644         }
2645
2646         RETURN(err);
2647 }
2648 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2649                                     struct ldiskfs_super_block * es);
2650
2651 static int osd_recovery_complete(const struct lu_env *env,
2652                                  struct lu_device *d)
2653 {
2654         struct osd_device *o = osd_dev(d);
2655         ENTRY;
2656         /* TODO: orphans handling */
2657         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2658         RETURN(0);
2659 }
2660
2661 static struct inode *osd_iget(struct osd_thread_info *info,
2662                               struct osd_device *dev,
2663                               const struct osd_inode_id *id)
2664 {
2665         struct inode *inode;
2666
2667         inode = iget(osd_sb(dev), id->oii_ino);
2668         if (inode == NULL) {
2669                 CERROR("no inode\n");
2670                 inode = ERR_PTR(-EACCES);
2671         } else if (is_bad_inode(inode)) {
2672                 CERROR("bad inode\n");
2673                 iput(inode);
2674                 inode = ERR_PTR(-ENOENT);
2675         } else if (inode->i_generation != id->oii_gen) {
2676                 CERROR("stale inode\n");
2677                 iput(inode);
2678                 inode = ERR_PTR(-ESTALE);
2679         }
2680
2681         return inode;
2682
2683 }
2684
2685 static int osd_fid_lookup(const struct lu_env *env,
2686                           struct osd_object *obj, const struct lu_fid *fid)
2687 {
2688         struct osd_thread_info *info;
2689         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2690         struct osd_device      *dev;
2691         struct osd_inode_id    *id;
2692         struct osd_oi          *oi;
2693         struct inode           *inode;
2694         int                     result;
2695
2696         LINVRNT(osd_invariant(obj));
2697         LASSERT(obj->oo_inode == NULL);
2698         LASSERT(fid_is_sane(fid));
2699         /*
2700          * This assertion checks that osd layer sees only local
2701          * fids. Unfortunately it is somewhat expensive (does a
2702          * cache-lookup). Disabling it for production/acceptance-testing.
2703          */
2704         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2705
2706         ENTRY;
2707
2708         info = osd_oti_get(env);
2709         dev  = osd_dev(ldev);
2710         id   = &info->oti_id;
2711         oi   = &dev->od_oi;
2712
2713         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2714                 RETURN(-ENOENT);
2715
2716         result = osd_oi_lookup(info, oi, fid, id);
2717         if (result == 0) {
2718                 inode = osd_iget(info, dev, id);
2719                 if (!IS_ERR(inode)) {
2720                         obj->oo_inode = inode;
2721                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2722                         result = 0;
2723                 } else
2724                         /*
2725                          * If fid wasn't found in oi, inode-less object is
2726                          * created, for which lu_object_exists() returns
2727                          * false. This is used in a (frequent) case when
2728                          * objects are created as locking anchors or
2729                          * place holders for objects yet to be created.
2730                          */
2731                         result = PTR_ERR(inode);
2732         } else if (result == -ENOENT)
2733                 result = 0;
2734         LINVRNT(osd_invariant(obj));
2735         RETURN(result);
2736 }
2737
2738 static void osd_inode_getattr(const struct lu_env *env,
2739                               struct inode *inode, struct lu_attr *attr)
2740 {
2741         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2742                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2743                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2744
2745         attr->la_atime      = LTIME_S(inode->i_atime);
2746         attr->la_mtime      = LTIME_S(inode->i_mtime);
2747         attr->la_ctime      = LTIME_S(inode->i_ctime);
2748         attr->la_mode       = inode->i_mode;
2749         attr->la_size       = i_size_read(inode);
2750         attr->la_blocks     = inode->i_blocks;
2751         attr->la_uid        = inode->i_uid;
2752         attr->la_gid        = inode->i_gid;
2753         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2754         attr->la_nlink      = inode->i_nlink;
2755         attr->la_rdev       = inode->i_rdev;
2756         attr->la_blksize    = ll_inode_blksize(inode);
2757         attr->la_blkbits    = inode->i_blkbits;
2758 }
2759
2760 /*
2761  * Helpers.
2762  */
2763
2764 static int lu_device_is_osd(const struct lu_device *d)
2765 {
2766         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2767 }
2768
2769 static struct osd_object *osd_obj(const struct lu_object *o)
2770 {
2771         LASSERT(lu_device_is_osd(o->lo_dev));
2772         return container_of0(o, struct osd_object, oo_dt.do_lu);
2773 }
2774
2775 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2776 {
2777         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2778         return container_of0(d, struct osd_device, od_dt_dev);
2779 }
2780
2781 static struct osd_device *osd_dev(const struct lu_device *d)
2782 {
2783         LASSERT(lu_device_is_osd(d));
2784         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2785 }
2786
2787 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2788 {
2789         return osd_obj(&d->do_lu);
2790 }
2791
2792 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2793 {
2794         return osd_dev(o->oo_dt.do_lu.lo_dev);
2795 }
2796
2797 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2798 {
2799         return &osd->od_dt_dev.dd_lu_dev;
2800 }
2801
2802 static struct super_block *osd_sb(const struct osd_device *dev)
2803 {
2804         return dev->od_mount->lmi_mnt->mnt_sb;
2805 }
2806
2807 static journal_t *osd_journal(const struct osd_device *dev)
2808 {
2809         return LDISKFS_SB(osd_sb(dev))->s_journal;
2810 }
2811
2812 static int osd_has_index(const struct osd_object *obj)
2813 {
2814         return obj->oo_dt.do_index_ops != NULL;
2815 }
2816
/*
 * lu_object invariant hook: evaluate osd_invariant() on the osd_object
 * embedding \a l and return its result.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        struct osd_object *obj = osd_obj(l);

        return osd_invariant(obj);
}
2821
2822 static const struct lu_object_operations osd_lu_obj_ops = {
2823         .loo_object_init      = osd_object_init,
2824         .loo_object_delete    = osd_object_delete,
2825         .loo_object_release   = osd_object_release,
2826         .loo_object_free      = osd_object_free,
2827         .loo_object_print     = osd_object_print,
2828         .loo_object_invariant = osd_object_invariant
2829 };
2830
2831 static const struct lu_device_operations osd_lu_ops = {
2832         .ldo_object_alloc      = osd_object_alloc,
2833         .ldo_process_config    = osd_process_config,
2834         .ldo_recovery_complete = osd_recovery_complete
2835 };
2836
2837 static const struct lu_device_type_operations osd_device_type_ops = {
2838         .ldto_init = osd_type_init,
2839         .ldto_fini = osd_type_fini,
2840
2841         .ldto_start = osd_type_start,
2842         .ldto_stop  = osd_type_stop,
2843
2844         .ldto_device_alloc = osd_device_alloc,
2845         .ldto_device_free  = osd_device_free,
2846
2847         .ldto_device_init    = osd_device_init,
2848         .ldto_device_fini    = osd_device_fini
2849 };
2850
2851 static struct lu_device_type osd_device_type = {
2852         .ldt_tags     = LU_DEVICE_DT,
2853         .ldt_name     = LUSTRE_OSD_NAME,
2854         .ldt_ops      = &osd_device_type_ops,
2855         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2856 };
2857
2858 /*
2859  * lprocfs legacy support.
2860  */
2861 static struct obd_ops osd_obd_device_ops = {
2862         .o_owner = THIS_MODULE
2863 };
2864
2865 static int __init osd_mod_init(void)
2866 {
2867         struct lprocfs_static_vars lvars;
2868
2869         lprocfs_osd_init_vars(&lvars);
2870         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2871                                    LUSTRE_OSD_NAME, &osd_device_type);
2872 }
2873
2874 static void __exit osd_mod_exit(void)
2875 {
2876         class_unregister_type(LUSTRE_OSD_NAME);
2877 }
2878
/*
 * Module metadata. osd_mod_init() and osd_mod_exit() are handed to
 * cfs_module(); presumably that macro (defined elsewhere) installs them
 * as the module's init and exit routines - confirm against its definition.
 */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
MODULE_LICENSE("GPL");

cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);