Whamcloud - gitweb
Make previously optional ->oti_{w,r}_locks sanity checks mandatory to simplify
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
/*
 * XXX temporary stuff: direct access to ldiskfs/jbd. The interface between
 * osd and the file system is not yet specified.
 */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
85 struct osd_directory {
86         struct iam_container od_container;
87         struct iam_descr     od_descr;
88         struct semaphore     od_sem;
89 };
90
91 struct osd_object {
92         struct dt_object       oo_dt;
93         /**
94          * Inode for file system object represented by this osd_object. This
95          * inode is pinned for the whole duration of lu_object life.
96          *
97          * Not modified concurrently (either setup early during object
98          * creation, or assigned by osd_object_create() under write lock).
99          */
100         struct inode          *oo_inode;
101         struct rw_semaphore    oo_sem;
102         struct osd_directory  *oo_dir;
103         /* protects inode attributes. */
104         spinlock_t             oo_guard;
105 #if OSD_COUNTERS
106         const struct lu_env   *oo_owner;
107 #endif
108 };
109
110 static int   osd_root_get      (const struct lu_env *env,
111                                 struct dt_device *dev, struct lu_fid *f);
112
113 static int   lu_device_is_osd  (const struct lu_device *d);
114 static void  osd_mod_exit      (void) __exit;
115 static int   osd_mod_init      (void) __init;
116 static int   osd_type_init     (struct lu_device_type *t);
117 static void  osd_type_fini     (struct lu_device_type *t);
118 static int   osd_object_init   (const struct lu_env *env,
119                                 struct lu_object *l);
120 static void  osd_object_release(const struct lu_env *env,
121                                 struct lu_object *l);
122 static int   osd_object_print  (const struct lu_env *env, void *cookie,
123                                 lu_printer_t p, const struct lu_object *o);
124 static struct lu_device *osd_device_free   (const struct lu_env *env,
125                                 struct lu_device *m);
126 static void *osd_key_init      (const struct lu_context *ctx,
127                                 struct lu_context_key *key);
128 static void  osd_key_fini      (const struct lu_context *ctx,
129                                 struct lu_context_key *key, void *data);
130 static void  osd_key_exit      (const struct lu_context *ctx,
131                                 struct lu_context_key *key, void *data);
132 static int   osd_has_index     (const struct osd_object *obj);
133 static void  osd_object_init0  (struct osd_object *obj);
134 static int   osd_device_init   (const struct lu_env *env,
135                                 struct lu_device *d, const char *,
136                                 struct lu_device *);
137 static int   osd_fid_lookup    (const struct lu_env *env,
138                                 struct osd_object *obj,
139                                 const struct lu_fid *fid);
140 static void  osd_inode_getattr (const struct lu_env *env,
141                                 struct inode *inode, struct lu_attr *attr);
142 static void  osd_inode_setattr (const struct lu_env *env,
143                                 struct inode *inode, const struct lu_attr *attr);
144 static int   osd_param_is_sane (const struct osd_device *dev,
145                                 const struct txn_param *param);
146 static int   osd_index_lookup  (const struct lu_env *env,
147                                 struct dt_object *dt,
148                                 struct dt_rec *rec, const struct dt_key *key,
149                                 struct lustre_capa *capa);
150 static int   osd_index_insert  (const struct lu_env *env,
151                                 struct dt_object *dt,
152                                 const struct dt_rec *rec,
153                                 const struct dt_key *key,
154                                 struct thandle *handle,
155                                 struct lustre_capa *capa);
156 static int   osd_index_delete  (const struct lu_env *env,
157                                 struct dt_object *dt, const struct dt_key *key,
158                                 struct thandle *handle,
159                                 struct lustre_capa *capa);
160 static int   osd_index_probe   (const struct lu_env *env,
161                                 struct osd_object *o,
162                                 const struct dt_index_features *feat);
163 static int   osd_index_try     (const struct lu_env *env,
164                                 struct dt_object *dt,
165                                 const struct dt_index_features *feat);
166 static void  osd_index_fini    (struct osd_object *o);
167
/*
 * Forward declarations: index iterator methods and miscellaneous helpers.
 */
static void osd_it_fini(const struct lu_env *env, struct dt_it *di);
static int osd_it_get(const struct lu_env *env, struct dt_it *di,
                      const struct dt_key *key);
static void osd_it_put(const struct lu_env *env, struct dt_it *di);
static int osd_it_next(const struct lu_env *env, struct dt_it *di);
static int osd_it_del(const struct lu_env *env, struct dt_it *di,
                      struct thandle *th);
static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di);
static void osd_conf_get(const struct lu_env *env,
                         const struct dt_device *dev,
                         struct dt_device_param *param);
static void osd_trans_stop(const struct lu_env *env, struct thandle *th);
static int osd_object_is_root(const struct osd_object *obj);
183
184 static struct osd_object  *osd_obj          (const struct lu_object *o);
185 static struct osd_device  *osd_dev          (const struct lu_device *d);
186 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
187 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
188 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
189 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
190 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
191                                              struct lu_device *d);
192 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
193                                              struct lu_device_type *t,
194                                              struct lustre_cfg *cfg);
195 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
196                                              const struct lu_object_header *hdr,
197                                              struct lu_device *d);
198 static struct inode       *osd_iget         (struct osd_thread_info *info,
199                                              struct osd_device *dev,
200                                              const struct osd_inode_id *id);
201 static struct super_block *osd_sb           (const struct osd_device *dev);
202 static struct dt_it       *osd_it_init      (const struct lu_env *env,
203                                              struct dt_object *dt, int wable,
204                                              struct lustre_capa *capa);
205 static struct dt_key      *osd_it_key       (const struct lu_env *env,
206                                              const struct dt_it *di);
207 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
208                                              const struct dt_it *di);
209 static struct timespec    *osd_inode_time   (const struct lu_env *env,
210                                              struct inode *inode,
211                                              __u64 seconds);
212 static struct thandle     *osd_trans_start  (const struct lu_env *env,
213                                              struct dt_device *d,
214                                              struct txn_param *p);
215 static journal_t          *osd_journal      (const struct osd_device *dev);
216
217 static struct lu_device_type_operations osd_device_type_ops;
218 static struct lu_device_type            osd_device_type;
219 static struct lu_object_operations      osd_lu_obj_ops;
220 static struct obd_ops                   osd_obd_device_ops;
221 static struct lu_device_operations      osd_lu_ops;
222 static struct lu_context_key            osd_key;
223 static struct dt_object_operations      osd_obj_ops;
224 static struct dt_body_operations        osd_body_ops;
225 static struct dt_index_operations       osd_index_ops;
226 static struct dt_index_operations       osd_index_compat_ops;
227
228 struct osd_thandle {
229         struct thandle          ot_super;
230         handle_t               *ot_handle;
231         struct journal_callback ot_jcb;
232 };
233
234 /*
235  * Invariants, assertions.
236  */
237
/*
 * XXX: do not enable this, until invariant checking code is made thread safe
 * in the face of pdirops locking.
 */
#define OSD_INVARIANT_CHECKS (0)

#if OSD_INVARIANT_CHECKS
/*
 * Consistency predicate for an osd_object. Returns non-zero when:
 *
 *   - obj is not NULL;
 *   - if the object has an inode, that inode belongs to the super block of
 *     the object's device and has a positive reference count;
 *   - if the object has index state whose container is bound to an inode,
 *     that inode is the object's own inode.
 */
