Whamcloud - gitweb
0269a078b0f5dbb73d7f2e257db9f6529f6c5617
[fs/lustre-release.git] / lustre / osd / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49
50 /* LUSTRE_VERSION_CODE */
51 #include <lustre_ver.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/types.h>
54 /* prerequisite for linux/xattr.h */
55 #include <linux/fs.h>
56 /* XATTR_{REPLACE,CREATE} */
57 #include <linux/xattr.h>
/*
 * XXX temporary stuff: direct access to ldiskfs/jbd. Interface between osd
 * and file system is not yet specified.
 */
62 /* handle_t, journal_start(), journal_stop() */
63 #include <linux/jbd.h>
64 /* LDISKFS_SB() */
65 #include <linux/ldiskfs_fs.h>
66 #include <linux/ldiskfs_jbd.h>
67 /* simple_mkdir() */
68 #include <lvfs.h>
69
70 /*
71  * struct OBD_{ALLOC,FREE}*()
72  * OBD_FAIL_CHECK
73  */
74 #include <obd_support.h>
75 /* struct ptlrpc_thread */
76 #include <lustre_net.h>
77
78 /* fid_is_local() */
79 #include <lustre_fid.h>
80 #include <linux/lustre_iam.h>
81
82 #include "osd_internal.h"
83 #include "osd_igif.h"
84
85 struct osd_directory {
86         struct iam_container od_container;
87         struct iam_descr     od_descr;
88         struct semaphore     od_sem;
89 };
90
91 struct osd_object {
92         struct dt_object       oo_dt;
93         /**
94          * Inode for file system object represented by this osd_object. This
95          * inode is pinned for the whole duration of lu_object life.
96          *
97          * Not modified concurrently (either setup early during object
98          * creation, or assigned by osd_object_create() under write lock).
99          */
100         struct inode          *oo_inode;
101         struct rw_semaphore    oo_sem;
102         struct osd_directory  *oo_dir;
103         /** protects inode attributes. */
104         spinlock_t             oo_guard;
105         const struct lu_env   *oo_owner;
106 #ifdef CONFIG_LOCKDEP
107         struct lockdep_map     oo_dep_map;
108 #endif
109 };
110
111 static int   osd_root_get      (const struct lu_env *env,
112                                 struct dt_device *dev, struct lu_fid *f);
113
114 static int   lu_device_is_osd  (const struct lu_device *d);
115 static void  osd_mod_exit      (void) __exit;
116 static int   osd_mod_init      (void) __init;
117 static int   osd_type_init     (struct lu_device_type *t);
118 static void  osd_type_fini     (struct lu_device_type *t);
119 static int   osd_object_init   (const struct lu_env *env,
120                                 struct lu_object *l);
121 static void  osd_object_release(const struct lu_env *env,
122                                 struct lu_object *l);
123 static int   osd_object_print  (const struct lu_env *env, void *cookie,
124                                 lu_printer_t p, const struct lu_object *o);
125 static struct lu_device *osd_device_free   (const struct lu_env *env,
126                                 struct lu_device *m);
127 static void *osd_key_init      (const struct lu_context *ctx,
128                                 struct lu_context_key *key);
129 static void  osd_key_fini      (const struct lu_context *ctx,
130                                 struct lu_context_key *key, void *data);
131 static void  osd_key_exit      (const struct lu_context *ctx,
132                                 struct lu_context_key *key, void *data);
133 static int   osd_has_index     (const struct osd_object *obj);
134 static void  osd_object_init0  (struct osd_object *obj);
135 static int   osd_device_init   (const struct lu_env *env,
136                                 struct lu_device *d, const char *,
137                                 struct lu_device *);
138 static int   osd_fid_lookup    (const struct lu_env *env,
139                                 struct osd_object *obj,
140                                 const struct lu_fid *fid);
141 static void  osd_inode_getattr (const struct lu_env *env,
142                                 struct inode *inode, struct lu_attr *attr);
143 static void  osd_inode_setattr (const struct lu_env *env,
144                                 struct inode *inode, const struct lu_attr *attr);
145 static int   osd_param_is_sane (const struct osd_device *dev,
146                                 const struct txn_param *param);
147 static int   osd_index_lookup  (const struct lu_env *env,
148                                 struct dt_object *dt,
149                                 struct dt_rec *rec, const struct dt_key *key,
150                                 struct lustre_capa *capa);
151 static int   osd_index_insert  (const struct lu_env *env,
152                                 struct dt_object *dt,
153                                 const struct dt_rec *rec,
154                                 const struct dt_key *key,
155                                 struct thandle *handle,
156                                 struct lustre_capa *capa);
157 static int   osd_index_delete  (const struct lu_env *env,
158                                 struct dt_object *dt, const struct dt_key *key,
159                                 struct thandle *handle,
160                                 struct lustre_capa *capa);
161 static int   osd_index_probe   (const struct lu_env *env,
162                                 struct osd_object *o,
163                                 const struct dt_index_features *feat);
164 static int   osd_index_try     (const struct lu_env *env,
165                                 struct dt_object *dt,
166                                 const struct dt_index_features *feat);
167 static void  osd_index_fini    (struct osd_object *o);
168
169 static void  osd_it_fini       (const struct lu_env *env, struct dt_it *di);
170 static int   osd_it_get        (const struct lu_env *env,
171                                 struct dt_it *di, const struct dt_key *key);
172 static void  osd_it_put        (const struct lu_env *env, struct dt_it *di);
173 static int   osd_it_next       (const struct lu_env *env, struct dt_it *di);
174 static int   osd_it_del        (const struct lu_env *env, struct dt_it *di,
175                                 struct thandle *th);
176 static int   osd_it_key_size   (const struct lu_env *env,
177                                 const struct dt_it *di);
178 static void  osd_conf_get      (const struct lu_env *env,
179                                 const struct dt_device *dev,
180                                 struct dt_device_param *param);
181 static void  osd_trans_stop    (const struct lu_env *env,
182                                 struct thandle *th);
183 static int   osd_object_is_root(const struct osd_object *obj);
184
185 static struct osd_object  *osd_obj          (const struct lu_object *o);
186 static struct osd_device  *osd_dev          (const struct lu_device *d);
187 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
188 static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
189 static struct osd_device  *osd_obj2dev      (const struct osd_object *o);
190 static struct lu_device   *osd2lu_dev       (struct osd_device *osd);
191 static struct lu_device   *osd_device_fini  (const struct lu_env *env,
192                                              struct lu_device *d);
193 static struct lu_device   *osd_device_alloc (const struct lu_env *env,
194                                              struct lu_device_type *t,
195                                              struct lustre_cfg *cfg);
196 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
197                                              const struct lu_object_header *hdr,
198                                              struct lu_device *d);
199 static struct inode       *osd_iget         (struct osd_thread_info *info,
200                                              struct osd_device *dev,
201                                              const struct osd_inode_id *id);
202 static struct super_block *osd_sb           (const struct osd_device *dev);
203 static struct dt_it       *osd_it_init      (const struct lu_env *env,
204                                              struct dt_object *dt, int wable,
205                                              struct lustre_capa *capa);
206 static struct dt_key      *osd_it_key       (const struct lu_env *env,
207                                              const struct dt_it *di);
208 static struct dt_rec      *osd_it_rec       (const struct lu_env *env,
209                                              const struct dt_it *di);
210 static struct timespec    *osd_inode_time   (const struct lu_env *env,
211                                              struct inode *inode,
212                                              __u64 seconds);
213 static struct thandle     *osd_trans_start  (const struct lu_env *env,
214                                              struct dt_device *d,
215                                              struct txn_param *p);
216 static journal_t          *osd_journal      (const struct osd_device *dev);
217
218 static struct lu_device_type_operations osd_device_type_ops;
219 static struct lu_device_type            osd_device_type;
220 static struct lu_object_operations      osd_lu_obj_ops;
221 static struct obd_ops                   osd_obd_device_ops;
222 static struct lu_device_operations      osd_lu_ops;
223 static struct lu_context_key            osd_key;
224 static struct dt_object_operations      osd_obj_ops;
225 static struct dt_body_operations        osd_body_ops;
226 static struct dt_index_operations       osd_index_ops;
227 static struct dt_index_operations       osd_index_compat_ops;
228
229 struct osd_thandle {
230         struct thandle          ot_super;
231         handle_t               *ot_handle;
232         struct journal_callback ot_jcb;
233 };
234
235 /*
236  * Invariants, assertions.
237  */
238
239 /*
240  * XXX: do not enable this, until invariant checking code is made thread safe
241  * in the face of pdirops locking.
242  */
243 #define OSD_INVARIANT_CHECKS (0)
244
245 #if OSD_INVARIANT_CHECKS
246 static int osd_invariant(const struct osd_object *obj)
247 {
248         return
249                 obj != NULL &&
250                 ergo(obj->oo_inode != NULL,
251                      obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
252                      atomic_read(&obj->oo_inode->i_count) > 0) &&
253                 ergo(obj->oo_dir != NULL &&
254                      obj->oo_dir->od_conationer.ic_object != NULL,
255                      obj->oo_dir->od_conationer.ic_object == obj->oo_inode);
256 }
257 #else
258 #define osd_invariant(obj) (1)
259 #endif
260
261 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
262 {
263         return lu_context_key_get(&env->le_ctx, &osd_key);
264 }
265
266 /*
267  * Concurrency: doesn't matter
268  */
269 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
270 {
271         return osd_oti_get(env)->oti_r_locks > 0;
272 }
273
274 /*
275  * Concurrency: doesn't matter
276  */
277 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
278 {
279         struct osd_thread_info *oti = osd_oti_get(env);
280         return oti->oti_w_locks > 0 && o->oo_owner == env;
281 }
282
283 /*
284  * Concurrency: doesn't access mutable data
285  */
286 static int osd_root_get(const struct lu_env *env,
287                         struct dt_device *dev, struct lu_fid *f)
288 {
289         struct inode *inode;
290
291         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
292         lu_igif_build(f, inode->i_ino, inode->i_generation);
293         return 0;
294 }
295
296 /*
297  * OSD object methods.
298  */
299
300 /*
301  * Concurrency: no concurrent access is possible that early in object
302  * life-cycle.
303  */
304 static struct lu_object *osd_object_alloc(const struct lu_env *env,
305                                           const struct lu_object_header *hdr,
306                                           struct lu_device *d)
307 {
308         struct osd_object *mo;
309
310         OBD_ALLOC_PTR(mo);
311         if (mo != NULL) {
312                 struct lu_object *l;
313
314                 l = &mo->oo_dt.do_lu;
315                 dt_object_init(&mo->oo_dt, NULL, d);
316                 mo->oo_dt.do_ops = &osd_obj_ops;
317                 l->lo_ops = &osd_lu_obj_ops;
318                 init_rwsem(&mo->oo_sem);
319                 spin_lock_init(&mo->oo_guard);
320                 return l;
321         } else
322                 return NULL;
323 }
324
325 /*
326  * Concurrency: shouldn't matter.
327  */
328 static void osd_object_init0(struct osd_object *obj)
329 {
330         LASSERT(obj->oo_inode != NULL);
331         obj->oo_dt.do_body_ops = &osd_body_ops;
332         obj->oo_dt.do_lu.lo_header->loh_attr |=
333                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
334 }
335
336 /*
337  * Concurrency: no concurrent access is possible that early in object
338  * life-cycle.
339  */
340 static int osd_object_init(const struct lu_env *env, struct lu_object *l)
341 {
342         struct osd_object *obj = osd_obj(l);
343         int result;
344
345         LINVRNT(osd_invariant(obj));
346
347         result = osd_fid_lookup(env, obj, lu_object_fid(l));
348         if (result == 0) {
349                 if (obj->oo_inode != NULL)
350                         osd_object_init0(obj);
351         }
352         LINVRNT(osd_invariant(obj));
353         return result;
354 }
355
356 /*
357  * Concurrency: no concurrent access is possible that late in object
358  * life-cycle.
359  */
360 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
361 {
362         struct osd_object *obj = osd_obj(l);
363
364         LINVRNT(osd_invariant(obj));
365
366         dt_object_fini(&obj->oo_dt);
367         OBD_FREE_PTR(obj);
368 }
369
370 static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
371                                           const struct iam_container *bag)
372 {
373         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
374                                                    osd_oti_get(env)->oti_ipd);
375 }
376
377 static void osd_ipd_put(const struct lu_env *env,
378                         const struct iam_container *bag,
379                         struct iam_path_descr *ipd)
380 {
381         bag->ic_descr->id_ops->id_ipd_free(ipd);
382 }
383
384 /*
385  * Concurrency: no concurrent access is possible that late in object
386  * life-cycle.
387  */
388 static void osd_index_fini(struct osd_object *o)
389 {
390         struct iam_container *bag;
391
392         if (o->oo_dir != NULL) {
393                 bag = &o->oo_dir->od_container;
394                 if (o->oo_inode != NULL) {
395                         if (bag->ic_object == o->oo_inode)
396                                 iam_container_fini(bag);
397                 }
398                 OBD_FREE_PTR(o->oo_dir);
399                 o->oo_dir = NULL;
400         }
401 }
402
403 /*
404  * Concurrency: no concurrent access is possible that late in object
405  * life-cycle (for all existing callers, that is. New callers have to provide
406  * their own locking.)
407  */
408 static int osd_inode_unlinked(const struct inode *inode)
409 {
410         return inode->i_nlink == 0;
411 }
412
413 enum {
414         OSD_TXN_OI_DELETE_CREDITS    = 20,
415         OSD_TXN_INODE_DELETE_CREDITS = 20
416 };
417
418 /*
419  * Concurrency: no concurrent access is possible that late in object
420  * life-cycle.
421  */
422 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
423 {
424         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
425         struct osd_device      *osd = osd_obj2dev(obj);
426         struct osd_thread_info *oti = osd_oti_get(env);
427         struct txn_param       *prm = &oti->oti_txn;
428         struct thandle         *th;
429         int result;
430
431         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS + 
432                             OSD_TXN_INODE_DELETE_CREDITS);
433         th = osd_trans_start(env, &osd->od_dt_dev, prm);
434         if (!IS_ERR(th)) {
435                 result = osd_oi_delete(oti, &osd->od_oi, fid, th);
436                 osd_trans_stop(env, th);
437         } else
438                 result = PTR_ERR(th);
439         return result;
440 }
441
442 /*
443  * Called just before object is freed. Releases all resources except for
444  * object itself (that is released by osd_object_free()).
445  *
446  * Concurrency: no concurrent access is possible that late in object
447  * life-cycle.
448  */
449 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
450 {
451         struct osd_object *obj   = osd_obj(l);
452         struct inode      *inode = obj->oo_inode;
453
454         LINVRNT(osd_invariant(obj));
455
456         /*
457          * If object is unlinked remove fid->ino mapping from object index.
458          *
459          * File body will be deleted by iput().
460          */
461
462         osd_index_fini(obj);
463         if (inode != NULL) {
464                 int result;
465
466                 if (osd_inode_unlinked(inode)) {
467                         result = osd_inode_remove(env, obj);
468                         if (result != 0)
469                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
470                                                 "Failed to cleanup: %d\n",
471                                                 result);
472                 }
473                 iput(inode);
474                 obj->oo_inode = NULL;
475         }
476 }
477
478 /*
479  * Concurrency: ->loo_object_release() is called under site spin-lock.
480  */
481 static void osd_object_release(const struct lu_env *env,
482                                struct lu_object *l)
483 {
484         struct osd_object *o = osd_obj(l);
485
486         LASSERT(!lu_object_is_dying(l->lo_header));
487         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
488                 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
489 }
490
491 /*
492  * Concurrency: shouldn't matter.
493  */
494 static int osd_object_print(const struct lu_env *env, void *cookie,
495                             lu_printer_t p, const struct lu_object *l)
496 {
497         struct osd_object *o = osd_obj(l);
498         struct iam_descr  *d;
499
500         if (o->oo_dir != NULL)
501                 d = o->oo_dir->od_container.ic_descr;
502         else
503                 d = NULL;
504         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
505                     o, o->oo_inode,
506                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
507                     o->oo_inode ? o->oo_inode->i_generation : 0,
508                     d ? d->id_ops->id_name : "plain");
509 }
510
511 /*
512  * Concurrency: shouldn't matter.
513  */
514 int osd_statfs(const struct lu_env *env, struct dt_device *d,
515                struct kstatfs *sfs)
516 {
517         struct osd_device *osd = osd_dt_dev(d);
518         struct super_block *sb = osd_sb(osd);
519         int result = 0;
520
521         spin_lock(&osd->od_osfs_lock);
522         /* cache 1 second */
523         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
524                 result = ll_do_statfs(sb, &osd->od_kstatfs);
525                 if (likely(result == 0)) /* N.B. statfs can't really fail */
526                         osd->od_osfs_age = cfs_time_current_64();
527         }
528
529         if (likely(result == 0))
530                 *sfs = osd->od_kstatfs; 
531         spin_unlock(&osd->od_osfs_lock);
532
533         return result;
534 }
535
536 /*
537  * Concurrency: doesn't access mutable data.
538  */
539 static void osd_conf_get(const struct lu_env *env,
540                          const struct dt_device *dev,
541                          struct dt_device_param *param)
542 {
543         /*
544          * XXX should be taken from not-yet-existing fs abstraction layer.
545          */
546         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
547         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
548         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
549 }
550
551 /*
552  * Journal
553  */
554
555 /*
556  * Concurrency: doesn't access mutable data.
557  */
558 static int osd_param_is_sane(const struct osd_device *dev,
559                              const struct txn_param *param)
560 {
561         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
562 }
563
564 /*
565  * Concurrency: shouldn't matter.
566  */
567 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
568 {
569         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
570         struct thandle     *th = &oh->ot_super;
571         struct dt_device   *dev = th->th_dev;
572
573         LASSERT(dev != NULL);
574         LASSERT(oh->ot_handle == NULL);
575
576         if (error) {
577                 CERROR("transaction @0x%p commit error: %d\n", th, error);
578         } else {
579                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
580                 /*
581                  * This od_env_for_commit is only for commit usage.  see
582                  * "struct dt_device"
583                  */
584                 lu_context_enter(&env->le_ctx);
585                 dt_txn_hook_commit(env, th);
586                 lu_context_exit(&env->le_ctx);
587         }
588
589         lu_device_put(&dev->dd_lu_dev);
590         th->th_dev = NULL;
591
592         lu_context_exit(&th->th_ctx);
593         lu_context_fini(&th->th_ctx);
594         OBD_FREE_PTR(oh);
595 }
596
597 /*
598  * Concurrency: shouldn't matter.
599  */
600 static struct thandle *osd_trans_start(const struct lu_env *env,
601                                        struct dt_device *d,
602                                        struct txn_param *p)
603 {
604         struct osd_device  *dev = osd_dt_dev(d);
605         handle_t           *jh;
606         struct osd_thandle *oh;
607         struct thandle     *th;
608         int hook_res;
609
610         ENTRY;
611
612         hook_res = dt_txn_hook_start(env, d, p);
613         if (hook_res != 0)
614                 RETURN(ERR_PTR(hook_res));
615
616         if (osd_param_is_sane(dev, p)) {
617                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
618                 if (oh != NULL) {
619                         /*
620                          * XXX temporary stuff. Some abstraction layer should
621                          * be used.
622                          */
623
624                         jh = journal_start(osd_journal(dev), p->tp_credits);
625                         if (!IS_ERR(jh)) {
626                                 oh->ot_handle = jh;
627                                 th = &oh->ot_super;
628                                 th->th_dev = d;
629                                 th->th_result = 0;
630                                 jh->h_sync = p->tp_sync;
631                                 lu_device_get(&d->dd_lu_dev);
632                                 /* add commit callback */
633                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
634                                 lu_context_enter(&th->th_ctx);
635                                 journal_callback_set(jh, osd_trans_commit_cb,
636                                                      (struct journal_callback *)&oh->ot_jcb);
637 #if OSD_COUNTERS
638                                 {
639                                         struct osd_thread_info *oti =
640                                                 osd_oti_get(env);
641
642                                         LASSERT(oti->oti_txns == 0);
643                                         LASSERT(oti->oti_r_locks == 0);
644                                         LASSERT(oti->oti_w_locks == 0);
645                                         oti->oti_txns++;
646                                 }
647 #endif
648                         } else {
649                                 OBD_FREE_PTR(oh);
650                                 th = (void *)jh;
651                         }
652                 } else
653                         th = ERR_PTR(-ENOMEM);
654         } else {
655                 CERROR("Invalid transaction parameters\n");
656                 th = ERR_PTR(-EINVAL);
657         }
658
659         RETURN(th);
660 }
661
662 /*
663  * Concurrency: shouldn't matter.
664  */
665 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
666 {
667         int result;
668         struct osd_thandle *oh;
669         struct osd_thread_info *oti = osd_oti_get(env);
670
671         ENTRY;
672
673         oh = container_of0(th, struct osd_thandle, ot_super);
674         if (oh->ot_handle != NULL) {
675                 handle_t *hdl = oh->ot_handle;
676
677                 LASSERT(oti->oti_txns == 1);
678                 oti->oti_txns--;
679                 LASSERT(oti->oti_r_locks == 0);
680                 LASSERT(oti->oti_w_locks == 0);
681                 result = dt_txn_hook_stop(env, th);
682                 if (result != 0)
683                         CERROR("Failure in transaction hook: %d\n", result);
684                 oh->ot_handle = NULL;
685                 result = journal_stop(hdl);
686                 if (result != 0)
687                         CERROR("Failure to stop transaction: %d\n", result);
688         }
689         EXIT;
690 }
691
692 /*
693  * Concurrency: shouldn't matter.
694  */
695 static int osd_sync(const struct lu_env *env, struct dt_device *d)
696 {
697         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
698         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
699 }
700
701 /*
702  * Concurrency: shouldn't matter.
703  */
704 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
705
706 static void osd_ro(const struct lu_env *env, struct dt_device *d)
707 {
708         ENTRY;
709
710         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
711
712         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
713                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
714         EXIT;
715 }
716
717 /*
718  * Concurrency: serialization provided by callers.
719  */
720 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
721                               int mode, unsigned long timeout, __u32 alg,
722                               struct lustre_capa_key *keys)
723 {
724         struct osd_device *dev = osd_dt_dev(d);
725         ENTRY;
726
727         dev->od_fl_capa = mode;
728         dev->od_capa_timeout = timeout;
729         dev->od_capa_alg = alg;
730         dev->od_capa_keys = keys;
731         RETURN(0);
732 }
733
734 /* Note: we did not count into QUOTA here, If we mount with --data_journal
735  * we may need more*/
736 static const int osd_dto_credits[DTO_NR] = {
737         /*
738          * Insert/Delete. IAM EXT3_INDEX_EXTRA_TRANS_BLOCKS(8) +
739          * EXT3_SINGLEDATA_TRANS_BLOCKS 8 XXX Note: maybe iam need more,since
740          * iam have more level than Ext3 htree
741          */
742         [DTO_INDEX_INSERT]  = 16,
743         [DTO_INDEX_DELETE]  = 16,
744         [DTO_IDNEX_UPDATE]  = 16,
745         /*
746          * Create a object. Same as create object in Ext3 filesystem, but did
747          * not count QUOTA i EXT3_DATA_TRANS_BLOCKS(12) +
748          * INDEX_EXTRA_BLOCKS(8) + 3(inode bits,groups, GDT)
749          */
750         [DTO_OBJECT_CREATE] = 23,
751         [DTO_OBJECT_DELETE] = 23,
752         /*
753          * Attr set credits 3 inode, group, GDT
754          */
755         [DTO_ATTR_SET]      = 3,
756         /*
757          * XATTR_SET. SAME AS XATTR of EXT3 EXT3_DATA_TRANS_BLOCKS XXX Note:
758          * in original MDS implmentation EXT3_INDEX_EXTRA_TRANS_BLOCKS are
759          * also counted in. Do not know why?
760          */
761         [DTO_XATTR_SET]     = 16,
762         [DTO_LOG_REC]       = 16,
763         /* creadits for inode change during write */
764         [DTO_WRITE_BASE]    = 3,
765         /* credits for single block write */
766         [DTO_WRITE_BLOCK]   = 12 
767 };
768
769 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
770                           enum dt_txn_op op)
771 {
772         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits));
773         return osd_dto_credits[op];
774 }
775
776 static struct dt_device_operations osd_dt_ops = {
777         .dt_root_get       = osd_root_get,
778         .dt_statfs         = osd_statfs,
779         .dt_trans_start    = osd_trans_start,
780         .dt_trans_stop     = osd_trans_stop,
781         .dt_conf_get       = osd_conf_get,
782         .dt_sync           = osd_sync,
783         .dt_ro             = osd_ro,
784         .dt_credit_get     = osd_credit_get,
785         .dt_init_capa_ctxt = osd_init_capa_ctxt,
786 };
787
788 static void osd_object_read_lock(const struct lu_env *env,
789                                  struct dt_object *dt, unsigned role)
790 {
791         struct osd_object *obj = osd_dt_obj(dt);
792         struct osd_thread_info *oti = osd_oti_get(env);
793
794         LINVRNT(osd_invariant(obj));
795
796         LASSERT(obj->oo_owner != env);
797         down_read_nested(&obj->oo_sem, role);
798
799                 LASSERT(obj->oo_owner == NULL);
800                 oti->oti_r_locks++;
801 }
802
803 static void osd_object_write_lock(const struct lu_env *env,
804                                   struct dt_object *dt, unsigned role)
805 {
806         struct osd_object *obj = osd_dt_obj(dt);
807         struct osd_thread_info *oti = osd_oti_get(env);
808
809         LINVRNT(osd_invariant(obj));
810
811         LASSERT(obj->oo_owner != env);
812         down_write_nested(&obj->oo_sem, role);
813
814                 LASSERT(obj->oo_owner == NULL);
815                 obj->oo_owner = env;
816                 oti->oti_w_locks++;
817 }
818
819 static void osd_object_read_unlock(const struct lu_env *env,
820                                    struct dt_object *dt)
821 {
822         struct osd_object *obj = osd_dt_obj(dt);
823                 struct osd_thread_info *oti = osd_oti_get(env);
824
825         LINVRNT(osd_invariant(obj));
826
827                 LASSERT(oti->oti_r_locks > 0);
828                 oti->oti_r_locks--;
829         up_read(&obj->oo_sem);
830 }
831
832 static void osd_object_write_unlock(const struct lu_env *env,
833                                     struct dt_object *dt)
834 {
835         struct osd_object *obj = osd_dt_obj(dt);
836                 struct osd_thread_info *oti = osd_oti_get(env);
837
838         LINVRNT(osd_invariant(obj));
839
840                 LASSERT(obj->oo_owner == env);
841                 LASSERT(oti->oti_w_locks > 0);
842                 oti->oti_w_locks--;
843                 obj->oo_owner = NULL;
844         up_write(&obj->oo_sem);
845 }
846
847 static int capa_is_sane(const struct lu_env *env,
848                         struct osd_device *dev,
849                         struct lustre_capa *capa,
850                         struct lustre_capa_key *keys)
851 {
852         struct osd_thread_info *oti = osd_oti_get(env);
853         struct obd_capa *oc;
854         int i, rc = 0;
855         ENTRY;
856
857         oc = capa_lookup(dev->od_capa_hash, capa, 0);
858         if (oc) {
859                 if (capa_is_expired(oc)) {
860                         DEBUG_CAPA(D_ERROR, capa, "expired");
861                         rc = -ESTALE;
862                 }
863                 capa_put(oc);
864                 RETURN(rc);
865         }
866
867         spin_lock(&capa_lock);
868         for (i = 0; i < 2; i++) {
869                 if (keys[i].lk_keyid == capa->lc_keyid) {
870                         oti->oti_capa_key = keys[i];
871                         break;
872                 }
873         }
874         spin_unlock(&capa_lock);
875
876         if (i == 2) {
877                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
878                 RETURN(-ESTALE);
879         }
880
881         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
882         if (rc)
883                 RETURN(rc);
884         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
885         {
886                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
887                 RETURN(-EACCES);
888         }
889
890         oc = capa_add(dev->od_capa_hash, capa);
891         capa_put(oc);
892
893         RETURN(0);
894 }
895
896 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
897                            struct lustre_capa *capa, __u64 opc)
898 {
899         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
900         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
901         int rc;
902
903         if (!dev->od_fl_capa)
904                 return 0;
905
906         if (capa == BYPASS_CAPA)
907                 return 0;
908
909         if (!capa) {
910                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
911                 return -EACCES;
912         }
913
914         if (!lu_fid_eq(fid, &capa->lc_fid)) {
915                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
916                            PFID(fid));
917                 return -EACCES;
918         }
919
920         if (!capa_opc_supported(capa, opc)) {
921                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
922                 return -EACCES;
923         }
924
925         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
926                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
927                 return -EACCES;
928         }
929
930         return 0;
931 }
932
933 static int osd_attr_get(const struct lu_env *env,
934                         struct dt_object *dt,
935                         struct lu_attr *attr,
936                         struct lustre_capa *capa)
937 {
938         struct osd_object *obj = osd_dt_obj(dt);
939
940         LASSERT(dt_object_exists(dt));
941         LINVRNT(osd_invariant(obj));
942
943         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
944                 return -EACCES;
945
946         spin_lock(&obj->oo_guard);
947         osd_inode_getattr(env, obj->oo_inode, attr);
948         spin_unlock(&obj->oo_guard);
949         return 0;
950 }
951
952 static int osd_attr_set(const struct lu_env *env,
953                         struct dt_object *dt,
954                         const struct lu_attr *attr,
955                         struct thandle *handle,
956                         struct lustre_capa *capa)
957 {
958         struct osd_object *obj = osd_dt_obj(dt);
959
960         LASSERT(handle != NULL);
961         LASSERT(dt_object_exists(dt));
962         LASSERT(osd_invariant(obj));
963
964         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
965                 return -EACCES;
966
967         spin_lock(&obj->oo_guard);
968         osd_inode_setattr(env, obj->oo_inode, attr);
969         spin_unlock(&obj->oo_guard);
970
971         mark_inode_dirty(obj->oo_inode);
972         return 0;
973 }
974
975 static struct timespec *osd_inode_time(const struct lu_env *env,
976                                        struct inode *inode, __u64 seconds)
977 {
978         struct osd_thread_info *oti = osd_oti_get(env);
979         struct timespec        *t   = &oti->oti_time;
980
981         t->tv_sec  = seconds;
982         t->tv_nsec = 0;
983         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
984         return t;
985 }
986
987 static void osd_inode_setattr(const struct lu_env *env,
988                               struct inode *inode, const struct lu_attr *attr)
989 {
990         __u64 bits;
991
992         bits = attr->la_valid;
993
994         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
995
996         if (bits & LA_ATIME)
997                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
998         if (bits & LA_CTIME)
999                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1000         if (bits & LA_MTIME)
1001                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1002         if (bits & LA_SIZE) {
1003                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1004                 i_size_write(inode, attr->la_size);
1005         }
1006         if (bits & LA_BLOCKS)
1007                 inode->i_blocks = attr->la_blocks;
1008         if (bits & LA_MODE)
1009                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1010                         (attr->la_mode & ~S_IFMT);
1011         if (bits & LA_UID)
1012                 inode->i_uid    = attr->la_uid;
1013         if (bits & LA_GID)
1014                 inode->i_gid    = attr->la_gid;
1015         if (bits & LA_NLINK)
1016                 inode->i_nlink  = attr->la_nlink;
1017         if (bits & LA_RDEV)
1018                 inode->i_rdev   = attr->la_rdev;
1019
1020         if (bits & LA_FLAGS) {
1021                 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
1022
1023                 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
1024                         (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
1025         }
1026 }
1027
1028 /*
1029  * Object creation.
1030  *
1031  * XXX temporary solution.
1032  */
1033
/*
 * Hook invoked before an object is created.  Currently does nothing and
 * always returns 0.
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        return 0;
}
1039
1040 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1041                            struct lu_attr *attr, struct thandle *th)
1042 {
1043         LASSERT(obj->oo_inode != NULL);
1044
1045         osd_object_init0(obj);
1046         return 0;
1047 }
1048
1049 extern struct inode *ldiskfs_create_inode(handle_t *handle,
1050                                           struct inode * dir, int mode);
1051
1052 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1053                       umode_t mode,
1054                       struct dt_allocation_hint *hint,
1055                       struct thandle *th)
1056 {
1057         int result;
1058         struct osd_device  *osd = osd_obj2dev(obj);
1059         struct osd_thandle *oth;
1060         struct inode       *parent;
1061         struct inode       *inode;
1062
1063         LINVRNT(osd_invariant(obj));
1064         LASSERT(obj->oo_inode == NULL);
1065         LASSERT(osd->od_obj_area != NULL);
1066
1067         oth = container_of(th, struct osd_thandle, ot_super);
1068         LASSERT(oth->ot_handle->h_transaction != NULL);
1069
1070         if (hint && hint->dah_parent)
1071                 parent = osd_dt_obj(hint->dah_parent)->oo_inode;
1072         else
1073                 parent = osd->od_obj_area->d_inode;
1074         LASSERT(parent->i_op != NULL);
1075
1076         inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
1077         if (!IS_ERR(inode)) {
1078                 obj->oo_inode = inode;
1079                 result = 0;
1080         } else
1081                 result = PTR_ERR(inode);
1082         LINVRNT(osd_invariant(obj));
1083         return result;
1084 }
1085
1086
1087 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
1088                            int recsize, handle_t *handle);
1089
/* Key size passed to iam_lvar_create() when a directory is created. */
enum { OSD_NAME_LEN = 255 };
1093
1094 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1095                      struct lu_attr *attr,
1096                      struct dt_allocation_hint *hint,
1097                      struct thandle *th)
1098 {
1099         int result;
1100         struct osd_thandle *oth;
1101
1102         LASSERT(S_ISDIR(attr->la_mode));
1103
1104         oth = container_of(th, struct osd_thandle, ot_super);
1105         LASSERT(oth->ot_handle->h_transaction != NULL);
1106         result = osd_mkfile(info, obj, (attr->la_mode &
1107                             (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1108         if (result == 0) {
1109                 LASSERT(obj->oo_inode != NULL);
1110                 /*
1111                  * XXX uh-oh... call low-level iam function directly.
1112                  */
1113                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1114                                          sizeof (struct lu_fid_pack),
1115                                          oth->ot_handle);
1116         }
1117         return result;
1118 }
1119
1120 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1121                      struct lu_attr *attr,
1122                      struct dt_allocation_hint *hint,
1123                      struct thandle *th)
1124 {
1125         LASSERT(S_ISREG(attr->la_mode));
1126         return osd_mkfile(info, obj, (attr->la_mode &
1127                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1128 }
1129
1130 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1131                      struct lu_attr *attr,
1132                      struct dt_allocation_hint *hint,
1133                      struct thandle *th)
1134 {
1135         LASSERT(S_ISLNK(attr->la_mode));
1136         return osd_mkfile(info, obj, (attr->la_mode &
1137                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1138 }
1139
1140 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1141                      struct lu_attr *attr,
1142                      struct dt_allocation_hint *hint,
1143                      struct thandle *th)
1144 {
1145         int result;
1146         struct osd_device *osd = osd_obj2dev(obj);
1147         struct inode      *dir;
1148         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1149
1150         LINVRNT(osd_invariant(obj));
1151         LASSERT(obj->oo_inode == NULL);
1152         LASSERT(osd->od_obj_area != NULL);
1153         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1154                 S_ISFIFO(mode) || S_ISSOCK(mode));
1155
1156         dir = osd->od_obj_area->d_inode;
1157         LASSERT(dir->i_op != NULL);
1158
1159         result = osd_mkfile(info, obj, mode, hint, th);
1160         if (result == 0) {
1161                 LASSERT(obj->oo_inode != NULL);
1162                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1163         }
1164         LINVRNT(osd_invariant(obj));
1165         return result;
1166 }
1167
/*
 * Signature shared by the type-specific creation helpers (osd_mkdir(),
 * osd_mkreg(), osd_mksym(), osd_mknod()).
 */
typedef int (*osd_obj_type_f)(struct osd_thread_info *,
                              struct osd_object *,
                              struct lu_attr *,
                              struct dt_allocation_hint *hint,
                              struct thandle *);
1172
1173 static osd_obj_type_f osd_create_type_f(__u32 mode)
1174 {
1175         osd_obj_type_f result;
1176
1177         switch (mode) {
1178         case S_IFDIR:
1179                 result = osd_mkdir;
1180                 break;
1181         case S_IFREG:
1182                 result = osd_mkreg;
1183                 break;
1184         case S_IFLNK:
1185                 result = osd_mksym;
1186                 break;
1187         case S_IFCHR:
1188         case S_IFBLK:
1189         case S_IFIFO:
1190         case S_IFSOCK:
1191                 result = osd_mknod;
1192                 break;
1193         default:
1194                 LBUG();
1195                 break;
1196         }
1197         return result;
1198 }
1199
1200
1201 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1202                         struct dt_object *parent, umode_t child_mode)
1203 {
1204         LASSERT(ah);
1205
1206         memset(ah, 0, sizeof(*ah));
1207         ah->dah_parent = parent;
1208         ah->dah_mode = child_mode;
1209 }
1210
1211
1212 /*
1213  * Concurrency: @dt is write locked.
1214  */
1215 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1216                              struct lu_attr *attr, 
1217                              struct dt_allocation_hint *hint,
1218                              struct thandle *th)
1219 {
1220         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
1221         struct osd_object      *obj  = osd_dt_obj(dt);
1222         struct osd_device      *osd  = osd_obj2dev(obj);
1223         struct osd_thread_info *info = osd_oti_get(env);
1224         int result;
1225
1226         ENTRY;
1227
1228         LINVRNT(osd_invariant(obj));
1229         LASSERT(!dt_object_exists(dt));
1230         LASSERT(osd_write_locked(env, obj));
1231         LASSERT(th != NULL);
1232
1233         /*
1234          * XXX missing: Quote handling.
1235          */
1236
1237         result = osd_create_pre(info, obj, attr, th);
1238         if (result == 0) {
1239                 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
1240                                                                 attr, hint, th);
1241                 if (result == 0)
1242                         result = osd_create_post(info, obj, attr, th);
1243         }
1244         if (result == 0) {
1245                 struct osd_inode_id *id = &info->oti_id;
1246
1247                 LASSERT(obj->oo_inode != NULL);
1248
1249                 id->oii_ino = obj->oo_inode->i_ino;
1250                 id->oii_gen = obj->oo_inode->i_generation;
1251
1252                 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
1253         }
1254
1255         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1256         LINVRNT(osd_invariant(obj));
1257         RETURN(result);
1258 }
1259
1260 /*
1261  * Concurrency: @dt is write locked.
1262  */
1263 static void osd_object_ref_add(const struct lu_env *env,
1264                                struct dt_object *dt,
1265                                struct thandle *th)
1266 {
1267         struct osd_object *obj = osd_dt_obj(dt);
1268         struct inode *inode = obj->oo_inode;
1269
1270         LINVRNT(osd_invariant(obj));
1271         LASSERT(dt_object_exists(dt));
1272         LASSERT(osd_write_locked(env, obj));
1273         LASSERT(th != NULL);
1274
1275         spin_lock(&obj->oo_guard);
1276         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1277         inode->i_nlink++;
1278         spin_unlock(&obj->oo_guard);
1279         mark_inode_dirty(inode);
1280         LINVRNT(osd_invariant(obj));
1281 }
1282
1283 /*
1284  * Concurrency: @dt is write locked.
1285  */
1286 static void osd_object_ref_del(const struct lu_env *env,
1287                                struct dt_object *dt,
1288                                struct thandle *th)
1289 {
1290         struct osd_object *obj = osd_dt_obj(dt);
1291         struct inode *inode = obj->oo_inode;
1292
1293         LINVRNT(osd_invariant(obj));
1294         LASSERT(dt_object_exists(dt));
1295         LASSERT(osd_write_locked(env, obj));
1296         LASSERT(th != NULL);
1297
1298         spin_lock(&obj->oo_guard);
1299         LASSERT(inode->i_nlink > 0);
1300         inode->i_nlink--;
1301         spin_unlock(&obj->oo_guard);
1302         mark_inode_dirty(inode);
1303         LINVRNT(osd_invariant(obj));
1304 }
1305
1306 /*
1307  * Concurrency: @dt is read locked.
1308  */
1309 static int osd_xattr_get(const struct lu_env *env,
1310                          struct dt_object *dt,
1311                          struct lu_buf *buf,
1312                          const char *name,
1313                          struct lustre_capa *capa)
1314 {
1315         struct osd_object      *obj    = osd_dt_obj(dt);
1316         struct inode           *inode  = obj->oo_inode;
1317         struct osd_thread_info *info   = osd_oti_get(env);
1318         struct dentry          *dentry = &info->oti_dentry;
1319
1320         LASSERT(dt_object_exists(dt));
1321         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1322         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1323
1324         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1325                 return -EACCES;
1326
1327         dentry->d_inode = inode;
1328         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
1329 }
1330
1331 /*
1332  * Concurrency: @dt is write locked.
1333  */
1334 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1335                          const struct lu_buf *buf, const char *name, int fl,
1336                          struct thandle *handle, struct lustre_capa *capa)
1337 {
1338         struct osd_object      *obj    = osd_dt_obj(dt);
1339         struct inode           *inode  = obj->oo_inode;
1340         struct osd_thread_info *info   = osd_oti_get(env);
1341         struct dentry          *dentry = &info->oti_dentry;
1342         struct timespec        *t      = &info->oti_time;
1343         int                     fs_flags = 0, rc;
1344
1345         LASSERT(dt_object_exists(dt));
1346         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1347         LASSERT(osd_write_locked(env, obj));
1348         LASSERT(handle != NULL);
1349
1350         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1351                 return -EACCES;
1352
1353         if (fl & LU_XATTR_REPLACE)
1354                 fs_flags |= XATTR_REPLACE;
1355
1356         if (fl & LU_XATTR_CREATE)
1357                 fs_flags |= XATTR_CREATE;
1358
1359         dentry->d_inode = inode;
1360         *t = inode->i_ctime;
1361         rc = inode->i_op->setxattr(dentry, name,
1362                                    buf->lb_buf, buf->lb_len, fs_flags);
1363         if (likely(rc == 0)) {
1364                 /* ctime should not be updated with server-side time. */
1365                 spin_lock(&obj->oo_guard);
1366                 inode->i_ctime = *t;
1367                 spin_unlock(&obj->oo_guard);
1368                 mark_inode_dirty(inode);
1369         }
1370         return rc;
1371 }
1372
1373 /*
1374  * Concurrency: @dt is read locked.
1375  */
1376 static int osd_xattr_list(const struct lu_env *env,
1377                           struct dt_object *dt,
1378                           struct lu_buf *buf,
1379                           struct lustre_capa *capa)
1380 {
1381         struct osd_object      *obj    = osd_dt_obj(dt);
1382         struct inode           *inode  = obj->oo_inode;
1383         struct osd_thread_info *info   = osd_oti_get(env);
1384         struct dentry          *dentry = &info->oti_dentry;
1385
1386         LASSERT(dt_object_exists(dt));
1387         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1388         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
1389
1390         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1391                 return -EACCES;
1392
1393         dentry->d_inode = inode;
1394         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
1395 }
1396
1397 /*
1398  * Concurrency: @dt is write locked.
1399  */
1400 static int osd_xattr_del(const struct lu_env *env,
1401                          struct dt_object *dt,
1402                          const char *name,
1403                          struct thandle *handle,
1404                          struct lustre_capa *capa)
1405 {
1406         struct osd_object      *obj    = osd_dt_obj(dt);
1407         struct inode           *inode  = obj->oo_inode;
1408         struct osd_thread_info *info   = osd_oti_get(env);
1409         struct dentry          *dentry = &info->oti_dentry;
1410         struct timespec        *t      = &info->oti_time;
1411         int                     rc;
1412
1413         LASSERT(dt_object_exists(dt));
1414         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1415         LASSERT(osd_write_locked(env, obj));
1416         LASSERT(handle != NULL);
1417
1418         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1419                 return -EACCES;
1420
1421         dentry->d_inode = inode;
1422         *t = inode->i_ctime;
1423         rc = inode->i_op->removexattr(dentry, name);
1424         if (likely(rc == 0)) {
1425                 /* ctime should not be updated with server-side time. */
1426                 spin_lock(&obj->oo_guard);
1427                 inode->i_ctime = *t;
1428                 spin_unlock(&obj->oo_guard);
1429                 mark_inode_dirty(inode);
1430         }
1431         return rc;
1432 }
1433
1434 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1435                                      struct dt_object *dt,
1436                                      struct lustre_capa *old,
1437                                      __u64 opc)
1438 {
1439         struct osd_thread_info *info = osd_oti_get(env);
1440         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1441         struct osd_object *obj = osd_dt_obj(dt);
1442         struct osd_device *dev = osd_obj2dev(obj);
1443         struct lustre_capa_key *key = &info->oti_capa_key;
1444         struct lustre_capa *capa = &info->oti_capa;
1445         struct obd_capa *oc;
1446         int rc;
1447         ENTRY;
1448
1449         if (!dev->od_fl_capa)
1450                 RETURN(ERR_PTR(-ENOENT));
1451
1452         LASSERT(dt_object_exists(dt));
1453         LINVRNT(osd_invariant(obj));
1454
1455         /* renewal sanity check */
1456         if (old && osd_object_auth(env, dt, old, opc))
1457                 RETURN(ERR_PTR(-EACCES));
1458
1459         capa->lc_fid = *fid;
1460         capa->lc_opc = opc;
1461         capa->lc_uid = 0;
1462         capa->lc_flags = dev->od_capa_alg << 24;
1463         capa->lc_timeout = dev->od_capa_timeout;
1464         capa->lc_expiry = 0;
1465
1466         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1467         if (oc) {
1468                 LASSERT(!capa_is_expired(oc));
1469                 RETURN(oc);
1470         }
1471
1472         spin_lock(&capa_lock);
1473         *key = dev->od_capa_keys[1];
1474         spin_unlock(&capa_lock);
1475
1476         capa->lc_keyid = key->lk_keyid;
1477         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1478
1479         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1480         if (rc) {
1481                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1482                 RETURN(ERR_PTR(rc));
1483         }
1484
1485         oc = capa_add(dev->od_capa_hash, capa);
1486         RETURN(oc);
1487 }
1488
1489 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
1490 {
1491         int rc;
1492         struct osd_object      *obj    = osd_dt_obj(dt);
1493         struct inode           *inode  = obj->oo_inode;
1494         struct osd_thread_info *info   = osd_oti_get(env);
1495         struct dentry          *dentry = &info->oti_dentry;
1496         struct file            *file   = &info->oti_file;
1497         ENTRY;
1498
1499         dentry->d_inode = inode;
1500         file->f_dentry = dentry;
1501         file->f_mapping = inode->i_mapping;
1502         file->f_op = inode->i_fop;
1503         LOCK_INODE_MUTEX(inode);
1504         rc = file->f_op->fsync(file, dentry, 0);
1505         UNLOCK_INODE_MUTEX(inode);
1506         RETURN(rc);
1507 }
1508
1509 static struct dt_object_operations osd_obj_ops = {
1510         .do_read_lock    = osd_object_read_lock,
1511         .do_write_lock   = osd_object_write_lock,
1512         .do_read_unlock  = osd_object_read_unlock,
1513         .do_write_unlock = osd_object_write_unlock,
1514         .do_attr_get     = osd_attr_get,
1515         .do_attr_set     = osd_attr_set,
1516         .do_ah_init      = osd_ah_init,
1517         .do_create       = osd_object_create,
1518         .do_index_try    = osd_index_try,
1519         .do_ref_add      = osd_object_ref_add,
1520         .do_ref_del      = osd_object_ref_del,
1521         .do_xattr_get    = osd_xattr_get,
1522         .do_xattr_set    = osd_xattr_set,
1523         .do_xattr_del    = osd_xattr_del,
1524         .do_xattr_list   = osd_xattr_list,
1525         .do_capa_get     = osd_capa_get,
1526         .do_object_sync  = osd_object_sync,
1527 };
1528
1529 /*
1530  * Body operations.
1531  */
1532
1533 /*
1534  * XXX: Another layering violation for now.
1535  *
1536  * We don't want to use ->f_op->read/->f_op->write methods, because generic file write
1537  *
1538  *         - serializes on ->i_sem, and
1539  *
1540  *         - does a lot of extra work like balance_dirty_pages(),
1541  *
1542  * which doesn't work for globally shared files like /last-received.
1543  */
1544 int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
1545 int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
1546                                 loff_t *offs, handle_t *handle);
1547
1548 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
1549                         struct lu_buf *buf, loff_t *pos,
1550                         struct lustre_capa *capa)
1551 {
1552         struct inode *inode = osd_dt_obj(dt)->oo_inode;
1553
1554         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
1555                 RETURN(-EACCES);
1556
1557         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
1558 }
1559
1560 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
1561                          const struct lu_buf *buf, loff_t *pos,
1562                          struct thandle *handle, struct lustre_capa *capa)
1563 {
1564         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
1565         struct osd_thandle *oh;
1566         ssize_t             result;
1567
1568         LASSERT(handle != NULL);
1569
1570         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
1571                 RETURN(-EACCES);
1572
1573         oh = container_of(handle, struct osd_thandle, ot_super);
1574         LASSERT(oh->ot_handle->h_transaction != NULL);
1575         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
1576                                              pos, oh->ot_handle);
1577         if (result == 0)
1578                 result = buf->lb_len;
1579         return result;
1580 }
1581
1582 static struct dt_body_operations osd_body_ops = {
1583         .dbo_read  = osd_read,
1584         .dbo_write = osd_write
1585 };
1586
1587 /*
1588  * Index operations.
1589  */
1590
1591 static int osd_object_is_root(const struct osd_object *obj)
1592 {
1593         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
1594 }
1595
1596 static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
1597                            const struct dt_index_features *feat)
1598 {
1599         struct iam_descr *descr;
1600
1601         if (osd_object_is_root(o))
1602                 return feat == &dt_directory_features;
1603
1604         LASSERT(o->oo_dir != NULL);
1605
1606         descr = o->oo_dir->od_container.ic_descr;
1607         if (feat == &dt_directory_features)
1608                 return descr == &iam_htree_compat_param ||
1609                         (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
1610                          1 /*
1611                             * XXX check that index looks like directory.
1612                             */
1613                                 );
1614         else
1615                 return
1616                         feat->dif_keysize_min <= descr->id_key_size &&
1617                         descr->id_key_size <= feat->dif_keysize_max &&
1618                         feat->dif_recsize_min <= descr->id_rec_size &&
1619                         descr->id_rec_size <= feat->dif_recsize_max &&
1620                         !(feat->dif_flags & (DT_IND_VARKEY |
1621                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
1622                         ergo(feat->dif_flags & DT_IND_UPDATE,
1623                              1 /* XXX check that object (and file system) is
1624                                 * writable */);
1625 }
1626
1627 static int osd_container_init(const struct lu_env *env,
1628                               struct osd_object *obj,
1629                               struct osd_directory *dir)
1630 {
1631         int result;
1632         struct iam_container *bag;
1633
1634         bag    = &dir->od_container;
1635         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
1636         if (result == 0) {
1637                 result = iam_container_setup(bag);
1638                 if (result == 0)
1639                         obj->oo_dt.do_index_ops = &osd_index_ops;
1640                 else
1641                         iam_container_fini(bag);
1642         }
1643         return result;
1644 }
1645
1646 /*
1647  * Concurrency: no external locking is necessary.
1648  */
1649 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1650                          const struct dt_index_features *feat)
1651 {
1652         int result;
1653         struct osd_object *obj = osd_dt_obj(dt);
1654
1655         LINVRNT(osd_invariant(obj));
1656         LASSERT(dt_object_exists(dt));
1657
1658         if (osd_object_is_root(obj)) {
1659                 dt->do_index_ops = &osd_index_compat_ops;
1660                 result = 0;
1661         } else if (!osd_has_index(obj)) {
1662                 struct osd_directory *dir;
1663
1664                 OBD_ALLOC_PTR(dir);
1665                 if (dir != NULL) {
1666                         sema_init(&dir->od_sem, 1);
1667
1668                         spin_lock(&obj->oo_guard);
1669                         if (obj->oo_dir == NULL)
1670                                 obj->oo_dir = dir;
1671                         else
1672                                 /*
1673                                  * Concurrent thread allocated container data.
1674                                  */
1675                                 OBD_FREE_PTR(dir);
1676                         spin_unlock(&obj->oo_guard);
1677                         /*
1678                          * Now, that we have container data, serialize its
1679                          * initialization.
1680                          */
1681                         down(&obj->oo_dir->od_sem);
1682                         /*
1683                          * recheck under lock.
1684                          */
1685                         if (!osd_has_index(obj))
1686                                 result = osd_container_init(env, obj, dir);
1687                         else
1688                                 result = 0;
1689                         up(&obj->oo_dir->od_sem);
1690                 } else
1691                         result = -ENOMEM;
1692         } else
1693                 result = 0;
1694
1695         if (result == 0) {
1696                 if (!osd_index_probe(env, obj, feat))
1697                         result = -ENOTDIR;
1698         }
1699         LINVRNT(osd_invariant(obj));
1700
1701         return result;
1702 }
1703
1704 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1705                             const struct dt_key *key, struct thandle *handle,
1706                             struct lustre_capa *capa)
1707 {
1708         struct osd_object     *obj = osd_dt_obj(dt);
1709         struct osd_thandle    *oh;
1710         struct iam_path_descr *ipd;
1711         struct iam_container  *bag = &obj->oo_dir->od_container;
1712         int rc;
1713
1714         ENTRY;
1715
1716         LINVRNT(osd_invariant(obj));
1717         LASSERT(dt_object_exists(dt));
1718         LASSERT(bag->ic_object == obj->oo_inode);
1719         LASSERT(handle != NULL);
1720
1721         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1722                 RETURN(-EACCES);
1723
1724         ipd = osd_ipd_get(env, bag);
1725         if (unlikely(ipd == NULL))
1726                 RETURN(-ENOMEM);
1727
1728         oh = container_of0(handle, struct osd_thandle, ot_super);
1729         LASSERT(oh->ot_handle != NULL);
1730         LASSERT(oh->ot_handle->h_transaction != NULL);
1731
1732         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
1733         osd_ipd_put(env, bag, ipd);
1734         LINVRNT(osd_invariant(obj));
1735         RETURN(rc);
1736 }
1737
1738 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1739                             struct dt_rec *rec, const struct dt_key *key,
1740                             struct lustre_capa *capa)
1741 {
1742         struct osd_object     *obj = osd_dt_obj(dt);
1743         struct iam_path_descr *ipd;
1744         struct iam_container  *bag = &obj->oo_dir->od_container;
1745         int rc;
1746
1747         ENTRY;
1748
1749         LINVRNT(osd_invariant(obj));
1750         LASSERT(dt_object_exists(dt));
1751         LASSERT(bag->ic_object == obj->oo_inode);
1752
1753         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
1754                 return -EACCES;
1755
1756         ipd = osd_ipd_get(env, bag);
1757         if (unlikely(ipd == NULL))
1758                 RETURN(-ENOMEM);
1759
1760         rc = iam_lookup(bag, (const struct iam_key *)key,
1761                         (struct iam_rec *)rec, ipd);
1762         osd_ipd_put(env, bag, ipd);
1763         LINVRNT(osd_invariant(obj));
1764
1765         RETURN(rc);
1766 }
1767
1768 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1769                             const struct dt_rec *rec, const struct dt_key *key,
1770                             struct thandle *th, struct lustre_capa *capa)
1771 {
1772         struct osd_object     *obj = osd_dt_obj(dt);
1773         struct iam_path_descr *ipd;
1774         struct osd_thandle    *oh;
1775         struct iam_container  *bag = &obj->oo_dir->od_container;
1776         int rc;
1777
1778         ENTRY;
1779
1780         LINVRNT(osd_invariant(obj));
1781         LASSERT(dt_object_exists(dt));
1782         LASSERT(bag->ic_object == obj->oo_inode);
1783         LASSERT(th != NULL);
1784
1785         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
1786                 return -EACCES;
1787
1788         ipd = osd_ipd_get(env, bag);
1789         if (unlikely(ipd == NULL))
1790                 RETURN(-ENOMEM);
1791
1792         oh = container_of0(th, struct osd_thandle, ot_super);
1793         LASSERT(oh->ot_handle != NULL);
1794         LASSERT(oh->ot_handle->h_transaction != NULL);
1795         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
1796                         (struct iam_rec *)rec, ipd);
1797         osd_ipd_put(env, bag, ipd);
1798         LINVRNT(osd_invariant(obj));
1799         RETURN(rc);
1800 }
1801
1802 /*
1803  * Iterator operations.
1804  */
1805 struct osd_it {
1806         struct osd_object     *oi_obj;
1807         struct iam_path_descr *oi_ipd;
1808         struct iam_iterator    oi_it;
1809 };
1810
1811 static struct dt_it *osd_it_init(const struct lu_env *env,
1812                                  struct dt_object *dt, int writable,
1813                                  struct lustre_capa *capa)
1814 {
1815         struct osd_it         *it;
1816         struct osd_object     *obj = osd_dt_obj(dt);
1817         struct lu_object      *lo  = &dt->do_lu;
1818         struct iam_path_descr *ipd;
1819         struct iam_container  *bag = &obj->oo_dir->od_container;
1820         __u32                  flags;
1821
1822         LASSERT(lu_object_exists(lo));
1823
1824         if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
1825                             CAPA_OPC_BODY_READ))
1826                 return ERR_PTR(-EACCES);
1827
1828         flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
1829         OBD_ALLOC_PTR(it);
1830         if (it != NULL) {
1831                 /*
1832                  * XXX: as ipd is allocated within osd_thread_info, assignment
1833                  * below implies that iterator usage is confined within single
1834                  * environment.
1835                  */
1836                 ipd = osd_ipd_get(env, bag);
1837                 if (likely(ipd != NULL)) {
1838                         it->oi_obj = obj;
1839                         it->oi_ipd = ipd;
1840                         lu_object_get(lo);
1841                         iam_it_init(&it->oi_it, bag, flags, ipd);
1842                         return (struct dt_it *)it;
1843                 } else
1844                         OBD_FREE_PTR(it);
1845         }
1846         return ERR_PTR(-ENOMEM);
1847 }
1848
1849 static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
1850 {
1851         struct osd_it     *it = (struct osd_it *)di;
1852         struct osd_object *obj = it->oi_obj;
1853
1854         iam_it_fini(&it->oi_it);
1855         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
1856         lu_object_put(env, &obj->oo_dt.do_lu);
1857         OBD_FREE_PTR(it);
1858 }
1859
1860 static int osd_it_get(const struct lu_env *env,
1861                       struct dt_it *di, const struct dt_key *key)
1862 {
1863         struct osd_it *it = (struct osd_it *)di;
1864
1865         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
1866 }
1867
1868 static void osd_it_put(const struct lu_env *env, struct dt_it *di)
1869 {
1870         struct osd_it *it = (struct osd_it *)di;
1871
1872         iam_it_put(&it->oi_it);
1873 }
1874
1875 static int osd_it_next(const struct lu_env *env, struct dt_it *di)
1876 {
1877         struct osd_it *it = (struct osd_it *)di;
1878
1879         return iam_it_next(&it->oi_it);
1880 }
1881
1882 static int osd_it_del(const struct lu_env *env, struct dt_it *di,
1883                       struct thandle *th)
1884 {
1885         struct osd_it      *it = (struct osd_it *)di;
1886         struct osd_thandle *oh;
1887
1888         LASSERT(th != NULL);
1889
1890         oh = container_of0(th, struct osd_thandle, ot_super);
1891         LASSERT(oh->ot_handle != NULL);
1892         LASSERT(oh->ot_handle->h_transaction != NULL);
1893
1894         return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
1895 }
1896
1897 static struct dt_key *osd_it_key(const struct lu_env *env,
1898                                  const struct dt_it *di)
1899 {
1900         struct osd_it *it = (struct osd_it *)di;
1901
1902         return (struct dt_key *)iam_it_key_get(&it->oi_it);
1903 }
1904
1905 static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
1906 {
1907         struct osd_it *it = (struct osd_it *)di;
1908
1909         return iam_it_key_size(&it->oi_it);
1910 }
1911
1912 static struct dt_rec *osd_it_rec(const struct lu_env *env,
1913                                  const struct dt_it *di)
1914 {
1915         struct osd_it *it = (struct osd_it *)di;
1916
1917         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
1918 }
1919
1920 static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
1921 {
1922         struct osd_it *it = (struct osd_it *)di;
1923
1924         return iam_it_store(&it->oi_it);
1925 }
1926
1927 static int osd_it_load(const struct lu_env *env,
1928                        const struct dt_it *di, __u64 hash)
1929 {
1930         struct osd_it *it = (struct osd_it *)di;
1931
1932         return iam_it_load(&it->oi_it, hash);
1933 }
1934
1935 static struct dt_index_operations osd_index_ops = {
1936         .dio_lookup = osd_index_lookup,
1937         .dio_insert = osd_index_insert,
1938         .dio_delete = osd_index_delete,
1939         .dio_it     = {
1940                 .init     = osd_it_init,
1941                 .fini     = osd_it_fini,
1942                 .get      = osd_it_get,
1943                 .put      = osd_it_put,
1944                 .del      = osd_it_del,
1945                 .next     = osd_it_next,
1946                 .key      = osd_it_key,
1947                 .key_size = osd_it_key_size,
1948                 .rec      = osd_it_rec,
1949                 .store    = osd_it_store,
1950                 .load     = osd_it_load
1951         }
1952 };
1953
1954 static int osd_index_compat_delete(const struct lu_env *env,
1955                                    struct dt_object *dt,
1956                                    const struct dt_key *key,
1957                                    struct thandle *handle,
1958                                    struct lustre_capa *capa)
1959 {
1960         struct osd_object *obj = osd_dt_obj(dt);
1961
1962         LASSERT(handle != NULL);
1963         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1964         ENTRY;
1965
1966 #if 0
1967         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
1968                 RETURN(-EACCES);
1969 #endif
1970
1971         RETURN(-EOPNOTSUPP);
1972 }
1973
1974 /*
1975  * Compatibility index operations.
1976  */
1977
1978
1979 static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
1980                            struct dentry *dentry, struct lu_fid_pack *pack)
1981 {
1982         struct inode  *inode = dentry->d_inode;
1983         struct lu_fid *fid   = &osd_oti_get(env)->oti_fid;
1984
1985         lu_igif_build(fid, inode->i_ino, inode->i_generation);
1986         fid_cpu_to_be(fid, fid);
1987         pack->fp_len = sizeof *fid + 1;
1988         memcpy(pack->fp_area, fid, sizeof *fid);
1989 }
1990
1991 static int osd_index_compat_lookup(const struct lu_env *env,
1992                                    struct dt_object *dt,
1993                                    struct dt_rec *rec, const struct dt_key *key,
1994                                    struct lustre_capa *capa)
1995 {
1996         struct osd_object *obj = osd_dt_obj(dt);
1997
1998         struct osd_device      *osd  = osd_obj2dev(obj);
1999         struct osd_thread_info *info = osd_oti_get(env);
2000         struct inode           *dir;
2001
2002         int result;
2003
2004         /*
2005          * XXX temporary solution.
2006          */
2007         struct dentry *dentry;
2008         struct dentry *parent;
2009
2010         LINVRNT(osd_invariant(obj));
2011         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2012         LASSERT(osd_has_index(obj));
2013
2014         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2015                 return -EACCES;
2016
2017         info->oti_str.name = (const char *)key;
2018         info->oti_str.len  = strlen((const char *)key);
2019
2020         dir = obj->oo_inode;
2021         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
2022
2023         parent = d_alloc_root(dir);
2024         if (parent == NULL)
2025                 return -ENOMEM;
2026         igrab(dir);
2027         dentry = d_alloc(parent, &info->oti_str);
2028         if (dentry != NULL) {
2029                 struct dentry *d;
2030
2031                 /*
2032                  * XXX passing NULL for nameidata should work for
2033                  * ext3/ldiskfs.
2034                  */
2035                 d = dir->i_op->lookup(dir, dentry, NULL);
2036                 if (d == NULL) {
2037                         /*
2038                          * normal case, result is in @dentry.
2039                          */
2040                         if (dentry->d_inode != NULL) {
2041                                 osd_build_pack(env, osd, dentry,
2042                                                (struct lu_fid_pack *)rec);
2043                                 result = 0;
2044                         } else
2045                                 result = -ENOENT;
2046                  } else {
2047                         /* What? Disconnected alias? Ppheeeww... */
2048                         CERROR("Aliasing where not expected\n");
2049                         result = -EIO;
2050                         dput(d);
2051                 }
2052                 dput(dentry);
2053         } else
2054                 result = -ENOMEM;
2055         dput(parent);
2056         LINVRNT(osd_invariant(obj));
2057         return result;
2058 }
2059
2060 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
2061                        struct inode *dir, struct inode *inode, const char *name)
2062 {
2063         struct dentry *old;
2064         struct dentry *new;
2065         struct dentry *parent;
2066
2067         int result;
2068
2069         info->oti_str.name = name;
2070         info->oti_str.len  = strlen(name);
2071
2072         LASSERT(atomic_read(&dir->i_count) > 0);
2073         result = -ENOMEM;
2074         old = d_alloc(dev->od_obj_area, &info->oti_str);
2075         if (old != NULL) {
2076                 d_instantiate(old, inode);
2077                 igrab(inode);
2078                 LASSERT(atomic_read(&dir->i_count) > 0);
2079                 parent = d_alloc_root(dir);
2080                 if (parent != NULL) {
2081                         igrab(dir);
2082                         LASSERT(atomic_read(&dir->i_count) > 1);
2083                         new = d_alloc(parent, &info->oti_str);
2084                         LASSERT(atomic_read(&dir->i_count) > 1);
2085                         if (new != NULL) {
2086                                 LASSERT(atomic_read(&dir->i_count) > 1);
2087                                 result = dir->i_op->link(old, dir, new);
2088                                 LASSERT(atomic_read(&dir->i_count) > 1);
2089                                 dput(new);
2090                                 LASSERT(atomic_read(&dir->i_count) > 1);
2091                         }
2092                         LASSERT(atomic_read(&dir->i_count) > 1);
2093                         dput(parent);
2094                         LASSERT(atomic_read(&dir->i_count) > 0);
2095                 }
2096                 dput(old);
2097         }
2098         LASSERT(atomic_read(&dir->i_count) > 0);
2099         return result;
2100 }
2101
2102
2103 /*
2104  * XXX Temporary stuff.
2105  */
2106 static int osd_index_compat_insert(const struct lu_env *env,
2107                                    struct dt_object *dt,
2108                                    const struct dt_rec *rec,
2109                                    const struct dt_key *key, struct thandle *th,
2110                                    struct lustre_capa *capa)
2111 {
2112         struct osd_object     *obj = osd_dt_obj(dt);
2113
2114         const char          *name = (const char *)key;
2115
2116         struct lu_device    *ludev = dt->do_lu.lo_dev;
2117         struct lu_object    *luch;
2118
2119         struct osd_thread_info   *info = osd_oti_get(env);
2120         const struct lu_fid_pack *pack  = (const struct lu_fid_pack *)rec;
2121         struct lu_fid            *fid   = &osd_oti_get(env)->oti_fid;
2122
2123         int result;
2124
2125         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
2126         LINVRNT(osd_invariant(obj));
2127         LASSERT(th != NULL);
2128
2129         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2130                 return -EACCES;
2131
2132         result = fid_unpack(pack, fid);
2133         if (result != 0)
2134                 return result;
2135
2136         luch = lu_object_find(env, ludev->ld_site, fid);
2137         if (!IS_ERR(luch)) {
2138                 if (lu_object_exists(luch)) {
2139                         struct osd_object *child;
2140
2141                         child = osd_obj(lu_object_locate(luch->lo_header,
2142                                                          ludev->ld_type));
2143                         if (child != NULL)
2144                                 result = osd_add_rec(info, osd_obj2dev(obj),
2145                                                      obj->oo_inode,
2146                                                      child->oo_inode, name);
2147                         else {
2148                                 CERROR("No osd slice.\n");
2149                                 result = -ENOENT;
2150                         }
2151                         LINVRNT(osd_invariant(obj));
2152                         LINVRNT(osd_invariant(child));
2153                 } else {
2154                         CERROR("Sorry.\n");
2155                         result = -ENOENT;
2156                 }
2157                 lu_object_put(env, luch);
2158         } else
2159                 result = PTR_ERR(luch);
2160         LINVRNT(osd_invariant(obj));
2161         return result;
2162 }
2163
2164 static const struct dt_index_operations osd_index_compat_ops = {
2165         .dio_lookup = osd_index_compat_lookup,
2166         .dio_insert = osd_index_compat_insert,
2167         .dio_delete = osd_index_compat_delete
2168 };
2169
2170 /* type constructor/destructor: osd_type_init, osd_type_fini */
2171 LU_TYPE_INIT_FINI(osd, &osd_key);
2172
2173 static struct lu_context_key osd_key = {
2174         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
2175         .lct_init = osd_key_init,
2176         .lct_fini = osd_key_fini,
2177         .lct_exit = osd_key_exit
2178 };
2179
2180 static void *osd_key_init(const struct lu_context *ctx,
2181                           struct lu_context_key *key)
2182 {
2183         struct osd_thread_info *info;
2184
2185         OBD_ALLOC_PTR(info);
2186         if (info != NULL)
2187                 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
2188         else
2189                 info = ERR_PTR(-ENOMEM);
2190         return info;
2191 }
2192
2193 /* context key destructor: osd_key_fini */
2194 LU_KEY_FINI(osd, struct osd_thread_info);
2195
2196 static void osd_key_exit(const struct lu_context *ctx,
2197                          struct lu_context_key *key, void *data)
2198 {
2199         struct osd_thread_info *info = data;
2200
2201         LASSERT(info->oti_r_locks == 0);
2202         LASSERT(info->oti_w_locks == 0);
2203         LASSERT(info->oti_txns    == 0);
2204 }
2205
2206 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
2207                            const char *name, struct lu_device *next)
2208 {
2209         int rc;
2210         /* context for commit hooks */
2211         rc = lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
2212                              LCT_MD_THREAD);
2213         if (rc == 0)
2214                 rc = osd_procfs_init(osd_dev(d), name);
2215         return rc;
2216 }
2217
2218 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
2219 {
2220         struct osd_thread_info *info = osd_oti_get(env);
2221         ENTRY;
2222         if (o->od_obj_area != NULL) {
2223                 dput(o->od_obj_area);
2224                 o->od_obj_area = NULL;
2225         }
2226         osd_oi_fini(info, &o->od_oi);
2227
2228         RETURN(0);
2229 }
2230
2231 static int osd_mount(const struct lu_env *env,
2232                      struct osd_device *o, struct lustre_cfg *cfg)
2233 {
2234         struct lustre_mount_info *lmi;
2235         const char               *dev  = lustre_cfg_string(cfg, 0);
2236         struct osd_thread_info   *info = osd_oti_get(env);
2237         int result;
2238
2239         ENTRY;
2240
2241         if (o->od_mount != NULL) {
2242                 CERROR("Already mounted (%s)\n", dev);
2243                 RETURN(-EEXIST);
2244         }
2245
2246         /* get mount */
2247         lmi = server_get_mount(dev);
2248         if (lmi == NULL) {
2249                 CERROR("Cannot get mount info for %s!\n", dev);
2250                 RETURN(-EFAULT);
2251         }
2252
2253         LASSERT(lmi != NULL);
2254         /* save lustre_mount_info in dt_device */
2255         o->od_mount = lmi;
2256
2257         result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
2258         if (result == 0) {
2259                 struct dentry *d;
2260
2261                 d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
2262                                  0777, 1);
2263                 if (!IS_ERR(d)) {
2264                         o->od_obj_area = d;
2265                 } else
2266                         result = PTR_ERR(d);
2267         }
2268         if (result != 0)
2269                 osd_shutdown(env, o);
2270         RETURN(result);
2271 }
2272
2273 static struct lu_device *osd_device_fini(const struct lu_env *env,
2274                                          struct lu_device *d)
2275 {
2276         int rc;
2277         ENTRY;
2278
2279         shrink_dcache_sb(osd_sb(osd_dev(d)));
2280         osd_sync(env, lu2dt_dev(d));
2281
2282         rc = osd_procfs_fini(osd_dev(d));
2283         if (rc) {
2284                 CERROR("proc fini error %d \n", rc);
2285                 RETURN (ERR_PTR(rc));
2286         }
2287
2288         if (osd_dev(d)->od_mount)
2289                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
2290                                  osd_dev(d)->od_mount->lmi_mnt);
2291         osd_dev(d)->od_mount = NULL;
2292
2293         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
2294         RETURN(NULL);
2295 }
2296
2297 static struct lu_device *osd_device_alloc(const struct lu_env *env,
2298                                           struct lu_device_type *t,
2299                                           struct lustre_cfg *cfg)
2300 {
2301         struct lu_device  *l;
2302         struct osd_device *o;
2303
2304         OBD_ALLOC_PTR(o);
2305         if (o != NULL) {
2306                 int result;
2307
2308                 result = dt_device_init(&o->od_dt_dev, t);
2309                 if (result == 0) {
2310                         l = osd2lu_dev(o);
2311                         l->ld_ops = &osd_lu_ops;
2312                         o->od_dt_dev.dd_ops = &osd_dt_ops;
2313                         spin_lock_init(&o->od_osfs_lock);
2314                         o->od_osfs_age = cfs_time_shift_64(-1000);
2315                         o->od_capa_hash = init_capa_hash();
2316                         if (o->od_capa_hash == NULL) {
2317                                 dt_device_fini(&o->od_dt_dev);
2318                                 l = ERR_PTR(-ENOMEM);
2319                         }
2320                 } else
2321                         l = ERR_PTR(result);
2322
2323                 if (IS_ERR(l))
2324                         OBD_FREE_PTR(o);
2325         } else
2326                 l = ERR_PTR(-ENOMEM);
2327         return l;
2328 }
2329
2330 static struct lu_device *osd_device_free(const struct lu_env *env,
2331                                          struct lu_device *d)
2332 {
2333         struct osd_device *o = osd_dev(d);
2334         ENTRY;
2335
2336         cleanup_capa_hash(o->od_capa_hash);
2337         dt_device_fini(&o->od_dt_dev);
2338         OBD_FREE_PTR(o);
2339         RETURN(NULL);
2340 }
2341
2342 static int osd_process_config(const struct lu_env *env,
2343                               struct lu_device *d, struct lustre_cfg *cfg)
2344 {
2345         struct osd_device *o = osd_dev(d);
2346         int err;
2347         ENTRY;
2348
2349         switch(cfg->lcfg_command) {
2350         case LCFG_SETUP:
2351                 err = osd_mount(env, o, cfg);
2352                 break;
2353         case LCFG_CLEANUP:
2354                 err = osd_shutdown(env, o);
2355                 break;
2356         default:
2357                 err = -ENOTTY;
2358         }
2359
2360         RETURN(err);
2361 }
2362 extern void ldiskfs_orphan_cleanup (struct super_block * sb,
2363                                     struct ldiskfs_super_block * es);
2364
2365 static int osd_recovery_complete(const struct lu_env *env,
2366                                  struct lu_device *d)
2367 {
2368         struct osd_device *o = osd_dev(d);
2369         ENTRY;
2370         /* TODO: orphans handling */
2371         ldiskfs_orphan_cleanup(osd_sb(o), LDISKFS_SB(osd_sb(o))->s_es);
2372         RETURN(0);
2373 }
2374
2375 static struct inode *osd_iget(struct osd_thread_info *info,
2376                               struct osd_device *dev,
2377                               const struct osd_inode_id *id)
2378 {
2379         struct inode *inode;
2380
2381         inode = iget(osd_sb(dev), id->oii_ino);
2382         if (inode == NULL) {
2383                 CERROR("no inode\n");
2384                 inode = ERR_PTR(-EACCES);
2385         } else if (is_bad_inode(inode)) {
2386                 CERROR("bad inode\n");
2387                 iput(inode);
2388                 inode = ERR_PTR(-ENOENT);
2389         } else if (inode->i_generation != id->oii_gen) {
2390                 CERROR("stale inode\n");
2391                 iput(inode);
2392                 inode = ERR_PTR(-ESTALE);
2393         }
2394
2395         return inode;
2396
2397 }
2398
2399 static int osd_fid_lookup(const struct lu_env *env,
2400                           struct osd_object *obj, const struct lu_fid *fid)
2401 {
2402         struct osd_thread_info *info;
2403         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
2404         struct osd_device      *dev;
2405         struct osd_inode_id    *id;
2406         struct osd_oi          *oi;
2407         struct inode           *inode;
2408         int                     result;
2409
2410         LINVRNT(osd_invariant(obj));
2411         LASSERT(obj->oo_inode == NULL);
2412         LASSERT(fid_is_sane(fid));
2413         /*
2414          * This assertion checks that osd layer sees only local
2415          * fids. Unfortunately it is somewhat expensive (does a
2416          * cache-lookup). Disabling it for production/acceptance-testing.
2417          */
2418         LASSERT(1 || fid_is_local(ldev->ld_site, fid));
2419
2420         ENTRY;
2421
2422         info = osd_oti_get(env);
2423         dev  = osd_dev(ldev);
2424         id   = &info->oti_id;
2425         oi   = &dev->od_oi;
2426
2427         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2428                 RETURN(-ENOENT);
2429
2430         result = osd_oi_lookup(info, oi, fid, id);
2431         if (result == 0) {
2432                 inode = osd_iget(info, dev, id);
2433                 if (!IS_ERR(inode)) {
2434                         obj->oo_inode = inode;
2435                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2436                         result = 0;
2437                 } else
2438                         /*
2439                          * If fid wasn't found in oi, inode-less object is
2440                          * created, for which lu_object_exists() returns
2441                          * false. This is used in a (frequent) case when
2442                          * objects are created as locking anchors or
2443                          * place holders for objects yet to be created.
2444                          */
2445                         result = PTR_ERR(inode);
2446         } else if (result == -ENOENT)
2447                 result = 0;
2448         LINVRNT(osd_invariant(obj));
2449         RETURN(result);
2450 }
2451
2452 static void osd_inode_getattr(const struct lu_env *env,
2453                               struct inode *inode, struct lu_attr *attr)
2454 {
2455         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2456                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2457                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2458
2459         attr->la_atime      = LTIME_S(inode->i_atime);
2460         attr->la_mtime      = LTIME_S(inode->i_mtime);
2461         attr->la_ctime      = LTIME_S(inode->i_ctime);
2462         attr->la_mode       = inode->i_mode;
2463         attr->la_size       = i_size_read(inode);
2464         attr->la_blocks     = inode->i_blocks;
2465         attr->la_uid        = inode->i_uid;
2466         attr->la_gid        = inode->i_gid;
2467         attr->la_flags      = LDISKFS_I(inode)->i_flags;
2468         attr->la_nlink      = inode->i_nlink;
2469         attr->la_rdev       = inode->i_rdev;
2470         attr->la_blksize    = ll_inode_blksize(inode);
2471         attr->la_blkbits    = inode->i_blkbits;
2472 }
2473
2474 /*
2475  * Helpers.
2476  */
2477
2478 static int lu_device_is_osd(const struct lu_device *d)
2479 {
2480         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
2481 }
2482
2483 static struct osd_object *osd_obj(const struct lu_object *o)
2484 {
2485         LASSERT(lu_device_is_osd(o->lo_dev));
2486         return container_of0(o, struct osd_object, oo_dt.do_lu);
2487 }
2488
2489 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2490 {
2491         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2492         return container_of0(d, struct osd_device, od_dt_dev);
2493 }
2494
2495 static struct osd_device *osd_dev(const struct lu_device *d)
2496 {
2497         LASSERT(lu_device_is_osd(d));
2498         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
2499 }
2500
2501 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2502 {
2503         return osd_obj(&d->do_lu);
2504 }
2505
2506 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2507 {
2508         return osd_dev(o->oo_dt.do_lu.lo_dev);
2509 }
2510
2511 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2512 {
2513         return &osd->od_dt_dev.dd_lu_dev;
2514 }
2515
2516 static struct super_block *osd_sb(const struct osd_device *dev)
2517 {
2518         return dev->od_mount->lmi_mnt->mnt_sb;
2519 }
2520
2521 static journal_t *osd_journal(const struct osd_device *dev)
2522 {
2523         return LDISKFS_SB(osd_sb(dev))->s_journal;
2524 }
2525
2526 static int osd_has_index(const struct osd_object *obj)
2527 {
2528         return obj->oo_dt.do_index_ops != NULL;
2529 }
2530
/*
 * lu_object invariant method: evaluate osd_invariant() on the osd_object
 * embedding @l.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        const struct osd_object *obj = osd_obj(l);

        return osd_invariant(obj);
}
2535
2536 static struct lu_object_operations osd_lu_obj_ops = {
2537         .loo_object_init      = osd_object_init,
2538         .loo_object_delete    = osd_object_delete,
2539         .loo_object_release   = osd_object_release,
2540         .loo_object_free      = osd_object_free,
2541         .loo_object_print     = osd_object_print,
2542         .loo_object_invariant = osd_object_invariant
2543 };
2544
2545 static struct lu_device_operations osd_lu_ops = {
2546         .ldo_object_alloc      = osd_object_alloc,
2547         .ldo_process_config    = osd_process_config,
2548         .ldo_recovery_complete = osd_recovery_complete
2549 };
2550
2551 static struct lu_device_type_operations osd_device_type_ops = {
2552         .ldto_init = osd_type_init,
2553         .ldto_fini = osd_type_fini,
2554
2555         .ldto_device_alloc = osd_device_alloc,
2556         .ldto_device_free  = osd_device_free,
2557
2558         .ldto_device_init    = osd_device_init,
2559         .ldto_device_fini    = osd_device_fini
2560 };
2561
2562 static struct lu_device_type osd_device_type = {
2563         .ldt_tags     = LU_DEVICE_DT,
2564         .ldt_name     = LUSTRE_OSD_NAME,
2565         .ldt_ops      = &osd_device_type_ops,
2566         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2567 };
2568
2569 /*
2570  * lprocfs legacy support.
2571  */
2572 static struct obd_ops osd_obd_device_ops = {
2573         .o_owner = THIS_MODULE
2574 };
2575
2576 static int __init osd_mod_init(void)
2577 {
2578         struct lprocfs_static_vars lvars;
2579
2580         lprocfs_osd_init_vars(&lvars);
2581         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2582                                    LUSTRE_OSD_NAME, &osd_device_type);
2583 }
2584
2585 static void __exit osd_mod_exit(void)
2586 {
2587         class_unregister_type(LUSTRE_OSD_NAME);
2588 }
2589
2590 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2591 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
2592 MODULE_LICENSE("GPL");
2593
2594 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);