static int osd_invariant(const struct osd_object *obj)
{
        return
                obj != NULL &&
                ergo(obj->oo_inode != NULL,
                     obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
                     atomic_read(&obj->oo_inode->i_count) > 0) &&
                ergo(obj->oo_dir != NULL &&
                     obj->oo_dir->od_container.ic_object != NULL,
                     obj->oo_dir->od_container.ic_object == obj->oo_inode);
}
#else
/* Checks are compiled out: every object is considered consistent. */
#define osd_invariant(obj) (1)
#endif
259
260 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
261 {
262         return lu_context_key_get(&env->le_ctx, &osd_key);
263 }
264
265 /*
266  * Concurrency: doesn't matter
267  */
268 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
269 {
270         return osd_oti_get(env)->oti_r_locks > 0;
271 }
272
273 /*
274  * Concurrency: doesn't matter
275  */
276 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
277 {
278         struct osd_thread_info *oti = osd_oti_get(env);
279         return oti->oti_w_locks > 0 && o->oo_owner == env;
280 }
281
282 /*
283  * Concurrency: doesn't access mutable data
284  */
285 static int osd_root_get(const struct lu_env *env,
286                         struct dt_device *dev, struct lu_fid *f)
287 {
288         struct inode *inode;
289
290         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
291         lu_igif_build(f, inode->i_ino, inode->i_generation);
292         return 0;
293 }
294
295 /*
296  * OSD object methods.
297  */
298
299 /*
300  * Concurrency: no concurrent access is possible that early in object
301  * life-cycle.
302  */
303 static struct lu_object *osd_object_alloc(const struct lu_env *env,
304                                           const struct lu_object_header *hdr,
305                                           struct lu_device *d)
306 {
307         struct osd_object *mo;
308
309         OBD_ALLOC_PTR(mo);
310         if (mo != NULL) {
311                 struct lu_object *l;
312
313                 l = &mo->oo_dt.do_lu;
314                 dt_object_init(&mo->oo_dt, NULL, d);
315                 mo->oo_dt.do_ops = &osd_obj_ops;
316                 l->lo_ops = &osd_lu_obj_ops;
317                 init_rwsem(&mo->oo_sem);
318                 spin_lock_init(&mo->oo_guard);
319                 return l;
320         } else
321                 return NULL;
322 }
323
324 /*
325  * Concurrency: shouldn't matter.
326  */
327 static void osd_object_init0(struct osd_object *obj)
328 {
329         LASSERT(obj->oo_inode != NULL);
330         obj->oo_dt.do_body_ops = &osd_body_ops;
331         obj->oo_dt.do_lu.lo_header->loh_attr |=
332                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
333 }
334
335 /*
336  * Concurrency: no concurrent access is possible that early in object
337  * life-cycle.
338  */
339 static int osd_object_init(const struct lu_env *env, struct lu_object *l)
340 {
341         struct osd_object *obj = osd_obj(l);
342         int result;
343
344         LASSERT(osd_invariant(obj));
345
346         result = osd_fid_lookup(env, obj, lu_object_fid(l));
347         if (result == 0) {
348                 if (obj->oo_inode != NULL)
349                         osd_object_init0(obj);
350         }
351         LASSERT(osd_invariant(obj));
352         return result;
353 }
354
355 /*
356  * Concurrency: no concurrent access is possible that late in object
357  * life-cycle.
358  */
359 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
360 {
361         struct osd_object *obj = osd_obj(l);
362
363         LASSERT(osd_invariant(obj));
364
365         dt_object_fini(&obj->oo_dt);
366         OBD_FREE_PTR(obj);
367 }
368
369 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
370                                           const struct iam_container *bag)
371 {
372         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
373                                                    osd_oti_get(env)->oti_ipd);
374 }
375
376 static void osd_ipd_put(const struct lu_env *env,
377                         const struct iam_container *bag,
378                         struct iam_path_descr *ipd)
379 {
380         bag->ic_descr->id_ops->id_ipd_free(ipd);
381 }
382
383 /*
384  * Concurrency: no concurrent access is possible that late in object
385  * life-cycle.
386  */
387 static void osd_index_fini(struct osd_object *o)
388 {
389         struct iam_container *bag;
390
391         if (o->oo_dir != NULL) {
392                 bag = &o->oo_dir->od_container;
393                 if (o->oo_inode != NULL) {
394                         if (bag->ic_object == o->oo_inode)
395                                 iam_container_fini(bag);
396                 }
397                 OBD_FREE_PTR(o->oo_dir);
398                 o->oo_dir = NULL;
399         }
400 }
401
402 /*
403  * Concurrency: no concurrent access is possible that late in object
404  * life-cycle (for all existing callers, that is. New callers have to provide
405  * their own locking.)
406  */
407 static int osd_inode_unlinked(const struct inode *inode)
408 {
409         return inode->i_nlink == 0;
410 }
411
412 enum {
413         OSD_TXN_OI_DELETE_CREDITS    = 20,
414         OSD_TXN_INODE_DELETE_CREDITS = 20
415 };
416
417 /*
418  * Concurrency: no concurrent access is possible that late in object
419  * life-cycle.
420  */
421 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
422 {
423         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
424         struct osd_device      *osd = osd_obj2dev(obj);
425         struct osd_thread_info *oti = osd_oti_get(env);
426         struct txn_param       *prm = &oti->oti_txn;
427         struct thandle         *th;
428         int result;
429
430         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
431                             OSD_TXN_INODE_DELETE_CREDITS);
432         th = osd_trans_start(env, &osd->od_dt_dev, prm);
433         if (!IS_ERR(th)) {
434                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
435                 osd_trans_stop(env, th);
436         } else
437                 result = PTR_ERR(th);
438         return result;
439 }
440
441 /*
442  * Called just before object is freed. Releases all resources except for
443  * object itself (that is released by osd_object_free()).
444  *
445  * Concurrency: no concurrent access is possible that late in object
446  * life-cycle.
447  */
448 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
449 {
450         struct osd_object *obj   = osd_obj(l);
451         struct inode      *inode = obj->oo_inode;
452
453         LASSERT(osd_invariant(obj));
454
455         /*
456          * If object is unlinked remove fid->ino mapping from object index.
457          *
458          * File body will be deleted by iput().
459          */
460
461         osd_index_fini(obj);
462         if (inode != NULL) {
463                 int result;
464
465                 if (osd_inode_unlinked(inode)) {
466                         result = osd_inode_remove(env, obj);
467                         if (result != 0)
468                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
469                                                 "Failed to cleanup: %d\n",
470                                                 result);
471                 }
472                 iput(inode);
473                 obj->oo_inode = NULL;
474         }
475 }
476
477 /*
478  * Concurrency: ->loo_object_release() is called under site spin-lock.
479  */
480 static void osd_object_release(const struct lu_env *env,
481                                struct lu_object *l)
482 {
483         struct osd_object *o = osd_obj(l);
484
485         LASSERT(!lu_object_is_dying(l->lo_header));
486         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
487                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
488 }
489
490 /*
491  * Concurrency: shouldn't matter.
492  */
493 static int osd_object_print(const struct lu_env *env, void *cookie,
494                             lu_printer_t p, const struct lu_object *l)
495 {
496         struct osd_object *o = osd_obj(l);
497         struct iam_descr  *d;
498
499         if (o->oo_dir != NULL)
500                 d = o->oo_dir->od_container.ic_descr;
501         else
502                 d = NULL;
503         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
504                     o, o->oo_inode,
505                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
506                     o->oo_inode ? o->oo_inode->i_generation : 0,
507                     d ? d->id_ops->id_name : "plain");
508 }
509
510 /*
511  * Concurrency: shouldn't matter.
512  */
513 int osd_statfs(const struct lu_env *env, struct dt_device *d,
514                struct kstatfs *sfs)
515 {
516         struct osd_device *osd = osd_dt_dev(d);
517         struct super_block *sb = osd_sb(osd);
518         int result = 0;
519
520         spin_lock(&osd->od_osfs_lock);
521         /* cache 1 second */
522         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
523                 result = ll_do_statfs(sb, &osd->od_kstatfs);
524                 if (likely(result == 0)) /* N.B. statfs can't really fail */
525                         osd->od_osfs_age = cfs_time_current_64();
526         }
527
528         if (likely(result == 0))
529                 *sfs = osd->od_kstatfs; 
530         spin_unlock(&osd->od_osfs_lock);
531
532         return result;
533 }
534
535 /*
536  * Concurrency: doesn't access mutable data.
537  */
538 static void osd_conf_get(const struct lu_env *env,
539                          const struct dt_device *dev,
540                          struct dt_device_param *param)
541 {
542         /*
543          * XXX should be taken from not-yet-existing fs abstraction layer.
544          */
545         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
546         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
547         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
548 }
549
550 /*
551  * Journal
552  */
553
554 /*
555  * Concurrency: doesn't access mutable data.
556  */
557 static int osd_param_is_sane(const struct osd_device *dev,
558                              const struct txn_param *param)
559 {
560         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
561 }
562
563 /*
564  * Concurrency: shouldn't matter.
565  */
566 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
567 {
568         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
569         struct thandle     *th = &oh->ot_super;
570         struct dt_device   *dev = th->th_dev;
571
572         LASSERT(dev != NULL);
573         LASSERT(oh->ot_handle == NULL);
574
575         if (error) {
576                 CERROR("transaction @0x%p commit error: %d\n", th, error);
577         } else {
578                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
579                 /*
580                  * This od_env_for_commit is only for commit usage.  see
581                  * "struct dt_device"
582                  */
583                 lu_context_enter(&env->le_ctx);
584                 dt_txn_hook_commit(env, th);
585                 lu_context_exit(&env->le_ctx);
586         }
587
588         lu_device_put(&dev->dd_lu_dev);
589         th->th_dev = NULL;
590
591         lu_context_exit(&th->th_ctx);
592         lu_context_fini(&th->th_ctx);
593         OBD_FREE_PTR(oh);
594 }
595
596 /*
597  * Concurrency: shouldn't matter.
598  */
599 static struct thandle *osd_trans_start(const struct lu_env *env,
600                                        struct dt_device *d,
601                                        struct txn_param *p)
602 {
603         struct osd_device  *dev = osd_dt_dev(d);
604         handle_t           *jh;
605         struct osd_thandle *oh;
606         struct thandle     *th;
607         int hook_res;
608
609         ENTRY;
610
611         hook_res = dt_txn_hook_start(env, d, p);
612         if (hook_res != 0)
613                 RETURN(ERR_PTR(hook_res));
614
615         if (osd_param_is_sane(dev, p)) {
616                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
617                 if (oh != NULL) {
618                         /*
619                          * XXX temporary stuff. Some abstraction layer should
620                          * be used.
621                          */
622
623                         jh = journal_start(osd_journal(dev), p->tp_credits);
624                         if (!IS_ERR(jh)) {
625                                 oh->ot_handle = jh;
626                                 th = &oh->ot_super;
627                                 th->th_dev = d;
628                                 th->th_result = 0;
629                                 jh->h_sync = p->tp_sync;
630                                 lu_device_get(&d->dd_lu_dev);
631                                 /* add commit callback */
632                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
633                                 lu_context_enter(&th->th_ctx);
634                                 journal_callback_set(jh, osd_trans_commit_cb,
635                                                      (struct journal_callback *)&oh->ot_jcb);
636 #if OSD_COUNTERS
637                                 {
638                                         struct osd_thread_info *oti =
639                                                 osd_oti_get(env);
640
641                                         LASSERT(oti->oti_txns == 0);
642                                         LASSERT(oti->oti_r_locks == 0);
643                                         LASSERT(oti->oti_w_locks == 0);
644                                         oti->oti_txns++;
645                                 }
646 #endif
647                         } else {
648                                 OBD_FREE_PTR(oh);
649                                 th = (void *)jh;
650                         }
651                 } else
652                         th = ERR_PTR(-ENOMEM);
653         } else {
654                 CERROR("Invalid transaction parameters\n");
655                 th = ERR_PTR(-EINVAL);
656         }
657
658         RETURN(th);
659 }
660
661 /*
662  * Concurrency: shouldn't matter.
663  */
664 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
665 {
666         int result;
667         struct osd_thandle *oh;
668         struct osd_thread_info *oti = osd_oti_get(env);
669
670         ENTRY;
671
672         oh = container_of0(th, struct osd_thandle, ot_super);
673         if (oh->ot_handle != NULL) {
674                 handle_t *hdl = oh->ot_handle;
675
676                 LASSERT(oti->oti_txns == 1);
677                 oti->oti_txns--;
678                 LASSERT(oti->oti_r_locks == 0);
679                 LASSERT(oti->oti_w_locks == 0);
680                 result = dt_txn_hook_stop(env, th);
681                 if (result != 0)
682                         CERROR("Failure in transaction hook: %d\n", result);
683                 oh->ot_handle = NULL;
684                 result = journal_stop(hdl);
685                 if (result != 0)
686                         CERROR("Failure to stop transaction: %d\n", result);
687         }
688         EXIT;
689 }
690
691 /*
692  * Concurrency: shouldn't matter.
693  */
694 static int osd_sync(const struct lu_env *env, struct dt_device *d)
695 {
696         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
697         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
698 }
699
700 /*
701  * Concurrency: shouldn't matter.
702  */
703 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
704
705 static void osd_ro(const struct lu_env *env, struct dt_device *d)
706 {
707         ENTRY;
708
709         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
710
711         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
712                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
713         EXIT;
714 }
715
716 /*
717  * Concurrency: serialization provided by callers.
718  */
719 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
720                               int mode, unsigned long timeout, __u32 alg,
721                               struct lustre_capa_key *keys)
722 {
723         struct osd_device *dev = osd_dt_dev(d);
724         ENTRY;
725
726         dev->od_fl_capa = mode;
727         dev->od_capa_timeout = timeout;
728         dev->od_capa_alg = alg;
729         dev->od_capa_keys = keys;
730         RETURN(0);
731 }
732
733 /* Note: we did not count into QUOTA here, If we mount with --data_journal
734  * we may need more*/
735 static const int osd_dto_credits[DTO_NR] = {
736         /*
737          * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
738          * EXT3_SINGLEDATA_TRANS_BLOCKS 8 XXX Note: maybe iam need more,since
739          * iam have more level than Ext3 htree
740          */
741         [DTO_INDEX_INSERT]  = 16,
742         [DTO_INDEX_DELETE]  = 16,
743         [DTO_IDNEX_UPDATE]  = 16,
744         /*
745          * Create a object. Same as create object in Ext3 filesystem, but did
746          * not count QUOTA i EXT3_DATA_TRANS_BLOCKS(12) +
747          * INDEX_EXTRA_BLOCKS(8) + 3(inode bits,groups, GDT)
748          */
749         [DTO_OBJECT_CREATE] = 23,
750         [DTO_OBJECT_DELETE] = 23,
751         /*
752          * Attr set credits 3 inode, group, GDT
753          */
754         [DTO_ATTR_SET]      = 3,
755         /*
756          * XATTR_SET. SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS XXX Note:
757          * in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
758          * also counted in. Do not know why?
759          */
760         [DTO_XATTR_SET]     = 16,
761         [DTO_LOG_REC]       = 16,
762         /* creadits for inode change during write */
763         [DTO_WRITE_BASE]    = 3,
764         /* credits for single block write */
765         [DTO_WRITE_BLOCK]   = 12 
766 };
767
768 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
769                           enum dt_txn_op op)
770 {
771         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
772         return osd_dto_credits[op];
773 }
774
775 static struct dt_device_operations osd_dt_ops = {
776         .dt_root_get       = osd_root_get,
777         .dt_statfs         = osd_statfs,
778         .dt_trans_start    = osd_trans_start,
779         .dt_trans_stop     = osd_trans_stop,
780         .dt_conf_get       = osd_conf_get,
781         .dt_sync           = osd_sync,
782         .dt_ro             = osd_ro,
783         .dt_credit_get     = osd_credit_get,
784         .dt_init_capa_ctxt = osd_init_capa_ctxt,
785 };
786
787 static void osd_object_read_lock(const struct lu_env *env,
788                                  struct dt_object *dt)
789 {
790         struct osd_object *obj = osd_dt_obj(dt);
791
792         LASSERT(osd_invariant(obj));
793
794         OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
795         down_read(&obj->oo_sem);
796 #if OSD_COUNTERS
797         {
798                 struct osd_thread_info *oti = osd_oti_get(env);
799
800                 LASSERT(obj->oo_owner == NULL);
801                 oti->oti_r_locks++;
802         }
803 #endif
804 }
805
/*
 * Take the object's ->oo_sem for writing on behalf of @env.
 *
 * When OSD_COUNTERS is enabled, @env is recorded as the object's ->oo_owner
 * and the per-thread ->oti_w_locks counter is incremented once the semaphore
 * is held.
 */
static void osd_object_write_lock(const struct lu_env *env,
                                  struct dt_object *dt)
{
        struct osd_object *obj = osd_dt_obj(dt);

        LASSERT(osd_invariant(obj));

        /* @env must not already be the recorded write owner */
        OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
        down_write(&obj->oo_sem);
#if OSD_COUNTERS
        {
                struct osd_thread_info *oti = osd_oti_get(env);

                /* the previous writer must have cleared the owner field */
                LASSERT(obj->oo_owner == NULL);
                obj->oo_owner = env;
                oti->oti_w_locks++;
        }
#endif
}
825
/*
 * Release the read side of the object's ->oo_sem.
 *
 * When OSD_COUNTERS is enabled, the per-thread ->oti_r_locks counter is
 * checked to be positive and decremented before the semaphore is dropped.
 */
static void osd_object_read_unlock(const struct lu_env *env,
                                   struct dt_object *dt)
{
        struct osd_object *obj = osd_dt_obj(dt);

        LASSERT(osd_invariant(obj));
#if OSD_COUNTERS
        {
                struct osd_thread_info *oti = osd_oti_get(env);

                /* this thread must have at least one read lock accounted */
                LASSERT(oti->oti_r_locks > 0);
                oti->oti_r_locks--;
        }
#endif
        up_read(&obj->oo_sem);
}
842
/*
 * Release the write side of the object's ->oo_sem.
 *
 * When OSD_COUNTERS is enabled, the recorded ->oo_owner is checked to be
 * @env and cleared, and the per-thread ->oti_w_locks counter is decremented,
 * all before the semaphore is dropped.
 */
static void osd_object_write_unlock(const struct lu_env *env,
                                    struct dt_object *dt)
{
        struct osd_object *obj = osd_dt_obj(dt);

        LASSERT(osd_invariant(obj));
#if OSD_COUNTERS
        {
                struct osd_thread_info *oti = osd_oti_get(env);

                /* only the environment that took the write lock may drop it */
                LASSERT(obj->oo_owner == env);
                LASSERT(oti->oti_w_locks > 0);
                oti->oti_w_locks--;
                obj->oo_owner = NULL;
        }
#endif
        up_write(&obj->oo_sem);
}
861
862 static int capa_is_sane(const struct lu_env *env,
863                         struct osd_device *dev,
864                         struct lustre_capa *capa,
865                         struct lustre_capa_key *keys)
866 {
867         struct osd_thread_info *oti = osd_oti_get(env);
868         struct obd_capa *oc;
869         int i, rc = 0;
870         ENTRY;
871
872         oc = capa_lookup(dev->od_capa_hash, capa, 0);
873         if (oc) {
874                 if (capa_is_expired(oc)) {
875                         DEBUG_CAPA(D_ERROR, capa, "expired");
876                         rc = -ESTALE;
877                 }
878                 capa_put(oc);
879                 RETURN(rc);
880         }
881
882         spin_lock(&capa_lock);
883         for (i = 0; i < 2; i++) {
884                 if (keys[i].lk_keyid == capa->lc_keyid) {
885                         oti->oti_capa_key = keys[i];
886                         break;
887                 }
888         }
889         spin_unlock(&capa_lock);
890
891         if (i == 2) {
892                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
893                 RETURN(-ESTALE);
894         }
895
896         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
897         if (rc)
898                 RETURN(rc);
899         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
900         {
901                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
902                 RETURN(-EACCES);
903         }
904
905         oc = capa_add(dev->od_capa_hash, capa);
906         capa_put(oc);
907
908         RETURN(0);
909 }
910
911 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
912                            struct lustre_capa *capa, __u64 opc)
913 {
914         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
915         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
916         int rc;
917
918         if (!dev->od_fl_capa)
919                 return 0;
920
921         if (capa == BYPASS_CAPA)
922                 return 0;
923
924         if (!capa) {
925                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
926                 return -EACCES;
927         }
928
929         if (!lu_fid_eq(fid, &capa->lc_fid)) {
930                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
931                            PFID(fid));
932                 return -EACCES;
933         }
934
935         if (!capa_opc_supported(capa, opc)) {
936                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
937                 return -EACCES;
938         }
939
940         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
941                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
942                 return -EACCES;
943         }
944
945         return 0;
946 }
947
948 static int osd_attr_get(const struct lu_env *env,
949                         struct dt_object *dt,
950                         struct lu_attr *attr,
951                         struct lustre_capa *capa)
952 {
953         struct osd_object *obj = osd_dt_obj(dt);
954
955         LASSERT(dt_object_exists(dt));
956         LASSERT(osd_invariant(obj));
957
958         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
959                 return -EACCES;
960
961         spin_lock(&obj->oo_guard);
962         osd_inode_getattr(env, obj->oo_inode, attr);
963         spin_unlock(&obj->oo_guard);
964         return 0;
965 }
966
967 static int osd_attr_set(const struct lu_env *env,
968                         struct dt_object *dt,
969                         const struct lu_attr *attr,
970                         struct thandle *handle,
971                         struct lustre_capa *capa)
972 {
973         struct osd_object *obj = osd_dt_obj(dt);
974
975         LASSERT(handle != NULL);
976         LASSERT(dt_object_exists(dt));
977         LASSERT(osd_invariant(obj));
978
979         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
980                 return -EACCES;
981
982         spin_lock(&obj->oo_guard);
983         osd_inode_setattr(env, obj->oo_inode, attr);
984         spin_unlock(&obj->oo_guard);
985
986         mark_inode_dirty(obj->oo_inode);
987         return 0;
988 }
989
990 static struct timespec *osd_inode_time(const struct lu_env *env,
991                                        struct inode *inode, __u64 seconds)
992 {
993         struct osd_thread_info *oti = osd_oti_get(env);
994         struct timespec        *t   = &oti->oti_time;
995
996         t->tv_sec  = seconds;
997         t->tv_nsec = 0;
998         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
999         return t;
1000 }
1001
1002 static void osd_inode_setattr(const struct lu_env *env,
1003                               struct inode *inode, const struct lu_attr *attr)
1004 {
1005         __u64 bits;
1006
1007         bits = attr->la_valid;
1008
1009         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1010
1011         if (bits & LA_ATIME)
1012                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1013         if (bits & LA_CTIME)
1014                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1015         if (bits & LA_MTIME)
1016                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1017         if (bits & LA_SIZE) {
1018                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1019                 i_size_write(inode, attr->la_size);
1020         }
1021         if (bits & LA_BLOCKS)
1022                 inode->i_blocks = attr->la_blocks;
1023         if (bits & LA_MODE)
1024                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1025                         (attr->la_mode & ~S_IFMT);
1026         if (bits & LA_UID)
1027                 inode->i_uid    = attr->la_uid;
1028         if (bits & LA_GID)
1029                 inode->i_gid    = attr->la_gid;
1030         if (bits & LA_NLINK)
1031                 inode->i_nlink  = attr->la_nlink;
1032         if (bits & LA_RDEV)
1033                 inode->i_rdev   = attr->la_rdev;
1034
1035         if (bits & LA_FLAGS) {
1036                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1037
1038                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1039                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1040         }
1041 }
1042
1043 /*
1044  * Object creation.
1045  *
1046  * XXX temporary solution.
1047  */
1048
/*
 * Hook run by osd_object_create() before the type-specific creation
 * function. Currently a stub: does nothing and always returns 0.
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        return 0;
}
1054
/*
 * Hook run by osd_object_create() after the type-specific creation function
 * succeeded: @obj must have an inode by now; finishes the object set-up
 * through osd_object_init0(). Always returns 0.
 */
static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
                           struct lu_attr *attr, struct thandle *th)
{
        LASSERT(obj->oo_inode != NULL);

        osd_object_init0(obj);
        return 0;
}
1063
/*
 * Declared by hand here; the definition lives outside this file (presumably
 * in ldiskfs). osd_mkfile() calls it with the journal handle, the parent
 * inode and the requested mode, and treats the result as either an inode or
 * an ERR_PTR() value.
 */
extern struct inode *ldiskfs_create_inode(handle_t *handle,
                                          struct inode * dir, int mode);
1066
1067 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1068                       umode_t mode,
1069                       struct dt_allocation_hint *hint,
1070                       struct thandle *th)
1071 {
1072         int result;
1073         struct osd_device  *osd = osd_obj2dev(obj);
1074         struct osd_thandle *oth;
1075         struct inode       *parent;
1076         struct inode       *inode;
1077
1078         LASSERT(osd_invariant(obj));
1079         LASSERT(obj->oo_inode == NULL);
1080         LASSERT(osd->od_obj_area != NULL);
1081
1082         oth = container_of(th, struct osd_thandle, ot_super);
1083         LASSERT(oth->ot_handle->h_transaction != NULL);
1084
1085         if (hint && hint->dah_parent)
1086                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1087         else
1088                 parent = osd->od_obj_area->d_inode;
1089         LASSERT(parent->i_op != NULL);
1090
1091         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1092         if (!IS_ERR(inode)) {
1093                 obj->oo_inode = inode;
1094                 result = 0;
1095         } else
1096                 result = PTR_ERR(inode);
1097         LASSERT(osd_invariant(obj));
1098         return result;
1099 }
1100
1101
/*
 * Declared by hand here; the definition lives outside this file. osd_mkdir()
 * calls it on a freshly created directory inode, passing the key, pointer
 * and record sizes together with the journal handle.
 */
extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
                           int recsize, handle_t *handle);
1104
enum {
        /* key size that osd_mkdir() hands to iam_lvar_create() */
        OSD_NAME_LEN = 255
};
1108
1109 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1110                      struct lu_attr *attr,
1111                      struct dt_allocation_hint *hint,
1112                      struct thandle *th)
1113 {
1114         int result;
1115         struct osd_thandle *oth;
1116
1117         LASSERT(S_ISDIR(attr->la_mode));
1118
1119         oth = container_of(th, struct osd_thandle, ot_super);
1120         LASSERT(oth->ot_handle->h_transaction != NULL);
1121         result = osd_mkfile(info, obj, (attr->la_mode &
1122                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1123         if (result == 0) {
1124                 LASSERT(obj->oo_inode != NULL);
1125                 /*
1126                  * XXX uh-oh... call low-level iam function directly.
1127                  */
1128                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1129                                          sizeof (struct lu_fid_pack),
1130                                          oth->ot_handle);
1131         }
1132         return result;
1133 }
1134
1135 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1136                      struct lu_attr *attr,
1137                      struct dt_allocation_hint *hint,
1138                      struct thandle *th)
1139 {
1140         LASSERT(S_ISREG(attr->la_mode));
1141         return osd_mkfile(info, obj, (attr->la_mode &
1142                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1143 }
1144
1145 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1146                      struct lu_attr *attr,
1147                      struct dt_allocation_hint *hint,
1148                      struct thandle *th)
1149 {
1150         LASSERT(S_ISLNK(attr->la_mode));
1151         return osd_mkfile(info, obj, (attr->la_mode &
1152                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1153 }
1154
/*
 * Create a special file (character or block device, FIFO or socket): the
 * inode is allocated through osd_mkfile() and then set up with
 * init_special_inode() using attr->la_rdev.
 *
 * Returns the result of osd_mkfile().
 */
static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
                     struct lu_attr *attr,
                     struct dt_allocation_hint *hint,
                     struct thandle *th)
{
        int result;
        struct osd_device *osd = osd_obj2dev(obj);
        struct inode      *dir;
        /* only type, rwx permission and sticky bits are kept */
        umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);

        LASSERT(osd_invariant(obj));
        LASSERT(obj->oo_inode == NULL);
        LASSERT(osd->od_obj_area != NULL);
        LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
                S_ISFIFO(mode) || S_ISSOCK(mode));

        /*
         * @dir is only sanity-checked here; it is not passed to
         * osd_mkfile(), which picks the parent on its own.
         */
        dir = osd->od_obj_area->d_inode;
        LASSERT(dir->i_op != NULL);

        result = osd_mkfile(info, obj, mode, hint, th);
        if (result == 0) {
                LASSERT(obj->oo_inode != NULL);
                init_special_inode(obj->oo_inode, mode, attr->la_rdev);
        }
        LASSERT(osd_invariant(obj));
        return result;
}
1182
/*
 * Signature shared by the type-specific creation functions (osd_mkdir(),
 * osd_mkreg(), osd_mksym(), osd_mknod()) that osd_create_type_f() selects
 * between.
 */
typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
                              struct lu_attr *,
                              struct dt_allocation_hint *hint,
                              struct thandle *);
1187
1188 static osd_obj_type_f osd_create_type_f(__u32 mode)
1189 {
1190         osd_obj_type_f result;
1191
1192         switch (mode) {
1193         case S_IFDIR:
1194                 result = osd_mkdir;
1195                 break;
1196         case S_IFREG:
1197                 result = osd_mkreg;
1198                 break;
1199         case S_IFLNK:
1200                 result = osd_mksym;
1201                 break;
1202         case S_IFCHR:
1203         case S_IFBLK:
1204         case S_IFIFO:
1205         case S_IFSOCK:
1206                 result = osd_mknod;
1207                 break;
1208         default:
1209                 LBUG();
1210                 break;
1211         }
1212         return result;
1213 }
1214
1215
1216 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1217                         struct dt_object *parent, umode_t child_mode)
1218 {
1219         LASSERT(ah);
1220
1221         memset(ah, 0, sizeof(*ah));
1222         ah->dah_parent = parent;
1223         ah->dah_mode = child_mode;
1224 }
1225
1226
1227 /*
1228  * Concurrency: @dt is write locked.
1229  */
1230 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1231                              struct lu_attr *attr, 
1232                              struct dt_allocation_hint *hint,
1233                              struct thandle *th)
1234 {
1235         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1236         struct osd_object      *obj  = osd_dt_obj(dt);
1237         struct osd_device      *osd  = osd_obj2dev(obj);
1238         struct osd_thread_info *info = osd_oti_get(env);
1239         int result;
1240
1241         ENTRY;
1242
1243         LASSERT(osd_invariant(obj));
1244         LASSERT(!dt_object_exists(dt));
1245         LASSERT(osd_write_locked(env, obj));
1246         LASSERT(th != NULL);
1247
1248         /*
1249          * XXX missing: Quote handling.
1250          */
1251
1252         result = osd_create_pre(info, obj, attr, th);
1253         if (result == 0) {
1254                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1255                                                                 attr, hint, th);
1256                 if (result == 0)
1257                         result = osd_create_post(info, obj, attr, th);
1258         }
1259         if (result == 0) {
1260                 struct osd_inode_id *id = &info->oti_id;
1261
1262                 LASSERT(obj->oo_inode != NULL);
1263
1264                 id->oii_ino = obj->oo_inode->i_ino;
1265                 id->oii_gen = obj->oo_inode->i_generation;
1266
1267                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1268         }
1269
1270         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1271         LASSERT(osd_invariant(obj));
1272         RETURN(result);
1273 }
1274
/*
 * Increment the link count of the inode backing @dt and mark the inode
 * dirty. The count must be below LDISKFS_LINK_MAX on entry.
 *
 * Concurrency: @dt is write locked.
 */
static void osd_object_ref_add(const struct lu_env *env,
                               struct dt_object *dt,
                               struct thandle *th)
{
        struct osd_object *obj = osd_dt_obj(dt);
        struct inode *inode = obj->oo_inode;

        LASSERT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt));
        LASSERT(osd_write_locked(env, obj));
        LASSERT(th != NULL);

        /* ->oo_guard covers the check and the update of ->i_nlink */
        spin_lock(&obj->oo_guard);
        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
        inode->i_nlink++;
        spin_unlock(&obj->oo_guard);
        mark_inode_dirty(inode);
        LASSERT(osd_invariant(obj));
}
1297
/*
 * Decrement the link count of the inode backing @dt and mark the inode
 * dirty. The count must be positive on entry.
 *
 * Concurrency: @dt is write locked.
 */
static void osd_object_ref_del(const struct lu_env *env,
                               struct dt_object *dt,
                               struct thandle *th)
{
        struct osd_object *obj = osd_dt_obj(dt);
        struct inode *inode = obj->oo_inode;

        LASSERT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt));
        LASSERT(osd_write_locked(env, obj));
        LASSERT(th != NULL);

        /* ->oo_guard covers the check and the update of ->i_nlink */
        spin_lock(&obj->oo_guard);
        LASSERT(inode->i_nlink > 0);
        inode->i_nlink--;
        spin_unlock(&obj->oo_guard);
        mark_inode_dirty(inode);
        LASSERT(osd_invariant(obj));
}
1320
1321 /*
1322  * Concurrency: @dt is read locked.
1323  */
1324 static int osd_xattr_get(const struct lu_env *env,
1325                          struct dt_object *dt,
1326                          struct lu_buf *buf,
1327                          const char *name,
1328                          struct lustre_capa *capa)
1329 {
1330         struct osd_object      *obj    = osd_dt_obj(dt);
1331         struct inode           *inode  = obj->oo_inode;
1332         struct osd_thread_info *info   = osd_oti_get(env);
1333         struct dentry          *dentry = &info->oti_dentry;
1334
1335         LASSERT(dt_object_exists(dt));
1336         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1337         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1338
1339         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1340                 return -EACCES;
1341
1342         dentry->d_inode = inode;
1343         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1344 }
1345
1346 /*
1347  * Concurrency: @dt is write locked.
1348  */
1349 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1350                          const struct lu_buf *buf, const char *name, int fl,
1351                          struct thandle *handle, struct lustre_capa *capa)
1352 {
1353         struct osd_object      *obj    = osd_dt_obj(dt);
1354         struct inode           *inode  = obj->oo_inode;
1355         struct osd_thread_info *info   = osd_oti_get(env);
1356         struct dentry          *dentry = &info->oti_dentry;
1357         struct timespec        *t      = &info->oti_time;
1358         int                     fs_flags = 0, rc;
1359
1360         LASSERT(dt_object_exists(dt));
1361         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1362         LASSERT(osd_write_locked(env, obj));
1363         LASSERT(handle != NULL);
1364
1365         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1366                 return -EACCES;
1367
1368         if (fl & LU_XATTR_REPLACE)
1369                 fs_flags |= XATTR_REPLACE;
1370
1371         if (fl & LU_XATTR_CREATE)
1372                 fs_flags |= XATTR_CREATE;
1373
1374         dentry->d_inode = inode;
1375         *t = inode->i_ctime;
1376         rc = inode->i_op->setxattr(dentry, name,
1377                                    buf->lb_buf, buf->lb_len, fs_flags);
1378         if (likely(rc == 0)) {
1379                 /* ctime should not be updated with server-side time. */
1380                 spin_lock(&obj->oo_guard);
1381                 inode->i_ctime = *t;
1382                 spin_unlock(&obj->oo_guard);
1383                 mark_inode_dirty(inode);
1384         }
1385         return rc;
1386 }
1387
1388 /*
1389  * Concurrency: @dt is read locked.
1390  */
1391 static int osd_xattr_list(const struct lu_env *env,
1392                           struct dt_object *dt,
1393                           struct lu_buf *buf,
1394                           struct lustre_capa *capa)
1395 {
1396         struct osd_object      *obj    = osd_dt_obj(dt);
1397         struct inode           *inode  = obj->oo_inode;
1398         struct osd_thread_info *info   = osd_oti_get(env);
1399         struct dentry          *dentry = &info->oti_dentry;
1400
1401         LASSERT(dt_object_exists(dt));
1402         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1403         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1404
1405         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1406                 return -EACCES;
1407
1408         dentry->d_inode = inode;
1409         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1410 }
1411
1412 /*
1413  * Concurrency: @dt is write locked.
1414  */
1415 static int osd_xattr_del(const struct lu_env *env,
1416                          struct dt_object *dt,
1417                          const char *name,
1418                          struct thandle *handle,
1419                          struct lustre_capa *capa)
1420 {
1421         struct osd_object      *obj    = osd_dt_obj(dt);
1422         struct inode           *inode  = obj->oo_inode;
1423         struct osd_thread_info *info   = osd_oti_get(env);
1424         struct dentry          *dentry = &info->oti_dentry;
1425         struct timespec        *t      = &info->oti_time;
1426         int                     rc;
1427
1428         LASSERT(dt_object_exists(dt));
1429         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1430         LASSERT(osd_write_locked(env, obj));
1431         LASSERT(handle != NULL);
1432
1433         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1434                 return -EACCES;
1435
1436         dentry->d_inode = inode;
1437         *t = inode->i_ctime;
1438         rc = inode->i_op->removexattr(dentry, name);
1439         if (likely(rc == 0)) {
1440                 /* ctime should not be updated with server-side time. */
1441                 spin_lock(&obj->oo_guard);
1442                 inode->i_ctime = *t;
1443                 spin_unlock(&obj->oo_guard);
1444                 mark_inode_dirty(inode);
1445         }
1446         return rc;
1447 }
1448
1449 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1450                                      struct dt_object *dt,
1451                                      struct lustre_capa *old,
1452                                      __u64 opc)
1453 {
1454         struct osd_thread_info *info = osd_oti_get(env);
1455         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1456         struct osd_object *obj = osd_dt_obj(dt);
1457         struct osd_device *dev = osd_obj2dev(obj);
1458         struct lustre_capa_key *key = &info->oti_capa_key;
1459         struct lustre_capa *capa = &info->oti_capa;
1460         struct obd_capa *oc;
1461         int rc;
1462         ENTRY;
1463
1464         if (!dev->od_fl_capa)
1465                 RETURN(ERR_PTR(-ENOENT));
1466
1467         LASSERT(dt_object_exists(dt));
1468         LASSERT(osd_invariant(obj));
1469
1470         /* renewal sanity check */
1471         if (old && osd_object_auth(env, dt, old, opc))
1472                 RETURN(ERR_PTR(-EACCES));
1473
1474         capa->lc_fid = *fid;
1475         capa->lc_opc = opc;
1476         capa->lc_uid = 0;
1477         capa->lc_flags = dev->od_capa_alg << 24;
1478         capa->lc_timeout = dev->od_capa_timeout;
1479         capa->lc_expiry = 0;
1480
1481         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1482         if (oc) {
1483                 LASSERT(!capa_is_expired(oc));
1484                 RETURN(oc);
1485         }
1486
1487         spin_lock(&capa_lock);
1488         *key = dev->od_capa_keys[1];
1489         spin_unlock(&capa_lock);
1490
1491         capa->lc_keyid = key->lk_keyid;
1492         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1493
1494         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1495         if (rc) {
1496                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1497                 RETURN(ERR_PTR(rc));
1498         }
1499
1500         oc = capa_add(dev->od_capa_hash, capa);
1501         RETURN(oc);
1502 }
1503
1504 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
1505 {
1506         int rc;
1507         struct osd_object      *obj    = osd_dt_obj(dt);
1508         struct inode           *inode  = obj->oo_inode;
1509         struct osd_thread_info *info   = osd_oti_get(env);
1510         struct dentry          *dentry = &info->oti_dentry;
1511         struct file            *file   = &info->oti_file;
1512         ENTRY;
1513
1514         dentry->d_inode = inode;
1515         file->f_dentry = dentry;
1516         file->f_mapping = inode->i_mapping;
1517         file->f_op = inode->i_fop;
1518         LOCK_INODE_MUTEX(inode);
1519         rc = file->f_op->fsync(file, dentry, 0);
1520         UNLOCK_INODE_MUTEX(inode);
1521         RETURN(rc);
1522 }
1523
/*
 * Object-level method table: binds every dt_object_operations entry point
 * listed here (locking, attributes, creation, link counting, extended
 * attributes, capabilities, sync) to the corresponding osd implementation.
 */
static struct dt_object_operations osd_obj_ops = {
        .do_read_lock    = osd_object_read_lock,
        .do_write_lock   = osd_object_write_lock,
        .do_read_unlock  = osd_object_read_unlock,
        .do_write_unlock = osd_object_write_unlock,
        .do_attr_get     = osd_attr_get,
        .do_attr_set     = osd_attr_set,
        .do_ah_init      = osd_ah_init,
        .do_create       = osd_object_create,
        .do_index_try    = osd_index_try,
        .do_ref_add      = osd_object_ref_add,
        .do_ref_del      = osd_object_ref_del,
        .do_xattr_get    = osd_xattr_get,
        .do_xattr_set    = osd_xattr_set,
        .do_xattr_del    = osd_xattr_del,
        .do_xattr_list   = osd_xattr_list,
        .do_capa_get     = osd_capa_get,
        .do_object_sync  = osd_object_sync,
};
1543
1544 /*
1545  * Body operations.
1546  */
1547
/*
 * XXX: Another layering violation for now.
 *
 * We don't want to use ->f_op->read methods, because generic file write
 *
 *         - serializes on ->i_sem, and
 *
 *         - does a lot of extra work like balance_dirty_pages(),
 *
 * which doesn't work for globally shared files like /last-received.
 *
 * Instead, the two helpers below are declared by hand and called directly
 * from osd_read() and osd_write().
 */
int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
                                loff_t *offs, handle_t *handle);
1562
1563 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1564                         struct lu_buf *buf, loff_t *pos,
1565                         struct lustre_capa *capa)
1566 {
1567         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1568
1569         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1570                 RETURN(-EACCES);
1571
1572         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1573 }
1574
1575 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1576                          const struct lu_buf *buf, loff_t *pos,
1577                          struct thandle *handle, struct lustre_capa *capa)
1578 {
1579         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1580         struct osd_thandle *oh;
1581         ssize_t             result;
1582
1583         LASSERT(handle != NULL);
1584
1585         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1586                 RETURN(-EACCES);
1587
1588         oh = container_of(handle, struct osd_thandle, ot_super);
1589         LASSERT(oh->ot_handle->h_transaction != NULL);
1590         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1591                                              pos, oh->ot_handle);
1592         if (result == 0)
1593                 result = buf->lb_len;
1594         return result;
1595 }
1596
/*
 * Body (file data) method table: read and write are served by osd_read()
 * and osd_write().
 */
static struct dt_body_operations osd_body_ops = {
        .dbo_read  = osd_read,
        .dbo_write = osd_write
};
1601
1602 /*
1603  * Index operations.
1604  */
1605
1606 static int osd_object_is_root(const struct osd_object *obj)
1607 {
1608         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1609 }
1610
1611 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1612                            const struct dt_index_features *feat)
1613 {
1614         struct iam_descr *descr;
1615
1616         if (osd_object_is_root(o))
1617                 return feat == &dt_directory_features;
1618
1619         LASSERT(o->oo_dir != NULL);
1620
1621         descr = o->oo_dir->od_container.ic_descr;
1622         if (feat == &dt_directory_features)
1623                 return descr == &iam_htree_compat_param ||
1624                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1625                          1 /*
1626                             * XXX check that index looks like directory.
1627                             */
1628                                 );
1629         else
1630                 return
1631                         feat->dif_keysize_min <= descr->id_key_size &&
1632                         descr->id_key_size <= feat->dif_keysize_max &&
1633                         feat->dif_recsize_min <= descr->id_rec_size &&
1634                         descr->id_rec_size <= feat->dif_recsize_max &&
1635                         !(feat->dif_flags & (DT_IND_VARKEY |
1636                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1637                         ergo(feat->dif_flags & DT_IND_UPDATE,
1638                              1 /* XXX check that object (and file system) is
1639                                 * writable */);
1640 }
1641
1642 static int osd_container_init(const struct lu_env *env,
1643                               struct osd_object *obj,
1644                               struct osd_directory *dir)
1645 {
1646         int result;
1647         struct iam_container *bag;
1648
1649         bag    = &dir->od_container;
1650         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1651         if (result == 0) {
1652                 result = iam_container_setup(bag);
1653                 if (result == 0)
1654                         obj->oo_dt.do_index_ops = &osd_index_ops;
1655                 else
1656                         iam_container_fini(bag);
1657         }
1658         return result;
1659 }
1660
1661 /*
1662  * Concurrency: no external locking is necessary.
1663  */
1664 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1665                          const struct dt_index_features *feat)
1666 {
1667         int result;
1668         struct osd_object *obj = osd_dt_obj(dt);
1669
1670         LASSERT(osd_invariant(obj));
1671         LASSERT(dt_object_exists(dt));
1672
1673         if (osd_object_is_root(obj)) {
1674                 dt->do_index_ops = &osd_index_compat_ops;
1675                 result = 0;
1676         } else if (!osd_has_index(obj)) {
1677                 struct osd_directory *dir;
1678
1679                 OBD_ALLOC_PTR(dir);
1680                 if (dir != NULL) {
1681                         sema_init(&dir->od_sem, 1);
1682
1683                         spin_lock(&obj->oo_guard);
1684                         if (obj->oo_dir == NULL)
1685                                 obj->oo_dir = dir;
1686                         else
1687                                 /*
1688                                  * Concurrent thread allocated container data.
1689                                  */
1690                                 OBD_FREE_PTR(dir);
1691                         spin_unlock(&obj->oo_guard);
1692                         /*
1693                          * Now, that we have container data, serialize its
1694                          * initialization.
1695                          */
1696                         down(&obj->oo_dir->od_sem);
1697                         /*
1698                          * recheck under lock.
1699                          */
1700                         if (!osd_has_index(obj))
1701                                 result = osd_container_init(env, obj, dir);
1702                         else
1703                                 result = 0;
1704                         up(&obj->oo_dir->od_sem);
1705                 } else
1706                         result = -ENOMEM;
1707         } else
1708                 result = 0;
1709
1710         if (result == 0) {
1711                 if (!osd_index_probe(env, obj, feat))
1712                         result = -ENOTDIR;
1713         }
1714         LASSERT(osd_invariant(obj));
1715
1716         return result;
1717 }
1718
1719 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1720                             const struct dt_key *key, struct thandle *handle,
1721                             struct lustre_capa *capa)
1722 {
1723         struct osd_object     *obj = osd_dt_obj(dt);
1724         struct osd_thandle    *oh;
1725         struct iam_path_descr *ipd;
1726         struct iam_container  *bag = &obj->oo_dir->od_container;
1727         int rc;
1728
1729         ENTRY;
1730
1731         LASSERT(osd_invariant(obj));
1732         LASSERT(dt_object_exists(dt));
1733         LASSERT(bag->ic_object == obj->oo_inode);
1734         LASSERT(handle != NULL);
1735
1736         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1737                 RETURN(-EACCES);
1738
1739         ipd = osd_ipd_get(env, bag);
1740         if (unlikely(ipd == NULL))
1741                 RETURN(-ENOMEM);
1742
1743         oh = container_of0(handle, struct osd_thandle, ot_super);
1744         LASSERT(oh->ot_handle != NULL);
1745         LASSERT(oh->ot_handle->h_transaction != NULL);
1746
1747         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1748         osd_ipd_put(env, bag, ipd);
1749         LASSERT(osd_invariant(obj));
1750         RETURN(rc);
1751 }
1752
1753 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1754                             struct dt_rec *rec, const struct dt_key *key,
1755                             struct lustre_capa *capa)
1756 {
1757         struct osd_object     *obj = osd_dt_obj(dt);
1758         struct iam_path_descr *ipd;
1759         struct iam_container  *bag = &obj->oo_dir->od_container;
1760         int rc;
1761
1762         ENTRY;
1763
1764         LASSERT(osd_invariant(obj));
1765         LASSERT(dt_object_exists(dt));
1766         LASSERT(bag->ic_object == obj->oo_inode);
1767
1768         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1769                 return -EACCES;
1770
1771         ipd = osd_ipd_get(env, bag);
1772         if (unlikely(ipd == NULL))
1773                 RETURN(-ENOMEM);
1774
1775         rc = iam_lookup(bag, (const struct iam_key *)key,
1776                         (struct iam_rec *)rec, ipd);
1777         osd_ipd_put(env, bag, ipd);
1778         LASSERT(osd_invariant(obj));
1779
1780         RETURN(rc);
1781 }
1782
1783 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1784                             const struct dt_rec *rec, const struct dt_key *key,
1785                             struct thandle *th, struct lustre_capa *capa)
1786 {
1787         struct osd_object     *obj = osd_dt_obj(dt);
1788         struct iam_path_descr *ipd;
1789         struct osd_thandle    *oh;
1790         struct iam_container  *bag = &obj->oo_dir->od_container;
1791         int rc;
1792
1793         ENTRY;
1794
1795         LASSERT(osd_invariant(obj));
1796         LASSERT(dt_object_exists(dt));
1797         LASSERT(bag->ic_object == obj->oo_inode);
1798         LASSERT(th != NULL);
1799
1800         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1801                 return -EACCES;
1802
1803         ipd = osd_ipd_get(env, bag);
1804         if (unlikely(ipd == NULL))
1805                 RETURN(-ENOMEM);
1806
1807         oh = container_of0(th, struct osd_thandle, ot_super);
1808         LASSERT(oh->ot_handle != NULL);
1809         LASSERT(oh->ot_handle->h_transaction != NULL);
1810         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1811                         (struct iam_rec *)rec, ipd);
1812         osd_ipd_put(env, bag, ipd);
1813         LASSERT(osd_invariant(obj));
1814         RETURN(rc);
1815 }
1816
1817 /*
1818  * Iterator operations.
1819  */
/*
 * Iterator state; osd_it_init() hands it out cast to struct dt_it and the
 * other osd_it_*() methods cast it back.
 */
struct osd_it {
        /* object being iterated; referenced from osd_it_init() to osd_it_fini() */
        struct osd_object     *oi_obj;
        /* path descriptor from osd_ipd_get(); released in osd_it_fini() */
        struct iam_path_descr *oi_ipd;
        /* embedded iam iterator all osd_it_*() methods delegate to */
        struct iam_iterator    oi_it;
};
1825
1826 static struct dt_it *osd_it_init(const struct lu_env *env,
1827                                  struct dt_object *dt, int writable,
1828                                  struct lustre_capa *capa)
1829 {
1830         struct osd_it         *it;
1831         struct osd_object     *obj = osd_dt_obj(dt);
1832         struct lu_object      *lo  = &dt->do_lu;
1833         struct iam_path_descr *ipd;
1834         struct iam_container  *bag = &obj->oo_dir->od_container;
1835         __u32                  flags;
1836
1837         LASSERT(lu_object_exists(lo));
1838
1839         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1840                             CAPA_OPC_BODY_READ))
1841                 return ERR_PTR(-EACCES);
1842
1843         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1844         OBD_ALLOC_PTR(it);
1845         if (it != NULL) {
1846                 /*
1847                  * XXX: as ipd is allocated within osd_thread_info, assignment
1848                  * below implies that iterator usage is confined within single
1849                  * environment.
1850                  */
1851                 ipd = osd_ipd_get(env, bag);
1852                 if (likely(ipd != NULL)) {
1853                         it->oi_obj = obj;
1854                         it->oi_ipd = ipd;
1855                         lu_object_get(lo);
1856                         iam_it_init(&it->oi_it, bag, flags, ipd);
1857                         return (struct dt_it *)it;
1858                 } else
1859                         OBD_FREE_PTR(it);
1860         }
1861         return ERR_PTR(-ENOMEM);
1862 }
1863
1864 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1865 {
1866         struct osd_it     *it = (struct osd_it *)di;
1867         struct osd_object *obj = it->oi_obj;
1868
1869         iam_it_fini(&it->oi_it);
1870         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1871         lu_object_put(env, &obj->oo_dt.do_lu);
1872         OBD_FREE_PTR(it);
1873 }
1874
1875 static int osd_it_get(const struct lu_env *env,
1876                       struct dt_it *di, const struct dt_key *key)
1877 {
1878         struct osd_it *it = (struct osd_it *)di;
1879
1880         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1881 }
1882
1883 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1884 {
1885         struct osd_it *it = (struct osd_it *)di;
1886
1887         iam_it_put(&it->oi_it);
1888 }
1889
1890 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1891 {
1892         struct osd_it *it = (struct osd_it *)di;
1893
1894         return iam_it_next(&it->oi_it);
1895 }
1896
1897 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1898                       struct thandle *th)
1899 {
1900         struct osd_it      *it = (struct osd_it *)di;
1901         struct osd_thandle *oh;
1902
1903         LASSERT(th != NULL);
1904
1905         oh = container_of0(th, struct osd_thandle, ot_super);
1906         LASSERT(oh->ot_handle != NULL);
1907         LASSERT(oh->ot_handle->h_transaction != NULL);
1908
1909         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1910 }
1911
1912 static struct dt_key *osd_it_key(const struct lu_env *env,
1913                                  const struct dt_it *di)
1914 {
1915         struct osd_it *it = (struct osd_it *)di;
1916
1917         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1918 }
1919
1920 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1921 {
1922         struct osd_it *it = (struct osd_it *)di;
1923
1924         return iam_it_key_size(&it->oi_it);
1925 }
1926
1927 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1928                                  const struct dt_it *di)
1929 {
1930         struct osd_it *it = (struct osd_it *)di;
1931
1932         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1933 }
1934
1935 static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1936 {
1937         struct osd_it *it = (struct osd_it *)di;
1938
1939         return iam_it_store(&it->oi_it);
1940 }
1941
1942 static int osd_it_load(const struct lu_env *env,
1943                        const struct dt_it *di, __u64 hash)
1944 {
1945         struct osd_it *it = (struct osd_it *)di;
1946
1947         return iam_it_load(&it->oi_it, hash);
1948 }
1949
/*
 * Index operations backed by an iam container. osd_container_init()
 * installs this table on an object once its container has been set up.
 */
static struct dt_index_operations osd_index_ops = {
        .dio_lookup = osd_index_lookup,
        .dio_insert = osd_index_insert,
        .dio_delete = osd_index_delete,
        /* iterator methods; all of them operate on struct osd_it */
        .dio_it     = {
                .init     = osd_it_init,
                .fini     = osd_it_fini,
                .get      = osd_it_get,
                .put      = osd_it_put,
                .del      = osd_it_del,
                .next     = osd_it_next,
                .key      = osd_it_key,
                .key_size = osd_it_key_size,
                .rec      = osd_it_rec,
                .store    = osd_it_store,
                .load     = osd_it_load
        }
};
1968
1969 static int osd_index_compat_delete(const struct lu_env *env,
1970                                    struct dt_object *dt,
1971                                    const struct dt_key *key,
1972                                    struct thandle *handle,
1973                                    struct lustre_capa *capa)
1974 {
1975         struct osd_object *obj = osd_dt_obj(dt);
1976
1977         LASSERT(handle != NULL);
1978         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1979         ENTRY;
1980
1981 #if 0
1982         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1983                 RETURN(-EACCES);
1984 #endif
1985
1986         RETURN(-EOPNOTSUPP);
1987 }
1988
1989 /*
1990  * Compatibility index operations.
1991  */
1992
1993
1994 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
1995                            struct dentry *dentry, struct lu_fid_pack *pack)
1996 {
1997         struct inode  *inode = dentry->d_inode;
1998         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
1999
2000         lu_igif_build(fid, inode->i_ino, inode->i_generation);
2001         fid_cpu_to_be(fid, fid);
2002         pack->fp_len = sizeof *fid + 1;
2003         memcpy(pack->fp_area, fid, sizeof *fid);
2004 }
2005
2006 static int osd_index_compat_lookup(const struct lu_env *env,
2007                                    struct dt_object *dt,
2008                                    struct dt_rec *rec, const struct dt_key *key,
2009                                    struct lustre_capa *capa)
2010 {
2011         struct osd_object *obj = osd_dt_obj(dt);
2012
2013         struct osd_device      *osd  = osd_obj2dev(obj);
2014         struct osd_thread_info *info = osd_oti_get(env);
2015         struct inode           *dir;
2016
2017         int result;
2018
2019         /*
2020          * XXX temporary solution.
2021          */
2022         struct dentry *dentry;
2023         struct dentry *parent;
2024
2025         LASSERT(osd_invariant(obj));
2026         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2027         LASSERT(osd_has_index(obj));
2028
2029         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2030                 return -EACCES;
2031
2032         info->oti_str.name = (const char *)key;
2033         info->oti_str.len  = strlen((const char *)key);
2034
2035         dir = obj->oo_inode;
2036         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2037
2038         parent = d_alloc_root(dir);
2039         if (parent == NULL)
2040                 return -ENOMEM;
2041         igrab(dir);
2042         dentry = d_alloc(parent, &info->oti_str);
2043         if (dentry != NULL) {
2044                 struct dentry *d;
2045
2046                 /*
2047                  * XXX passing NULL for nameidata should work for
2048                  * ext3/ldiskfs.
2049                  */
2050                 d = dir->i_op->lookup(dir, dentry, NULL);
2051                 if (d == NULL) {
2052                         /*
2053                          * normal case, result is in @dentry.
2054                          */
2055                         if (dentry->d_inode != NULL) {
2056                                 osd_build_pack(env, osd, dentry,
2057                                                (struct lu_fid_pack *)rec);
2058                                 result = 0;
2059                         } else
2060                                 result = -ENOENT;
2061                  } else {
2062                         /* What? Disconnected alias? Ppheeeww... */
2063                         CERROR("Aliasing where not expected\n");
2064                         result = -EIO;
2065                         dput(d);
2066                 }
2067                 dput(dentry);
2068         } else
2069                 result = -ENOMEM;
2070         dput(parent);
2071         LASSERT(osd_invariant(obj));
2072         return result;
2073 }
2074
2075 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
2076                        struct inode *dir, struct inode *inode, const char *name)
2077 {
2078         struct dentry *old;
2079         struct dentry *new;
2080         struct dentry *parent;
2081
2082         int result;
2083
2084         info->oti_str.name = name;
2085         info->oti_str.len  = strlen(name);
2086
2087         LASSERT(atomic_read(&dir->i_count) > 0);
2088         result = -ENOMEM;
2089         old = d_alloc(dev->od_obj_area, &info->oti_str);
2090         if (old != NULL) {
2091                 d_instantiate(old, inode);
2092                 igrab(inode);
2093                 LASSERT(atomic_read(&dir->i_count) > 0);
2094                 parent = d_alloc_root(dir);
2095                 if (parent != NULL) {
2096                         igrab(dir);
2097                         LASSERT(atomic_read(&dir->i_count) > 1);
2098                         new = d_alloc(parent, &info->oti_str);
2099                         LASSERT(atomic_read(&dir->i_count) > 1);
2100                         if (new != NULL) {
2101                                 LASSERT(atomic_read(&dir->i_count) > 1);
2102                                 result = dir->i_op->link(old, dir, new);
2103                                 LASSERT(atomic_read(&dir->i_count) > 1);
2104                                 dput(new);
2105                                 LASSERT(atomic_read(&dir->i_count) > 1);
2106                         }
2107                         LASSERT(atomic_read(&dir->i_count) > 1);
2108                         dput(parent);
2109                         LASSERT(atomic_read(&dir->i_count) > 0);
2110                 }
2111                 dput(old);
2112         }
2113         LASSERT(atomic_read(&dir->i_count) > 0);
2114         return result;
2115 }
2116
2117
2118 /*
2119  * XXX Temporary stuff.
2120  */
2121 static int osd_index_compat_insert(const struct lu_env *env,
2122                                    struct dt_object *dt,
2123                                    const struct dt_rec *rec,
2124                                    const struct dt_key *key, struct thandle *th,
2125                                    struct lustre_capa *capa)
2126 {
2127         struct osd_object     *obj = osd_dt_obj(dt);
2128
2129         const char          *name = (const char *)key;
2130
2131         struct lu_device    *ludev = dt->do_lu.lo_dev;
2132         struct lu_object    *luch;
2133
2134         struct osd_thread_info   *info = osd_oti_get(env);
2135         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2136         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2137
2138         int result;
2139
2140         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2141         LASSERT(osd_invariant(obj));
2142         LASSERT(th != NULL);
2143
2144         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2145                 return -EACCES;
2146
2147         result = fid_unpack(pack, fid);
2148         if (result != 0)
2149                 return result;
2150
2151         luch = lu_object_find(env, ludev->ld_site, fid);
2152         if (!IS_ERR(luch)) {
2153                 if (lu_object_exists(luch)) {
2154                         struct osd_object *child;
2155
2156                         child = osd_obj(lu_object_locate(luch->lo_header,
2157                                                          ludev->ld_type));
2158                         if (child != NULL)
2159                                 result = osd_add_rec(info, osd_obj2dev(obj),
2160                                                      obj->oo_inode,
2161                                                      child->oo_inode, name);
2162                         else {
2163                                 CERROR("No osd slice.\n");
2164                                 result = -ENOENT;
2165                         }
2166                         LASSERT(osd_invariant(obj));
2167                         LASSERT(osd_invariant(child));
2168                 } else {
2169                         CERROR("Sorry.\n");
2170                         result = -ENOENT;
2171                 }
2172                 lu_object_put(env, luch);
2173         } else
2174                 result = PTR_ERR(luch);
2175         LASSERT(osd_invariant(obj));
2176         return result;
2177 }
2178
/*
 * Compatibility index operations; osd_index_try() installs them on the
 * root object. No iterator methods are provided in this table.
 */
static struct dt_index_operations osd_index_compat_ops = {
        .dio_lookup = osd_index_compat_lookup,
        .dio_insert = osd_index_compat_insert,
        .dio_delete = osd_index_compat_delete
};
2184
/* type constructor/destructor: osd_type_init, osd_type_fini */
LU_TYPE_INIT_FINI(osd, &osd_key);

/*
 * Context key for the per-context struct osd_thread_info: osd_key_init()
 * allocates the value and osd_key_exit() checks it. The key is tagged for
 * both data (LCT_DT_THREAD) and metadata (LCT_MD_THREAD) thread contexts.
 */
static struct lu_context_key osd_key = {
        .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
        .lct_init = osd_key_init,
        .lct_fini = osd_key_fini,
        .lct_exit = osd_key_exit
};
2194
2195 static void *osd_key_init(const struct lu_context *ctx,
2196                           struct lu_context_key *key)
2197 {
2198         struct osd_thread_info *info;
2199
2200         OBD_ALLOC_PTR(info);
2201         if (info != NULL)
2202                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2203         else
2204                 info = ERR_PTR(-ENOMEM);
2205         return info;
2206 }
2207
/*
 * context key destructor: osd_key_fini
 * (generated by the macro; presumably it frees the struct osd_thread_info
 * allocated in osd_key_init() -- confirm against the LU_KEY_FINI definition)
 */
LU_KEY_FINI(osd, struct osd_thread_info);
2210
2211 static void osd_key_exit(const struct lu_context *ctx,
2212                          struct lu_context_key *key, void *data)
2213 {
2214         struct osd_thread_info *info = data;
2215
2216         LASSERT(info->oti_r_locks == 0);
2217         LASSERT(info->oti_w_locks == 0);
2218         LASSERT(info->oti_txns    == 0);
2219 }
2220
2221 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2222                            const char *name, struct lu_device *next)
2223 {
2224         int rc;
2225         /* context for commit hooks */
2226         rc = lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
2227                              LCT_MD_THREAD);
2228         if (rc == 0)
2229                 rc = osd_procfs_init(osd_dev(d), name);
2230         return rc;
2231 }
2232
2233 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2234 {
2235         struct osd_thread_info *info = osd_oti_get(env);
2236         ENTRY;
2237         if (o->od_obj_area != NULL) {
2238                 dput(o->od_obj_area);
2239                 o->od_obj_area = NULL;
2240         }
2241         osd_oi_fini(info, &o->od_oi);
2242
2243         RETURN(0);
2244 }
2245
2246 static int osd_mount(const struct lu_env *env,
2247                      struct osd_device *o, struct lustre_cfg *cfg)
2248 {
2249         struct lustre_mount_info *lmi;
2250         const char               *dev  = lustre_cfg_string(cfg, 0);
2251         struct osd_thread_info   *info = osd_oti_get(env);
2252         int result;
2253
2254         ENTRY;
2255
2256         if (o->od_mount != NULL) {
2257                 CERROR("Already mounted (%s)\n", dev);
2258                 RETURN(-EEXIST);
2259         }
2260
2261         /* get mount */
2262         lmi = server_get_mount(dev);
2263         if (lmi == NULL) {
2264                 CERROR("Cannot get mount info for %s!\n", dev);
2265                 RETURN(-EFAULT);
2266         }
2267
2268         LASSERT(lmi != NULL);
2269         /* save lustre_mount_info in dt_device */
2270         o->od_mount = lmi;
2271
2272         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2273         if (result == 0) {
2274                 struct dentry *d;
2275
2276                 d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
2277                                  0777, 1);
2278                 if (!IS_ERR(d)) {
2279                         o->od_obj_area = d;
2280                 } else
2281                         result = PTR_ERR(d);
2282         }
2283         if (result != 0)
2284                 osd_shutdown(env, o);
2285         RETURN(result);
2286 }
2287
2288 static struct lu_device *osd_device_fini(const struct lu_env *env,
2289                                          struct lu_device *d)
2290 {
2291         int rc;
2292         ENTRY;
2293
2294         shrink_dcache_sb(osd_sb(osd_dev(d)));
2295         osd_sync(env, lu2dt_dev(d));
2296
2297         rc = osd_procfs_fini(osd_dev(d));
2298         if (rc) {
2299                 CERROR("proc fini error %d \n", rc);
2300                 RETURN (ERR_PTR(rc));
2301         }
2302
2303         if (osd_dev(d)->od_mount)
2304                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2305                                  osd_dev(d)->od_mount->lmi_mnt);
2306         osd_dev(d)->od_mount = NULL;
2307
2308         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2309         RETURN(NULL);
2310 }
2311
2312 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2313                                           struct lu_device_type *t,
2314                                           struct lustre_cfg *cfg)
2315 {
2316         struct lu_device  *l;
2317         struct osd_device *o;
2318
2319         OBD_ALLOC_PTR(o);
2320         if (o != NULL) {
2321                 int result;
2322
2323                 result = dt_device_init(&o->od_dt_dev, t);
2324                 if (result == 0) {
2325                         l = osd2lu_dev(o);
2326                         l->ld_ops = &osd_lu_ops;
2327                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2328                         spin_lock_init(&o->od_osfs_lock);
2329                         o->od_osfs_age = cfs_time_shift_64(-1000);
2330                         o->od_capa_hash = init_capa_hash();
2331                         if (o->od_capa_hash == NULL) {
2332                                 dt_device_fini(&o->od_dt_dev);
2333                                 l = ERR_PTR(-ENOMEM);
2334                         }
2335                 } else
2336                         l = ERR_PTR(result);
2337
2338                 if (IS_ERR(l))
2339                         OBD_FREE_PTR(o);
2340         } else
2341                 l = ERR_PTR(-ENOMEM);
2342         return l;
2343 }
2344
2345 static struct lu_device *osd_device_free(const struct lu_env *env,
2346                                          struct lu_device *d)
2347 {
2348         struct osd_device *o = osd_dev(d);
2349         ENTRY;
2350
2351         cleanup_capa_hash(o->od_capa_hash);
2352         dt_device_fini(&o->od_dt_dev);
2353         OBD_FREE_PTR(o);
2354         RETURN(NULL);
2355 }
2356
2357 static int osd_process_config(const struct lu_env *env,
2358                               struct lu_device *d, struct lustre_cfg *cfg)
2359 {
2360         struct osd_device *o = osd_dev(d);
2361         int err;
2362         ENTRY;
2363
2364         switch(cfg->lcfg_command) {
2365         case LCFG_SETUP:
2366                 err = osd_mount(env, o, cfg);
2367                 break;
2368         case LCFG_CLEANUP:
2369                 err = osd_shutdown(env, o);
2370                 break;
2371         default:
2372                 err = -ENOTTY;
2373         }
2374
2375         RETURN(err);
2376 }
2377 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2378                                     struct ldiskfs_super_block * es);
2379
2380 static int osd_recovery_complete(const struct lu_env *env,
2381                                  struct lu_device *d)
2382 {
2383         struct osd_device *o = osd_dev(d);
2384         ENTRY;
2385         /* TODO: orphans handling */
2386         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2387         RETURN(0);
2388 }
2389
2390 static struct inode *osd_iget(struct osd_thread_info *info,
2391                               struct osd_device *dev,
2392                               const struct osd_inode_id *id)
2393 {
2394         struct inode *inode;
2395
2396         inode = iget(osd_sb(dev), id->oii_ino);
2397         if (inode == NULL) {
2398                 CERROR("no inode\n");
2399                 inode = ERR_PTR(-EACCES);
2400         } else if (is_bad_inode(inode)) {
2401                 CERROR("bad inode\n");
2402                 iput(inode);
2403                 inode = ERR_PTR(-ENOENT);
2404         } else if (inode->i_generation != id->oii_gen) {
2405                 CERROR("stale inode\n");
2406                 iput(inode);
2407                 inode = ERR_PTR(-ESTALE);
2408         }
2409
2410         return inode;
2411
2412 }
2413
2414 static int osd_fid_lookup(const struct lu_env *env,
2415                           struct osd_object *obj, const struct lu_fid *fid)
2416 {
2417         struct osd_thread_info *info;
2418         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2419         struct osd_device      *dev;
2420         struct osd_inode_id    *id;
2421         struct osd_oi          *oi;
2422         struct inode           *inode;
2423         int                     result;
2424
2425         LASSERT(osd_invariant(obj));
2426         LASSERT(obj->oo_inode == NULL);
2427         LASSERT(fid_is_sane(fid));
2428         /*
2429          * This assertion checks that osd layer sees only local
2430          * fids. Unfortunately it is somewhat expensive (does a
2431          * cache-lookup). Disabling it for production/acceptance-testing.
2432          */
2433         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2434
2435         ENTRY;
2436
2437         info = osd_oti_get(env);
2438         dev  = osd_dev(ldev);
2439         id   = &info->oti_id;
2440         oi   = &dev->od_oi;
2441
2442         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2443                 RETURN(-ENOENT);
2444
2445         result = osd_oi_lookup(info, oi, fid, id);
2446         if (result == 0) {
2447                 inode = osd_iget(info, dev, id);
2448                 if (!IS_ERR(inode)) {
2449                         obj->oo_inode = inode;
2450                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2451                         result = 0;
2452                 } else
2453                         /*
2454                          * If fid wasn't found in oi, inode-less object is
2455                          * created, for which lu_object_exists() returns
2456                          * false. This is used in a (frequent) case when
2457                          * objects are created as locking anchors or
2458                          * place holders for objects yet to be created.
2459                          */
2460                         result = PTR_ERR(inode);
2461         } else if (result == -ENOENT)
2462                 result = 0;
2463         LASSERT(osd_invariant(obj));
2464         RETURN(result);
2465 }
2466
2467 static void osd_inode_getattr(const struct lu_env *env,
2468                               struct inode *inode, struct lu_attr *attr)
2469 {
2470         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2471                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2472                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2473
2474         attr->la_atime      = LTIME_S(inode->i_atime);
2475         attr->la_mtime      = LTIME_S(inode->i_mtime);
2476         attr->la_ctime      = LTIME_S(inode->i_ctime);
2477         attr->la_mode       = inode->i_mode;
2478         attr->la_size       = i_size_read(inode);
2479         attr->la_blocks     = inode->i_blocks;
2480         attr->la_uid        = inode->i_uid;
2481         attr->la_gid        = inode->i_gid;
2482         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2483         attr->la_nlink      = inode->i_nlink;
2484         attr->la_rdev       = inode->i_rdev;
2485         attr->la_blksize    = ll_inode_blksize(inode);
2486         attr->la_blkbits    = inode->i_blkbits;
2487 }
2488
2489 /*
2490  * Helpers.
2491  */
2492
2493 static int lu_device_is_osd(const struct lu_device *d)
2494 {
2495         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2496 }
2497
2498 static struct osd_object *osd_obj(const struct lu_object *o)
2499 {
2500         LASSERT(lu_device_is_osd(o->lo_dev));
2501         return container_of0(o, struct osd_object, oo_dt.do_lu);
2502 }
2503
2504 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2505 {
2506         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2507         return container_of0(d, struct osd_device, od_dt_dev);
2508 }
2509
2510 static struct osd_device *osd_dev(const struct lu_device *d)
2511 {
2512         LASSERT(lu_device_is_osd(d));
2513         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2514 }
2515
2516 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2517 {
2518         return osd_obj(&d->do_lu);
2519 }
2520
2521 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2522 {
2523         return osd_dev(o->oo_dt.do_lu.lo_dev);
2524 }
2525
2526 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2527 {
2528         return &osd->od_dt_dev.dd_lu_dev;
2529 }
2530
2531 static struct super_block *osd_sb(const struct osd_device *dev)
2532 {
2533         return dev->od_mount->lmi_mnt->mnt_sb;
2534 }
2535
2536 static journal_t *osd_journal(const struct osd_device *dev)
2537 {
2538         return LDISKFS_SB(osd_sb(dev))->s_journal;
2539 }
2540
2541 static int osd_has_index(const struct osd_object *obj)
2542 {
2543         return obj->oo_dt.do_index_ops != NULL;
2544 }
2545
/*
 * lu_object-level wrapper around osd_invariant(); installed as
 * ->loo_object_invariant in osd_lu_obj_ops.
 *
 * NOTE(review): osd_obj(l) is deliberately left nested inside the
 * osd_invariant() call. If osd_invariant() is a macro that ignores its
 * argument, hoisting the conversion into a separate statement would
 * change whether osd_obj() (and its assertion) is evaluated -- confirm
 * against the definition before restructuring.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int holds;

        holds = osd_invariant(osd_obj(l));
        return holds;
}
2550
2551 static struct lu_object_operations osd_lu_obj_ops = {
2552         .loo_object_init      = osd_object_init,
2553         .loo_object_delete    = osd_object_delete,
2554         .loo_object_release   = osd_object_release,
2555         .loo_object_free      = osd_object_free,
2556         .loo_object_print     = osd_object_print,
2557         .loo_object_invariant = osd_object_invariant
2558 };
2559
2560 static struct lu_device_operations osd_lu_ops = {
2561         .ldo_object_alloc      = osd_object_alloc,
2562         .ldo_process_config    = osd_process_config,
2563         .ldo_recovery_complete = osd_recovery_complete
2564 };
2565
2566 static struct lu_device_type_operations osd_device_type_ops = {
2567         .ldto_init = osd_type_init,
2568         .ldto_fini = osd_type_fini,
2569
2570         .ldto_device_alloc = osd_device_alloc,
2571         .ldto_device_free  = osd_device_free,
2572
2573         .ldto_device_init    = osd_device_init,
2574         .ldto_device_fini    = osd_device_fini
2575 };
2576
2577 static struct lu_device_type osd_device_type = {
2578         .ldt_tags     = LU_DEVICE_DT,
2579         .ldt_name     = LUSTRE_OSD_NAME,
2580         .ldt_ops      = &osd_device_type_ops,
2581         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2582 };
2583
2584 /*
2585  * lprocfs legacy support.
2586  */
2587 static struct obd_ops osd_obd_device_ops = {
2588         .o_owner = THIS_MODULE
2589 };
2590
2591 static int __init osd_mod_init(void)
2592 {
2593         struct lprocfs_static_vars lvars;
2594
2595         lprocfs_osd_init_vars(&lvars);
2596         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2597                                    LUSTRE_OSD_NAME, &osd_device_type);
2598 }
2599
2600 static void __exit osd_mod_exit(void)
2601 {
2602         class_unregister_type(LUSTRE_OSD_NAME);
2603 }
2604
2605 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2606 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
2607 MODULE_LICENSE("GPL");
2608
2609 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);