Whamcloud - gitweb
LU-795 osd api: Commit callback per transaction
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * Copyright (c) 2011 Whamcloud, Inc.
37  */
38 /*
39  * This file is part of Lustre, http://www.lustre.org/
40  * Lustre is a trademark of Sun Microsystems, Inc.
41  *
42  * lustre/osd/osd_handler.c
43  *
44  * Top-level entry points into osd module
45  *
46  * Author: Nikita Danilov <nikita@clusterfs.com>
47  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
48  */
49
50 #ifndef EXPORT_SYMTAB
51 # define EXPORT_SYMTAB
52 #endif
53 #define DEBUG_SUBSYSTEM S_MDS
54
55 #include <linux/module.h>
56
57 /* LUSTRE_VERSION_CODE */
58 #include <lustre_ver.h>
59 /* prerequisite for linux/xattr.h */
60 #include <linux/types.h>
61 /* prerequisite for linux/xattr.h */
62 #include <linux/fs.h>
63 /* XATTR_{REPLACE,CREATE} */
64 #include <linux/xattr.h>
65 /* simple_mkdir() */
66 #include <lvfs.h>
67
68 /*
69  * struct OBD_{ALLOC,FREE}*()
70  * OBD_FAIL_CHECK
71  */
72 #include <obd_support.h>
73 /* struct ptlrpc_thread */
74 #include <lustre_net.h>
75
76 /* fid_is_local() */
77 #include <lustre_fid.h>
78
79 #include "osd_internal.h"
80 #include "osd_igif.h"
81
82 /* llo_* api support */
83 #include <md_object.h>
84
/* Fixed entry-name strings shared by the code in this file. */
static const char dot[]            = ".";
static const char dotdot[]         = "..";
static const char remote_obj_dir[] = "REM_OBJ_DIR";
88
89 struct osd_directory {
90         struct iam_container od_container;
91         struct iam_descr     od_descr;
92 };
93
94 struct osd_object {
95         struct dt_object       oo_dt;
96         /**
97          * Inode for file system object represented by this osd_object. This
98          * inode is pinned for the whole duration of lu_object life.
99          *
100          * Not modified concurrently (either setup early during object
101          * creation, or assigned by osd_object_create() under write lock).
102          */
103         struct inode          *oo_inode;
104         /**
105          * to protect index ops.
106          */
107         cfs_rw_semaphore_t     oo_ext_idx_sem;
108         cfs_rw_semaphore_t     oo_sem;
109         struct osd_directory  *oo_dir;
110         /** protects inode attributes. */
111         cfs_spinlock_t         oo_guard;
112         /**
113          * Following two members are used to indicate the presence of dot and
114          * dotdot in the given directory. This is required for interop mode
115          * (b11826).
116          */
117         int                    oo_compat_dot_created;
118         int                    oo_compat_dotdot_created;
119
120         const struct lu_env   *oo_owner;
121 #ifdef CONFIG_LOCKDEP
122         struct lockdep_map     oo_dep_map;
123 #endif
124 };
125
126 static const struct lu_object_operations      osd_lu_obj_ops;
127 static const struct lu_device_operations      osd_lu_ops;
128 static       struct lu_context_key            osd_key;
129 static const struct dt_object_operations      osd_obj_ops;
130 static const struct dt_object_operations      osd_obj_ea_ops;
131 static const struct dt_body_operations        osd_body_ops;
132 static const struct dt_index_operations       osd_index_iam_ops;
133 static const struct dt_index_operations       osd_index_ea_ops;
134
135 struct osd_thandle {
136         struct thandle          ot_super;
137         handle_t               *ot_handle;
138         struct journal_callback ot_jcb;
139         cfs_list_t              ot_dcb_list;
140         /* Link to the device, for debugging. */
141         struct lu_ref_link     *ot_dev_link;
142
143 #if OSD_THANDLE_STATS
144         /** time when this handle was allocated */
145         cfs_time_t oth_alloced;
146
147         /** time when this thanle was started */
148         cfs_time_t oth_started;
149 #endif
150 };
151
152 /*
153  * Helpers.
154  */
155 static int lu_device_is_osd(const struct lu_device *d)
156 {
157         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
158 }
159
160 static struct osd_device *osd_dt_dev(const struct dt_device *d)
161 {
162         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
163         return container_of0(d, struct osd_device, od_dt_dev);
164 }
165
166 static struct osd_device *osd_dev(const struct lu_device *d)
167 {
168         LASSERT(lu_device_is_osd(d));
169         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
170 }
171
172 static struct osd_device *osd_obj2dev(const struct osd_object *o)
173 {
174         return osd_dev(o->oo_dt.do_lu.lo_dev);
175 }
176
177 static struct super_block *osd_sb(const struct osd_device *dev)
178 {
179         return dev->od_mount->lmi_mnt->mnt_sb;
180 }
181
182 static int osd_object_is_root(const struct osd_object *obj)
183 {
184         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
185 }
186
187 static struct osd_object *osd_obj(const struct lu_object *o)
188 {
189         LASSERT(lu_device_is_osd(o->lo_dev));
190         return container_of0(o, struct osd_object, oo_dt.do_lu);
191 }
192
193 static struct osd_object *osd_dt_obj(const struct dt_object *d)
194 {
195         return osd_obj(&d->do_lu);
196 }
197
198 static struct lu_device *osd2lu_dev(struct osd_device *osd)
199 {
200         return &osd->od_dt_dev.dd_lu_dev;
201 }
202
203 static journal_t *osd_journal(const struct osd_device *dev)
204 {
205         return LDISKFS_SB(osd_sb(dev))->s_journal;
206 }
207
208 static int osd_has_index(const struct osd_object *obj)
209 {
210         return obj->oo_dt.do_index_ops != NULL;
211 }
212
/* lu_object flavour of osd_invariant(). */
static int osd_object_invariant(const struct lu_object *l)
{
        struct osd_object *obj = osd_obj(l);

        return osd_invariant(obj);
}
217
218 #ifdef HAVE_QUOTA_SUPPORT
219 static inline void
220 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
221 {
222         struct md_ucred    *uc = md_ucred(env);
223         struct cred        *tc;
224
225         LASSERT(uc != NULL);
226
227         save->oc_uid = current_fsuid();
228         save->oc_gid = current_fsgid();
229         save->oc_cap = current_cap();
230         if ((tc = prepare_creds())) {
231                 tc->fsuid         = uc->mu_fsuid;
232                 tc->fsgid         = uc->mu_fsgid;
233                 commit_creds(tc);
234         }
235         /* XXX not suboptimal */
236         cfs_curproc_cap_unpack(uc->mu_cap);
237 }
238
239 static inline void
240 osd_pop_ctxt(struct osd_ctxt *save)
241 {
242         struct cred *tc;
243
244         if ((tc = prepare_creds())) {
245                 tc->fsuid         = save->oc_uid;
246                 tc->fsgid         = save->oc_gid;
247                 tc->cap_effective = save->oc_cap;
248                 commit_creds(tc);
249         }
250 }
251 #endif
252
253 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
254 {
255         return lu_context_key_get(&env->le_ctx, &osd_key);
256 }
257
258 /*
259  * Concurrency: doesn't matter
260  */
261 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
262 {
263         return osd_oti_get(env)->oti_r_locks > 0;
264 }
265
266 /*
267  * Concurrency: doesn't matter
268  */
269 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
270 {
271         struct osd_thread_info *oti = osd_oti_get(env);
272         return oti->oti_w_locks > 0 && o->oo_owner == env;
273 }
274
275 /*
276  * Concurrency: doesn't access mutable data
277  */
278 static int osd_root_get(const struct lu_env *env,
279                         struct dt_device *dev, struct lu_fid *f)
280 {
281         struct inode *inode;
282
283         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
284         LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
285         return 0;
286 }
287
288 /*
289  * OSD object methods.
290  */
291
292 /*
293  * Concurrency: no concurrent access is possible that early in object
294  * life-cycle.
295  */
296 static struct lu_object *osd_object_alloc(const struct lu_env *env,
297                                           const struct lu_object_header *hdr,
298                                           struct lu_device *d)
299 {
300         struct osd_object *mo;
301
302         OBD_ALLOC_PTR(mo);
303         if (mo != NULL) {
304                 struct lu_object *l;
305
306                 l = &mo->oo_dt.do_lu;
307                 dt_object_init(&mo->oo_dt, NULL, d);
308                 if (osd_dev(d)->od_iop_mode)
309                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
310                 else
311                         mo->oo_dt.do_ops = &osd_obj_ops;
312
313                 l->lo_ops = &osd_lu_obj_ops;
314                 cfs_init_rwsem(&mo->oo_sem);
315                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
316                 cfs_spin_lock_init(&mo->oo_guard);
317                 return l;
318         } else
319                 return NULL;
320 }
321
322 /*
323  * retrieve object from backend ext fs.
324  **/
325 static struct inode *osd_iget(struct osd_thread_info *info,
326                               struct osd_device *dev,
327                               const struct osd_inode_id *id)
328 {
329         struct inode *inode = NULL;
330
331 #ifdef HAVE_EXT4_LDISKFS
332         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
333         if (IS_ERR(inode))
334         /* Newer kernels return an error instead of a NULL pointer */
335                 inode = NULL;
336 #else
337         inode = iget(osd_sb(dev), id->oii_ino);
338 #endif
339         if (inode == NULL) {
340                 CERROR("no inode\n");
341                 inode = ERR_PTR(-EACCES);
342         } else if (id->oii_gen != OSD_OII_NOGEN &&
343                    inode->i_generation != id->oii_gen) {
344                 iput(inode);
345                 inode = ERR_PTR(-ESTALE);
346         } else if (inode->i_nlink == 0) {
347                 /* due to parallel readdir and unlink,
348                 * we can have dead inode here. */
349                 CWARN("stale inode\n");
350                 make_bad_inode(inode);
351                 iput(inode);
352                 inode = ERR_PTR(-ESTALE);
353         } else if (is_bad_inode(inode)) {
354                 CERROR("bad inode %lx\n",inode->i_ino);
355                 iput(inode);
356                 inode = ERR_PTR(-ENOENT);
357         } else {
358                 /* Do not update file c/mtime in ldiskfs.
359                  * NB: we don't have any lock to protect this because we don't
360                  * have reference on osd_object now, but contention with
361                  * another lookup + attr_set can't happen in the tiny window
362                  * between if (...) and set S_NOCMTIME. */
363                 if (!(inode->i_flags & S_NOCMTIME))
364                         inode->i_flags |= S_NOCMTIME;
365         }
366         return inode;
367 }
368
369 static int osd_fid_lookup(const struct lu_env *env,
370                           struct osd_object *obj, const struct lu_fid *fid)
371 {
372         struct osd_thread_info *info;
373         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
374         struct osd_device      *dev;
375         struct osd_inode_id    *id;
376         struct osd_oi          *oi;
377         struct inode           *inode;
378         int                     result;
379
380         LINVRNT(osd_invariant(obj));
381         LASSERT(obj->oo_inode == NULL);
382         LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
383         /*
384          * This assertion checks that osd layer sees only local
385          * fids. Unfortunately it is somewhat expensive (does a
386          * cache-lookup). Disabling it for production/acceptance-testing.
387          */
388         LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
389
390         ENTRY;
391
392         info = osd_oti_get(env);
393         dev  = osd_dev(ldev);
394         id   = &info->oti_id;
395         oi   = &dev->od_oi;
396
397         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
398                 RETURN(-ENOENT);
399
400         result = osd_oi_lookup(info, oi, fid, id);
401         if (result == 0) {
402                 inode = osd_iget(info, dev, id);
403                 if (!IS_ERR(inode)) {
404                         obj->oo_inode = inode;
405                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
406                         if (dev->od_iop_mode) {
407                                 obj->oo_compat_dot_created = 1;
408                                 obj->oo_compat_dotdot_created = 1;
409                         }
410                         result = 0;
411                 } else
412                         /*
413                          * If fid wasn't found in oi, inode-less object is
414                          * created, for which lu_object_exists() returns
415                          * false. This is used in a (frequent) case when
416                          * objects are created as locking anchors or
417                          * place holders for objects yet to be created.
418                          */
419                         result = PTR_ERR(inode);
420         } else if (result == -ENOENT)
421                 result = 0;
422         LINVRNT(osd_invariant(obj));
423
424         RETURN(result);
425 }
426
427 /*
428  * Concurrency: shouldn't matter.
429  */
430 static void osd_object_init0(struct osd_object *obj)
431 {
432         LASSERT(obj->oo_inode != NULL);
433         obj->oo_dt.do_body_ops = &osd_body_ops;
434         obj->oo_dt.do_lu.lo_header->loh_attr |=
435                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
436 }
437
438 /*
439  * Concurrency: no concurrent access is possible that early in object
440  * life-cycle.
441  */
442 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
443                            const struct lu_object_conf *unused)
444 {
445         struct osd_object *obj = osd_obj(l);
446         int result;
447
448         LINVRNT(osd_invariant(obj));
449
450         result = osd_fid_lookup(env, obj, lu_object_fid(l));
451         if (result == 0) {
452                 if (obj->oo_inode != NULL)
453                         osd_object_init0(obj);
454         }
455         LINVRNT(osd_invariant(obj));
456         return result;
457 }
458
459 /*
460  * Concurrency: no concurrent access is possible that late in object
461  * life-cycle.
462  */
463 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
464 {
465         struct osd_object *obj = osd_obj(l);
466
467         LINVRNT(osd_invariant(obj));
468
469         dt_object_fini(&obj->oo_dt);
470         OBD_FREE_PTR(obj);
471 }
472
473 /**
474  * IAM Iterator
475  */
476 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
477                                              const struct iam_container *bag)
478 {
479         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
480                                            osd_oti_get(env)->oti_it_ipd);
481 }
482
483 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
484                                               const struct iam_container *bag)
485 {
486         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
487                                            osd_oti_get(env)->oti_idx_ipd);
488 }
489
490 static void osd_ipd_put(const struct lu_env *env,
491                         const struct iam_container *bag,
492                         struct iam_path_descr *ipd)
493 {
494         bag->ic_descr->id_ops->id_ipd_free(ipd);
495 }
496
497 /*
498  * Concurrency: no concurrent access is possible that late in object
499  * life-cycle.
500  */
501 static void osd_index_fini(struct osd_object *o)
502 {
503         struct iam_container *bag;
504
505         if (o->oo_dir != NULL) {
506                 bag = &o->oo_dir->od_container;
507                 if (o->oo_inode != NULL) {
508                         if (bag->ic_object == o->oo_inode)
509                                 iam_container_fini(bag);
510                 }
511                 OBD_FREE_PTR(o->oo_dir);
512                 o->oo_dir = NULL;
513         }
514 }
515
516 /*
517  * Concurrency: no concurrent access is possible that late in object
518  * life-cycle (for all existing callers, that is. New callers have to provide
519  * their own locking.)
520  */
521 static int osd_inode_unlinked(const struct inode *inode)
522 {
523         return inode->i_nlink == 0;
524 }
525
/*
 * Transaction credits; their sum is what osd_inode_remove() passes to
 * txn_param_init().
 */
enum {
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
530
531 /*
532  * Journal
533  */
534
535 #if OSD_THANDLE_STATS
536 /**
537  * Set time when the handle is allocated
538  */
539 static void osd_th_alloced(struct osd_thandle *oth)
540 {
541         oth->oth_alloced = cfs_time_current();
542 }
543
544 /**
545  * Set time when the handle started
546  */
547 static void osd_th_started(struct osd_thandle *oth)
548 {
549         oth->oth_started = cfs_time_current();
550 }
551
552 /**
553  * Helper function to convert time interval to microseconds packed in
554  * long int (default time units for the counter in "stats" initialized
555  * by lu_time_init() )
556  */
557 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
558 {
559         struct timeval val;
560
561         cfs_duration_usec(cfs_time_sub(end, start), &val);
562         return val.tv_sec * 1000000 + val.tv_usec;
563 }
564
565 /**
566  * Check whether the we deal with this handle for too long.
567  */
568 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
569                                 cfs_time_t alloced, cfs_time_t started,
570                                 cfs_time_t closed)
571 {
572         cfs_time_t now = cfs_time_current();
573
574         LASSERT(dev != NULL);
575
576         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
577                             interval_to_usec(alloced, started));
578         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
579                             interval_to_usec(started, closed));
580         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
581                             interval_to_usec(closed, now));
582
583         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
584                 CWARN("transaction handle %p was open for too long: "
585                       "now "CFS_TIME_T" ,"
586                       "alloced "CFS_TIME_T" ,"
587                       "started "CFS_TIME_T" ,"
588                       "closed "CFS_TIME_T"\n",
589                       oth, now, alloced, started, closed);
590                 libcfs_debug_dumpstack(NULL);
591         }
592 }
593
/*
 * Evaluate \a expr and then report how long the handle \a oth lived.
 *
 * The timestamps are copied out of \a oth before \a expr is evaluated, so
 * the handle is not dereferenced once \a expr has run; afterwards the
 * pointer is only handed to __osd_th_check_slow() for printing.
 *
 * Wrapped in do { } while (0) so that the macro behaves as a single
 * statement (safe in unbraced if/else); arguments are parenthesized.
 */
#define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
do {                                                                    \
        cfs_time_t __closed = cfs_time_current();                       \
        cfs_time_t __alloced = (oth)->oth_alloced;                      \
        cfs_time_t __started = (oth)->oth_started;                      \
                                                                        \
        expr;                                                           \
        __osd_th_check_slow((oth), (dev), __alloced, __started,         \
                            __closed);                                  \
} while (0)
603
604 #else /* OSD_THANDLE_STATS */
605
/*
 * Without OSD_THANDLE_STATS the time stamping helpers do nothing and
 * OSD_CHECK_SLOW_TH() reduces to the wrapped expression.
 */
#define osd_th_alloced(h)                       do {} while(0)
#define osd_th_started(h)                       do {} while(0)
#define OSD_CHECK_SLOW_TH(oth, dev, expr)       expr
609
610 #endif /* OSD_THANDLE_STATS */
611
612 /*
613  * Concurrency: doesn't access mutable data.
614  */
615 static int osd_param_is_sane(const struct osd_device *dev,
616                              const struct txn_param *param)
617 {
618         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
619 }
620
621 /*
622  * Concurrency: shouldn't matter.
623  */
624 #ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
625 static void osd_trans_commit_cb(struct super_block *sb,
626                                 struct journal_callback *jcb, int error)
627 #else
628 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
629 #endif
630 {
631         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
632         struct thandle     *th  = &oh->ot_super;
633         struct lu_device   *lud = &th->th_dev->dd_lu_dev;
634         struct dt_txn_commit_cb *dcb, *tmp;
635
636         LASSERT(oh->ot_handle == NULL);
637
638         if (error)
639                 CERROR("transaction @0x%p commit error: %d\n", th, error);
640
641         dt_txn_hook_commit(th);
642
643         /* call per-transaction callbacks if any */
644         cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
645                 dcb->dcb_func(NULL, th, dcb, error);
646
647         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
648         lu_device_put(lud);
649         th->th_dev = NULL;
650
651         lu_context_exit(&th->th_ctx);
652         lu_context_fini(&th->th_ctx);
653         OBD_FREE_PTR(oh);
654 }
655
656 /*
657  * Concurrency: shouldn't matter.
658  */
659 static struct thandle *osd_trans_start(const struct lu_env *env,
660                                        struct dt_device *d,
661                                        struct txn_param *p)
662 {
663         struct osd_device  *dev = osd_dt_dev(d);
664         handle_t           *jh;
665         struct osd_thandle *oh;
666         struct thandle     *th;
667         int hook_res;
668
669         ENTRY;
670
671         hook_res = dt_txn_hook_start(env, d, p);
672         if (hook_res != 0)
673                 RETURN(ERR_PTR(hook_res));
674
675         if (osd_param_is_sane(dev, p)) {
676                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
677                 if (oh != NULL) {
678                         struct osd_thread_info *oti = osd_oti_get(env);
679
680                         /*
681                          * XXX temporary stuff. Some abstraction layer should
682                          * be used.
683                          */
684                         oti->oti_dev = dev;
685                         CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
686                         osd_th_alloced(oh);
687                         jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
688                         osd_th_started(oh);
689                         if (!IS_ERR(jh)) {
690                                 oh->ot_handle = jh;
691                                 th = &oh->ot_super;
692                                 th->th_dev = d;
693                                 th->th_result = 0;
694                                 th->th_sync = 0;
695                                 lu_device_get(&d->dd_lu_dev);
696                                 oh->ot_dev_link = lu_ref_add
697                                         (&d->dd_lu_dev.ld_reference,
698                                          "osd-tx", th);
699                                 /* add commit callback */
700                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
701                                 lu_context_enter(&th->th_ctx);
702                                 LASSERT(oti->oti_txns == 0);
703                                 LASSERT(oti->oti_r_locks == 0);
704                                 LASSERT(oti->oti_w_locks == 0);
705                                 oti->oti_txns++;
706                         } else {
707                                 OBD_FREE_PTR(oh);
708                                 th = (void *)jh;
709                         }
710                 } else
711                         th = ERR_PTR(-ENOMEM);
712         } else {
713                 CERROR("Invalid transaction parameters\n");
714                 th = ERR_PTR(-EINVAL);
715         }
716
717         RETURN(th);
718 }
719
720 /*
721  * Concurrency: shouldn't matter.
722  */
723 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
724 {
725         int result;
726         struct osd_thandle *oh;
727         struct osd_thread_info *oti = osd_oti_get(env);
728
729         ENTRY;
730
731         oh = container_of0(th, struct osd_thandle, ot_super);
732         if (oh->ot_handle != NULL) {
733                 handle_t *hdl = oh->ot_handle;
734
735                 hdl->h_sync = th->th_sync;
736                 /*
737                  * add commit callback
738                  * notice we don't do this in osd_trans_start()
739                  * as underlying transaction can change during truncate
740                  */
741                 osd_journal_callback_set(hdl, osd_trans_commit_cb,
742                                          &oh->ot_jcb);
743
744                 LASSERT(oti->oti_txns == 1);
745                 oti->oti_txns--;
746                 LASSERT(oti->oti_r_locks == 0);
747                 LASSERT(oti->oti_w_locks == 0);
748                 result = dt_txn_hook_stop(env, th);
749                 if (result != 0)
750                         CERROR("Failure in transaction hook: %d\n", result);
751                 oh->ot_handle = NULL;
752                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
753                                   result = ldiskfs_journal_stop(hdl));
754                 if (result != 0)
755                         CERROR("Failure to stop transaction: %d\n", result);
756         } else {
757                 OBD_FREE_PTR(oh);
758         }
759         EXIT;
760 }
761
762 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
763 {
764         struct osd_thandle *oh = container_of0(th, struct osd_thandle,
765                                                ot_super);
766
767         cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
768
769         return 0;
770 }
771
772 /*
773  * Concurrency: no concurrent access is possible that late in object
774  * life-cycle.
775  */
776 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
777 {
778         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
779         struct osd_device      *osd = osd_obj2dev(obj);
780         struct osd_thread_info *oti = osd_oti_get(env);
781         struct txn_param       *prm = &oti->oti_txn;
782         struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
783         struct thandle         *th;
784         int result;
785
786         lu_env_init(env_del_obj, LCT_DT_THREAD);
787         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
788                             OSD_TXN_INODE_DELETE_CREDITS);
789         th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
790         if (!IS_ERR(th)) {
791                 result = osd_oi_delete(osd_oti_get(env_del_obj),
792                                        &osd->od_oi, fid, th);
793                 osd_trans_stop(env_del_obj, th);
794         } else
795                 result = PTR_ERR(th);
796
797         lu_env_fini(env_del_obj);
798         return result;
799 }
800
801 /*
802  * Called just before object is freed. Releases all resources except for
803  * object itself (that is released by osd_object_free()).
804  *
805  * Concurrency: no concurrent access is possible that late in object
806  * life-cycle.
807  */
808 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
809 {
810         struct osd_object *obj   = osd_obj(l);
811         struct inode      *inode = obj->oo_inode;
812
813         LINVRNT(osd_invariant(obj));
814
815         /*
816          * If object is unlinked remove fid->ino mapping from object index.
817          */
818
819         osd_index_fini(obj);
820         if (inode != NULL) {
821                 int result;
822
823                 if (osd_inode_unlinked(inode)) {
824                         result = osd_inode_remove(env, obj);
825                         if (result != 0)
826                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
827                                                 "Failed to cleanup: %d\n",
828                                                 result);
829                 }
830
831                 iput(inode);
832                 obj->oo_inode = NULL;
833         }
834 }
835
836 /*
837  * Concurrency: ->loo_object_release() is called under site spin-lock.
838  */
839 static void osd_object_release(const struct lu_env *env,
840                                struct lu_object *l)
841 {
842         struct osd_object *o = osd_obj(l);
843
844         LASSERT(!lu_object_is_dying(l->lo_header));
845         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
846                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
847 }
848
849 /*
850  * Concurrency: shouldn't matter.
851  */
852 static int osd_object_print(const struct lu_env *env, void *cookie,
853                             lu_printer_t p, const struct lu_object *l)
854 {
855         struct osd_object *o = osd_obj(l);
856         struct iam_descr  *d;
857
858         if (o->oo_dir != NULL)
859                 d = o->oo_dir->od_container.ic_descr;
860         else
861                 d = NULL;
862         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
863                     o, o->oo_inode,
864                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
865                     o->oo_inode ? o->oo_inode->i_generation : 0,
866                     d ? d->id_ops->id_name : "plain");
867 }
868
869 /*
870  * Concurrency: shouldn't matter.
871  */
872 int osd_statfs(const struct lu_env *env, struct dt_device *d,
873                cfs_kstatfs_t *sfs)
874 {
875         struct osd_device *osd = osd_dt_dev(d);
876         struct super_block *sb = osd_sb(osd);
877         int result = 0;
878
879         cfs_spin_lock(&osd->od_osfs_lock);
880         /* cache 1 second */
881         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
882                 result = ll_do_statfs(sb, &osd->od_kstatfs);
883                 if (likely(result == 0)) /* N.B. statfs can't really fail */
884                         osd->od_osfs_age = cfs_time_current_64();
885         }
886
887         if (likely(result == 0))
888                 *sfs = osd->od_kstatfs;
889         cfs_spin_unlock(&osd->od_osfs_lock);
890
891         return result;
892 }
893
894 /*
895  * Concurrency: doesn't access mutable data.
896  */
897 static void osd_conf_get(const struct lu_env *env,
898                          const struct dt_device *dev,
899                          struct dt_device_param *param)
900 {
901         struct super_block *sb = osd_sb(osd_dt_dev(dev));
902
903         /*
904          * XXX should be taken from not-yet-existing fs abstraction layer.
905          */
906         param->ddp_max_name_len = LDISKFS_NAME_LEN;
907         param->ddp_max_nlink    = LDISKFS_LINK_MAX;
908         param->ddp_block_shift  = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
909         param->ddp_mntopts      = 0;
910         if (test_opt(sb, XATTR_USER))
911                 param->ddp_mntopts |= MNTOPT_USERXATTR;
912         if (test_opt(sb, POSIX_ACL))
913                 param->ddp_mntopts |= MNTOPT_ACL;
914 }
915
916 /**
917  * Helper function to get and fill the buffer with input values.
918  */
919 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
920 {
921         struct lu_buf *buf;
922
923         buf = &osd_oti_get(env)->oti_buf;
924         buf->lb_buf = area;
925         buf->lb_len = len;
926         return buf;
927 }
928
929 /*
930  * Concurrency: shouldn't matter.
931  */
932 static int osd_sync(const struct lu_env *env, struct dt_device *d)
933 {
934         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
935         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
936 }
937
938 /**
939  * Start commit for OSD device.
940  *
941  * An implementation of dt_commit_async method for OSD device.
942  * Asychronously starts underlayng fs sync and thereby a transaction
943  * commit.
944  *
945  * \param env environment
946  * \param d dt device
947  *
948  * \see dt_device_operations
949  */
950 static int osd_commit_async(const struct lu_env *env,
951                             struct dt_device *d)
952 {
953         struct super_block *s = osd_sb(osd_dt_dev(d));
954         ENTRY;
955
956         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
957         RETURN(s->s_op->sync_fs(s, 0));
958 }
959
960 /*
961  * Concurrency: shouldn't matter.
962  */
963 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
964
965 static void osd_ro(const struct lu_env *env, struct dt_device *d)
966 {
967         ENTRY;
968
969         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
970
971         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
972                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
973         EXIT;
974 }
975
976
977 /*
978  * Concurrency: serialization provided by callers.
979  */
980 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
981                               int mode, unsigned long timeout, __u32 alg,
982                               struct lustre_capa_key *keys)
983 {
984         struct osd_device *dev = osd_dt_dev(d);
985         ENTRY;
986
987         dev->od_fl_capa = mode;
988         dev->od_capa_timeout = timeout;
989         dev->od_capa_alg = alg;
990         dev->od_capa_keys = keys;
991         RETURN(0);
992 }
993
994 /**
995  * Concurrency: serialization provided by callers.
996  */
997 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
998                                struct dt_quota_ctxt *ctxt, void *data)
999 {
1000         struct obd_device *obd = (void *)ctxt;
1001         struct vfsmount *mnt = (struct vfsmount *)data;
1002         ENTRY;
1003
1004         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
1005         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1006         obd->obd_lvfs_ctxt.pwdmnt = mnt;
1007         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
1008         obd->obd_lvfs_ctxt.fs = get_ds();
1009
1010         EXIT;
1011 }
1012
1013 /**
1014  * Note: we do not count into QUOTA here.
1015  * If we mount with --data_journal we may need more.
1016  */
1017 static const int osd_dto_credits_noquota[DTO_NR] = {
1018         /**
1019          * Insert/Delete.
1020          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1021          * SINGLEDATA_TRANS_BLOCKS(8)
1022          * XXX Note: maybe iam need more, since iam have more level than
1023          *           EXT3 htree.
1024          */
1025         [DTO_INDEX_INSERT]  = 16,
1026         [DTO_INDEX_DELETE]  = 16,
1027         /**
1028          * Unused now
1029          */
1030         [DTO_IDNEX_UPDATE]  = 16,
1031         /**
1032          * Create a object. The same as create object in EXT3.
1033          * DATA_TRANS_BLOCKS(14) +
1034          * INDEX_EXTRA_BLOCKS(8) +
1035          * 3(inode bits, groups, GDT)
1036          */
1037         [DTO_OBJECT_CREATE] = 25,
1038         /**
1039          * Unused now
1040          */
1041         [DTO_OBJECT_DELETE] = 25,
1042         /**
1043          * Attr set credits.
1044          * 3(inode bits, group, GDT)
1045          */
1046         [DTO_ATTR_SET_BASE] = 3,
1047         /**
1048          * Xattr set. The same as xattr of EXT3.
1049          * DATA_TRANS_BLOCKS(14)
1050          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1051          * are also counted in. Do not know why?
1052          */
1053         [DTO_XATTR_SET]     = 14,
1054         [DTO_LOG_REC]       = 14,
1055         /**
1056          * creadits for inode change during write.
1057          */
1058         [DTO_WRITE_BASE]    = 3,
1059         /**
1060          * credits for single block write.
1061          */
1062         [DTO_WRITE_BLOCK]   = 14,
1063         /**
1064          * Attr set credits for chown.
1065          * This is extra credits for setattr, and it is null without quota
1066          */
1067         [DTO_ATTR_SET_CHOWN]= 0
1068 };
1069
1070 /**
1071  * Note: we count into QUOTA here.
1072  * If we mount with --data_journal we may need more.
1073  */
1074 static const int osd_dto_credits_quota[DTO_NR] = {
1075         /**
1076          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1077          * SINGLEDATA_TRANS_BLOCKS(8) +
1078          * 2 * QUOTA_TRANS_BLOCKS(2)
1079          */
1080         [DTO_INDEX_INSERT]  = 20,
1081         /**
1082          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1083          * SINGLEDATA_TRANS_BLOCKS(8) +
1084          * 2 * QUOTA_TRANS_BLOCKS(2)
1085          */
1086         [DTO_INDEX_DELETE]  = 20,
1087         /**
1088          * Unused now.
1089          */
1090         [DTO_IDNEX_UPDATE]  = 16,
1091         /*
1092          * Create a object. Same as create object in EXT3 filesystem.
1093          * DATA_TRANS_BLOCKS(16) +
1094          * INDEX_EXTRA_BLOCKS(8) +
1095          * 3(inode bits, groups, GDT) +
1096          * 2 * QUOTA_INIT_BLOCKS(25)
1097          */
1098         [DTO_OBJECT_CREATE] = 77,
1099         /*
1100          * Unused now.
1101          * DATA_TRANS_BLOCKS(16) +
1102          * INDEX_EXTRA_BLOCKS(8) +
1103          * 3(inode bits, groups, GDT) +
1104          * QUOTA(?)
1105          */
1106         [DTO_OBJECT_DELETE] = 27,
1107         /**
1108          * Attr set credits.
1109          * 3 (inode bit, group, GDT) +
1110          */
1111         [DTO_ATTR_SET_BASE] = 3,
1112         /**
1113          * Xattr set. The same as xattr of EXT3.
1114          * DATA_TRANS_BLOCKS(16)
1115          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
1116          *           also counted in. Do not know why?
1117          */
1118         [DTO_XATTR_SET]     = 16,
1119         [DTO_LOG_REC]       = 16,
1120         /**
1121          * creadits for inode change during write.
1122          */
1123         [DTO_WRITE_BASE]    = 3,
1124         /**
1125          * credits for single block write.
1126          */
1127         [DTO_WRITE_BLOCK]   = 16,
1128         /**
1129          * Attr set credits for chown.
1130          * It is added to already set setattr credits
1131          * 2 * QUOTA_INIT_BLOCKS(25) +
1132          * 2 * QUOTA_DEL_BLOCKS(9)
1133          */
1134         [DTO_ATTR_SET_CHOWN]= 68,
1135 };
1136
1137 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
1138                           enum dt_txn_op op)
1139 {
1140         LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
1141                 ARRAY_SIZE(osd_dto_credits_quota));
1142         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
1143 #ifdef HAVE_QUOTA_SUPPORT
1144         if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
1145                 return osd_dto_credits_quota[op];
1146         else
1147 #endif
1148                 return osd_dto_credits_noquota[op];
1149 }
1150
1151 static const struct dt_device_operations osd_dt_ops = {
1152         .dt_root_get       = osd_root_get,
1153         .dt_statfs         = osd_statfs,
1154         .dt_trans_start    = osd_trans_start,
1155         .dt_trans_stop     = osd_trans_stop,
1156         .dt_trans_cb_add   = osd_trans_cb_add,
1157         .dt_conf_get       = osd_conf_get,
1158         .dt_sync           = osd_sync,
1159         .dt_ro             = osd_ro,
1160         .dt_commit_async   = osd_commit_async,
1161         .dt_credit_get     = osd_credit_get,
1162         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1163         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1164 };
1165
1166 static void osd_object_read_lock(const struct lu_env *env,
1167                                  struct dt_object *dt, unsigned role)
1168 {
1169         struct osd_object *obj = osd_dt_obj(dt);
1170         struct osd_thread_info *oti = osd_oti_get(env);
1171
1172         LINVRNT(osd_invariant(obj));
1173
1174         LASSERT(obj->oo_owner != env);
1175         cfs_down_read_nested(&obj->oo_sem, role);
1176
1177         LASSERT(obj->oo_owner == NULL);
1178         oti->oti_r_locks++;
1179 }
1180
1181 static void osd_object_write_lock(const struct lu_env *env,
1182                                   struct dt_object *dt, unsigned role)
1183 {
1184         struct osd_object *obj = osd_dt_obj(dt);
1185         struct osd_thread_info *oti = osd_oti_get(env);
1186
1187         LINVRNT(osd_invariant(obj));
1188
1189         LASSERT(obj->oo_owner != env);
1190         cfs_down_write_nested(&obj->oo_sem, role);
1191
1192         LASSERT(obj->oo_owner == NULL);
1193         obj->oo_owner = env;
1194         oti->oti_w_locks++;
1195 }
1196
1197 static void osd_object_read_unlock(const struct lu_env *env,
1198                                    struct dt_object *dt)
1199 {
1200         struct osd_object *obj = osd_dt_obj(dt);
1201         struct osd_thread_info *oti = osd_oti_get(env);
1202
1203         LINVRNT(osd_invariant(obj));
1204
1205         LASSERT(oti->oti_r_locks > 0);
1206         oti->oti_r_locks--;
1207         cfs_up_read(&obj->oo_sem);
1208 }
1209
1210 static void osd_object_write_unlock(const struct lu_env *env,
1211                                     struct dt_object *dt)
1212 {
1213         struct osd_object *obj = osd_dt_obj(dt);
1214         struct osd_thread_info *oti = osd_oti_get(env);
1215
1216         LINVRNT(osd_invariant(obj));
1217
1218         LASSERT(obj->oo_owner == env);
1219         LASSERT(oti->oti_w_locks > 0);
1220         oti->oti_w_locks--;
1221         obj->oo_owner = NULL;
1222         cfs_up_write(&obj->oo_sem);
1223 }
1224
1225 static int osd_object_write_locked(const struct lu_env *env,
1226                                    struct dt_object *dt)
1227 {
1228         struct osd_object *obj = osd_dt_obj(dt);
1229
1230         LINVRNT(osd_invariant(obj));
1231
1232         return obj->oo_owner == env;
1233 }
1234
1235 static int capa_is_sane(const struct lu_env *env,
1236                         struct osd_device *dev,
1237                         struct lustre_capa *capa,
1238                         struct lustre_capa_key *keys)
1239 {
1240         struct osd_thread_info *oti = osd_oti_get(env);
1241         struct lustre_capa *tcapa = &oti->oti_capa;
1242         struct obd_capa *oc;
1243         int i, rc = 0;
1244         ENTRY;
1245
1246         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1247         if (oc) {
1248                 if (capa_is_expired(oc)) {
1249                         DEBUG_CAPA(D_ERROR, capa, "expired");
1250                         rc = -ESTALE;
1251                 }
1252                 capa_put(oc);
1253                 RETURN(rc);
1254         }
1255
1256         if (capa_is_expired_sec(capa)) {
1257                 DEBUG_CAPA(D_ERROR, capa, "expired");
1258                 RETURN(-ESTALE);
1259         }
1260
1261         cfs_spin_lock(&capa_lock);
1262         for (i = 0; i < 2; i++) {
1263                 if (keys[i].lk_keyid == capa->lc_keyid) {
1264                         oti->oti_capa_key = keys[i];
1265                         break;
1266                 }
1267         }
1268         cfs_spin_unlock(&capa_lock);
1269
1270         if (i == 2) {
1271                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1272                 RETURN(-ESTALE);
1273         }
1274
1275         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1276         if (rc)
1277                 RETURN(rc);
1278
1279         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1280                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1281                 RETURN(-EACCES);
1282         }
1283
1284         oc = capa_add(dev->od_capa_hash, capa);
1285         capa_put(oc);
1286
1287         RETURN(0);
1288 }
1289
1290 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1291                            struct lustre_capa *capa, __u64 opc)
1292 {
1293         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1294         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1295         struct md_capainfo *ci;
1296         int rc;
1297
1298         if (!dev->od_fl_capa)
1299                 return 0;
1300
1301         if (capa == BYPASS_CAPA)
1302                 return 0;
1303
1304         ci = md_capainfo(env);
1305         if (unlikely(!ci))
1306                 return 0;
1307
1308         if (ci->mc_auth == LC_ID_NONE)
1309                 return 0;
1310
1311         if (!capa) {
1312                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1313                 return -EACCES;
1314         }
1315
1316         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1317                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1318                            PFID(fid));
1319                 return -EACCES;
1320         }
1321
1322         if (!capa_opc_supported(capa, opc)) {
1323                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1324                 return -EACCES;
1325         }
1326
1327         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1328                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1329                 return -EACCES;
1330         }
1331
1332         return 0;
1333 }
1334
1335 static struct timespec *osd_inode_time(const struct lu_env *env,
1336                                        struct inode *inode, __u64 seconds)
1337 {
1338         struct osd_thread_info *oti = osd_oti_get(env);
1339         struct timespec        *t   = &oti->oti_time;
1340
1341         t->tv_sec  = seconds;
1342         t->tv_nsec = 0;
1343         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1344         return t;
1345 }
1346
1347
1348 static void osd_inode_getattr(const struct lu_env *env,
1349                               struct inode *inode, struct lu_attr *attr)
1350 {
1351         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1352                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1353                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1354
1355         attr->la_atime      = LTIME_S(inode->i_atime);
1356         attr->la_mtime      = LTIME_S(inode->i_mtime);
1357         attr->la_ctime      = LTIME_S(inode->i_ctime);
1358         attr->la_mode       = inode->i_mode;
1359         attr->la_size       = i_size_read(inode);
1360         attr->la_blocks     = inode->i_blocks;
1361         attr->la_uid        = inode->i_uid;
1362         attr->la_gid        = inode->i_gid;
1363         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1364         attr->la_nlink      = inode->i_nlink;
1365         attr->la_rdev       = inode->i_rdev;
1366         attr->la_blksize    = ll_inode_blksize(inode);
1367         attr->la_blkbits    = inode->i_blkbits;
1368 }
1369
1370 static int osd_attr_get(const struct lu_env *env,
1371                         struct dt_object *dt,
1372                         struct lu_attr *attr,
1373                         struct lustre_capa *capa)
1374 {
1375         struct osd_object *obj = osd_dt_obj(dt);
1376
1377         LASSERT(dt_object_exists(dt));
1378         LINVRNT(osd_invariant(obj));
1379
1380         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1381                 return -EACCES;
1382
1383         cfs_spin_lock(&obj->oo_guard);
1384         osd_inode_getattr(env, obj->oo_inode, attr);
1385         cfs_spin_unlock(&obj->oo_guard);
1386         return 0;
1387 }
1388
1389 static int osd_inode_setattr(const struct lu_env *env,
1390                              struct inode *inode, const struct lu_attr *attr)
1391 {
1392         __u64 bits;
1393
1394         bits = attr->la_valid;
1395
1396         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1397
1398 #ifdef HAVE_QUOTA_SUPPORT
1399         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1400             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1401                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1402                 struct iattr iattr;
1403                 int rc;
1404
1405                 iattr.ia_valid = 0;
1406                 if (bits & LA_UID)
1407                         iattr.ia_valid |= ATTR_UID;
1408                 if (bits & LA_GID)
1409                         iattr.ia_valid |= ATTR_GID;
1410                 iattr.ia_uid = attr->la_uid;
1411                 iattr.ia_gid = attr->la_gid;
1412                 osd_push_ctxt(env, save);
1413                 rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
1414                 osd_pop_ctxt(save);
1415                 if (rc != 0)
1416                         return rc;
1417         }
1418 #endif
1419
1420         if (bits & LA_ATIME)
1421                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1422         if (bits & LA_CTIME)
1423                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1424         if (bits & LA_MTIME)
1425                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1426         if (bits & LA_SIZE) {
1427                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1428                 i_size_write(inode, attr->la_size);
1429         }
1430
1431 #if 0
1432         /* OSD should not change "i_blocks" which is used by quota.
1433          * "i_blocks" should be changed by ldiskfs only. */
1434         if (bits & LA_BLOCKS)
1435                 inode->i_blocks = attr->la_blocks;
1436 #endif
1437         if (bits & LA_MODE)
1438                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1439                         (attr->la_mode & ~S_IFMT);
1440         if (bits & LA_UID)
1441                 inode->i_uid    = attr->la_uid;
1442         if (bits & LA_GID)
1443                 inode->i_gid    = attr->la_gid;
1444         if (bits & LA_NLINK)
1445                 inode->i_nlink  = attr->la_nlink;
1446         if (bits & LA_RDEV)
1447                 inode->i_rdev   = attr->la_rdev;
1448
1449         if (bits & LA_FLAGS) {
1450                 /* always keep S_NOCMTIME */
1451                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
1452                                  S_NOCMTIME;
1453         }
1454         return 0;
1455 }
1456
1457 static int osd_attr_set(const struct lu_env *env,
1458                         struct dt_object *dt,
1459                         const struct lu_attr *attr,
1460                         struct thandle *handle,
1461                         struct lustre_capa *capa)
1462 {
1463         struct osd_object *obj = osd_dt_obj(dt);
1464         int rc;
1465
1466         LASSERT(handle != NULL);
1467         LASSERT(dt_object_exists(dt));
1468         LASSERT(osd_invariant(obj));
1469
1470         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1471                 return -EACCES;
1472
1473         cfs_spin_lock(&obj->oo_guard);
1474         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1475         cfs_spin_unlock(&obj->oo_guard);
1476
1477         if (!rc)
1478                 obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
1479         return rc;
1480 }
1481
/*
 * Object creation.
 *
 * XXX temporary solution.
 */

/*
 * Hook run before the type-specific creation step.
 * Currently does nothing and always returns 0.
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        int rc = 0;

        return rc;
}
1492
1493 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1494                            struct lu_attr *attr, struct thandle *th)
1495 {
1496         osd_object_init0(obj);
1497         if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1498                 unlock_new_inode(obj->oo_inode);
1499         return 0;
1500 }
1501
1502 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1503                                             struct osd_object *obj,
1504                                             const char *name,
1505                                             const int namelen)
1506 {
1507         struct osd_thread_info *info   = osd_oti_get(env);
1508         struct dentry *child_dentry = &info->oti_child_dentry;
1509         struct dentry *obj_dentry = &info->oti_obj_dentry;
1510
1511         obj_dentry->d_inode = obj->oo_inode;
1512         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1513         obj_dentry->d_name.hash = 0;
1514
1515         child_dentry->d_name.hash = 0;
1516         child_dentry->d_parent = obj_dentry;
1517         child_dentry->d_name.name = name;
1518         child_dentry->d_name.len = namelen;
1519         return child_dentry;
1520 }
1521
1522
1523 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1524                       cfs_umode_t mode,
1525                       struct dt_allocation_hint *hint,
1526                       struct thandle *th)
1527 {
1528         int result;
1529         struct osd_device  *osd = osd_obj2dev(obj);
1530         struct osd_thandle *oth;
1531         struct dt_object   *parent;
1532         struct inode       *inode;
1533 #ifdef HAVE_QUOTA_SUPPORT
1534         struct osd_ctxt    *save = &info->oti_ctxt;
1535 #endif
1536
1537         LINVRNT(osd_invariant(obj));
1538         LASSERT(obj->oo_inode == NULL);
1539
1540         oth = container_of(th, struct osd_thandle, ot_super);
1541         LASSERT(oth->ot_handle->h_transaction != NULL);
1542
1543         if (hint && hint->dah_parent)
1544                 parent = hint->dah_parent;
1545         else
1546                 parent = osd->od_obj_area;
1547
1548         LASSERT(parent != NULL);
1549         LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1550
1551 #ifdef HAVE_QUOTA_SUPPORT
1552         osd_push_ctxt(info->oti_env, save);
1553 #endif
1554         inode = ldiskfs_create_inode(oth->ot_handle,
1555                                      osd_dt_obj(parent)->oo_inode, mode);
1556 #ifdef HAVE_QUOTA_SUPPORT
1557         osd_pop_ctxt(save);
1558 #endif
1559         if (!IS_ERR(inode)) {
1560                 /* Do not update file c/mtime in ldiskfs.
1561                  * NB: don't need any lock because no contention at this
1562                  * early stage */
1563                 inode->i_flags |= S_NOCMTIME;
1564                 obj->oo_inode = inode;
1565                 result = 0;
1566         } else
1567                 result = PTR_ERR(inode);
1568         LINVRNT(osd_invariant(obj));
1569         return result;
1570 }
1571
/* Key size passed to iam_lvar_create() when osd_mkdir() formats a directory. */
enum {
        OSD_NAME_LEN = 255,
};
1575
1576 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1577                      struct lu_attr *attr,
1578                      struct dt_allocation_hint *hint,
1579                      struct dt_object_format *dof,
1580                      struct thandle *th)
1581 {
1582         int result;
1583         struct osd_thandle *oth;
1584         struct osd_device *osd = osd_obj2dev(obj);
1585         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1586
1587         LASSERT(S_ISDIR(attr->la_mode));
1588
1589         oth = container_of(th, struct osd_thandle, ot_super);
1590         LASSERT(oth->ot_handle->h_transaction != NULL);
1591         result = osd_mkfile(info, obj, mode, hint, th);
1592         if (result == 0 && osd->od_iop_mode == 0) {
1593                 LASSERT(obj->oo_inode != NULL);
1594                 /*
1595                  * XXX uh-oh... call low-level iam function directly.
1596                  */
1597
1598                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1599                                          sizeof (struct osd_fid_pack),
1600                                          oth->ot_handle);
1601         }
1602         return result;
1603 }
1604
1605 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1606                         struct lu_attr *attr,
1607                         struct dt_allocation_hint *hint,
1608                         struct dt_object_format *dof,
1609                         struct thandle *th)
1610 {
1611         int result;
1612         struct osd_thandle *oth;
1613         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1614
1615         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1616
1617         LASSERT(S_ISREG(attr->la_mode));
1618
1619         oth = container_of(th, struct osd_thandle, ot_super);
1620         LASSERT(oth->ot_handle->h_transaction != NULL);
1621
1622         result = osd_mkfile(info, obj, mode, hint, th);
1623         if (result == 0) {
1624                 LASSERT(obj->oo_inode != NULL);
1625                 if (feat->dif_flags & DT_IND_VARKEY)
1626                         result = iam_lvar_create(obj->oo_inode,
1627                                                  feat->dif_keysize_max,
1628                                                  feat->dif_ptrsize,
1629                                                  feat->dif_recsize_max,
1630                                                  oth->ot_handle);
1631                 else
1632                         result = iam_lfix_create(obj->oo_inode,
1633                                                  feat->dif_keysize_max,
1634                                                  feat->dif_ptrsize,
1635                                                  feat->dif_recsize_max,
1636                                                  oth->ot_handle);
1637
1638         }
1639         return result;
1640 }
1641
1642 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1643                      struct lu_attr *attr,
1644                      struct dt_allocation_hint *hint,
1645                      struct dt_object_format *dof,
1646                      struct thandle *th)
1647 {
1648         LASSERT(S_ISREG(attr->la_mode));
1649         return osd_mkfile(info, obj, (attr->la_mode &
1650                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1651 }
1652
1653 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1654                      struct lu_attr *attr,
1655                      struct dt_allocation_hint *hint,
1656                      struct dt_object_format *dof,
1657                      struct thandle *th)
1658 {
1659         LASSERT(S_ISLNK(attr->la_mode));
1660         return osd_mkfile(info, obj, (attr->la_mode &
1661                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1662 }
1663
1664 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1665                      struct lu_attr *attr,
1666                      struct dt_allocation_hint *hint,
1667                      struct dt_object_format *dof,
1668                      struct thandle *th)
1669 {
1670         cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1671         int result;
1672
1673         LINVRNT(osd_invariant(obj));
1674         LASSERT(obj->oo_inode == NULL);
1675         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1676                 S_ISFIFO(mode) || S_ISSOCK(mode));
1677
1678         result = osd_mkfile(info, obj, mode, hint, th);
1679         if (result == 0) {
1680                 LASSERT(obj->oo_inode != NULL);
1681                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1682         }
1683         LINVRNT(osd_invariant(obj));
1684         return result;
1685 }
1686
/* Signature shared by the type-specific creation helpers (osd_mk*). */
typedef int (*osd_obj_type_f)(struct osd_thread_info *info,
                              struct osd_object *obj,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
                              struct dt_object_format *dof,
                              struct thandle *th);
1692
1693 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1694 {
1695         osd_obj_type_f result;
1696
1697         switch (type) {
1698         case DFT_DIR:
1699                 result = osd_mkdir;
1700                 break;
1701         case DFT_REGULAR:
1702                 result = osd_mkreg;
1703                 break;
1704         case DFT_SYM:
1705                 result = osd_mksym;
1706                 break;
1707         case DFT_NODE:
1708                 result = osd_mknod;
1709                 break;
1710         case DFT_INDEX:
1711                 result = osd_mk_index;
1712                 break;
1713
1714         default:
1715                 LBUG();
1716                 break;
1717         }
1718         return result;
1719 }
1720
1721
1722 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1723                         struct dt_object *parent, cfs_umode_t child_mode)
1724 {
1725         LASSERT(ah);
1726
1727         memset(ah, 0, sizeof(*ah));
1728         ah->dah_parent = parent;
1729         ah->dah_mode = child_mode;
1730 }
1731
1732 /**
1733  * Helper function for osd_object_create()
1734  *
1735  * \retval 0, on success
1736  */
1737 static int __osd_object_create(struct osd_thread_info *info,
1738                                struct osd_object *obj, struct lu_attr *attr,
1739                                struct dt_allocation_hint *hint,
1740                                struct dt_object_format *dof,
1741                                struct thandle *th)
1742 {
1743
1744         int result;
1745
1746         result = osd_create_pre(info, obj, attr, th);
1747         if (result == 0) {
1748                 result = osd_create_type_f(dof->dof_type)(info, obj,
1749                                            attr, hint, dof, th);
1750                 if (result == 0)
1751                         result = osd_create_post(info, obj, attr, th);
1752         }
1753         return result;
1754 }
1755
1756 /**
1757  * Helper function for osd_object_create()
1758  *
1759  * \retval 0, on success
1760  */
1761 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1762                            const struct lu_fid *fid, struct thandle *th)
1763 {
1764         struct osd_thread_info *info = osd_oti_get(env);
1765         struct osd_inode_id    *id   = &info->oti_id;
1766         struct osd_device      *osd  = osd_obj2dev(obj);
1767         struct md_ucred        *uc   = md_ucred(env);
1768
1769         LASSERT(obj->oo_inode != NULL);
1770         LASSERT(uc != NULL);
1771
1772         id->oii_ino = obj->oo_inode->i_ino;
1773         id->oii_gen = obj->oo_inode->i_generation;
1774
1775         return osd_oi_insert(info, &osd->od_oi, fid, id, th,
1776                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1777 }
1778
1779 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1780                              struct lu_attr *attr,
1781                              struct dt_allocation_hint *hint,
1782                              struct dt_object_format *dof,
1783                              struct thandle *th)
1784 {
1785         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1786         struct osd_object      *obj    = osd_dt_obj(dt);
1787         struct osd_thread_info *info   = osd_oti_get(env);
1788         int result;
1789
1790         ENTRY;
1791
1792         LINVRNT(osd_invariant(obj));
1793         LASSERT(!dt_object_exists(dt));
1794         LASSERT(osd_write_locked(env, obj));
1795         LASSERT(th != NULL);
1796
1797         result = __osd_object_create(info, obj, attr, hint, dof, th);
1798         if (result == 0)
1799                 result = __osd_oi_insert(env, obj, fid, th);
1800
1801         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1802         LASSERT(osd_invariant(obj));
1803         RETURN(result);
1804 }
1805
1806 /**
1807  * Helper function for osd_xattr_set()
1808  */
1809 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1810                            const struct lu_buf *buf, const char *name, int fl)
1811 {
1812         struct osd_object      *obj      = osd_dt_obj(dt);
1813         struct inode           *inode    = obj->oo_inode;
1814         struct osd_thread_info *info     = osd_oti_get(env);
1815         struct dentry          *dentry   = &info->oti_child_dentry;
1816         int                     fs_flags = 0;
1817         int  rc;
1818
1819         LASSERT(dt_object_exists(dt));
1820         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1821         LASSERT(osd_write_locked(env, obj));
1822
1823         if (fl & LU_XATTR_REPLACE)
1824                 fs_flags |= XATTR_REPLACE;
1825
1826         if (fl & LU_XATTR_CREATE)
1827                 fs_flags |= XATTR_CREATE;
1828
1829         dentry->d_inode = inode;
1830         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
1831                                    buf->lb_len, fs_flags);
1832         return rc;
1833 }
1834
1835 /**
1836  * Put the fid into lustre_mdt_attrs, and then place the structure
1837  * inode's ea. This fid should not be altered during the life time
1838  * of the inode.
1839  *
1840  * \retval +ve, on success
1841  * \retval -ve, on error
1842  *
1843  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
1844  */
1845 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
1846                           const struct lu_fid *fid)
1847 {
1848         struct osd_thread_info  *info      = osd_oti_get(env);
1849         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1850
1851         lustre_lma_init(mdt_attrs, fid);
1852         lustre_lma_swab(mdt_attrs);
1853         return __osd_xattr_set(env, dt,
1854                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
1855                                XATTR_NAME_LMA, LU_XATTR_CREATE);
1856
1857 }
1858
1859 /**
1860  * Helper function to form igif
1861  */
1862 static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
1863                                 struct lu_fid *fid)
1864 {
1865         LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
1866 }
1867
1868 /**
1869  * Helper function to pack the fid, ldiskfs stores fid in packed format.
1870  */
1871 void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
1872                   struct lu_fid *befider)
1873 {
1874         fid_cpu_to_be(befider, (struct lu_fid *)fid);
1875         memcpy(pack->fp_area, befider, sizeof(*befider));
1876         pack->fp_len =  sizeof(*befider) + 1;
1877 }
1878
1879 /**
1880  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
1881  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
1882  * To have compatilibility with 1.8 ldiskfs driver we need to have
1883  * magic number at start of fid data.
1884  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
1885  * its inmemory API.
1886  */
1887 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
1888                                   const struct dt_rec *fid)
1889 {
1890         param->edp_magic = LDISKFS_LUFID_MAGIC;
1891         param->edp_len =  sizeof(struct lu_fid) + 1;
1892
1893         fid_cpu_to_be((struct lu_fid *)param->edp_data,
1894                       (struct lu_fid *)fid);
1895 }
1896
1897 int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
1898 {
1899         int result;
1900
1901         result = 0;
1902         switch (pack->fp_len) {
1903         case sizeof *fid + 1:
1904                 memcpy(fid, pack->fp_area, sizeof *fid);
1905                 fid_be_to_cpu(fid, fid);
1906                 break;
1907         default:
1908                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1909                 result = -EIO;
1910         }
1911         return result;
1912 }
1913
1914 /**
1915  * Try to read the fid from inode ea into dt_rec, if return value
1916  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
1917  *
1918  * \param fid object fid.
1919  *
1920  * \retval 0 on success
1921  */
1922 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
1923                           __u32 ino, struct lu_fid *fid)
1924 {
1925         struct osd_thread_info  *info      = osd_oti_get(env);
1926         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1927         struct lu_device        *ldev   = obj->oo_dt.do_lu.lo_dev;
1928         struct dentry           *dentry = &info->oti_child_dentry;
1929         struct osd_inode_id     *id     = &info->oti_id;
1930         struct osd_device       *dev;
1931         struct inode            *inode;
1932         int                      rc;
1933
1934         ENTRY;
1935         dev  = osd_dev(ldev);
1936
1937         id->oii_ino = ino;
1938         id->oii_gen = OSD_OII_NOGEN;
1939
1940         inode = osd_iget(info, dev, id);
1941         if (IS_ERR(inode)) {
1942                 rc = PTR_ERR(inode);
1943                 GOTO(out,rc);
1944         }
1945         dentry->d_inode = inode;
1946
1947         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1948         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
1949                                    sizeof *mdt_attrs);
1950
1951         /* Check LMA compatibility */
1952         if (rc > 0 &&
1953             (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
1954                 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
1955                       inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
1956                       ~LMA_INCOMPAT_SUPP);
1957                 return -ENOSYS;
1958         }
1959
1960         if (rc > 0) {
1961                 lustre_lma_swab(mdt_attrs);
1962                 memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
1963                 rc = 0;
1964         } else if (rc == -ENODATA) {
1965                 osd_igif_get(env, inode, fid);
1966                 rc = 0;
1967         }
1968         iput(inode);
1969 out:
1970         RETURN(rc);
1971 }
1972
1973 /**
1974  * OSD layer object create function for interoperability mode (b11826).
1975  * This is mostly similar to osd_object_create(). Only difference being, fid is
1976  * inserted into inode ea here.
1977  *
1978  * \retval   0, on success
1979  * \retval -ve, on error
1980  */
1981 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
1982                              struct lu_attr *attr,
1983                              struct dt_allocation_hint *hint,
1984                              struct dt_object_format *dof,
1985                              struct thandle *th)
1986 {
1987         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1988         struct osd_object      *obj    = osd_dt_obj(dt);
1989         struct osd_thread_info *info   = osd_oti_get(env);
1990         int result;
1991
1992         ENTRY;
1993
1994         LASSERT(osd_invariant(obj));
1995         LASSERT(!dt_object_exists(dt));
1996         LASSERT(osd_write_locked(env, obj));
1997         LASSERT(th != NULL);
1998
1999         result = __osd_object_create(info, obj, attr, hint, dof, th);
2000
2001         /* objects under osd root shld have igif fid, so dont add fid EA */
2002         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
2003                 result = osd_ea_fid_set(env, dt, fid);
2004
2005         if (result == 0)
2006                 result = __osd_oi_insert(env, obj, fid, th);
2007
2008         LASSERT(ergo(result == 0, dt_object_exists(dt)));
2009         LINVRNT(osd_invariant(obj));
2010         RETURN(result);
2011 }
2012
2013 /*
2014  * Concurrency: @dt is write locked.
2015  */
2016 static void osd_object_ref_add(const struct lu_env *env,
2017                                struct dt_object *dt,
2018                                struct thandle *th)
2019 {
2020         struct osd_object *obj = osd_dt_obj(dt);
2021         struct inode *inode = obj->oo_inode;
2022
2023         LINVRNT(osd_invariant(obj));
2024         LASSERT(dt_object_exists(dt));
2025         LASSERT(osd_write_locked(env, obj));
2026         LASSERT(th != NULL);
2027
2028         cfs_spin_lock(&obj->oo_guard);
2029         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
2030         inode->i_nlink++;
2031         cfs_spin_unlock(&obj->oo_guard);
2032         inode->i_sb->s_op->dirty_inode(inode);
2033         LINVRNT(osd_invariant(obj));
2034 }
2035
2036 /*
2037  * Concurrency: @dt is write locked.
2038  */
2039 static void osd_object_ref_del(const struct lu_env *env,
2040                                struct dt_object *dt,
2041                                struct thandle *th)
2042 {
2043         struct osd_object *obj = osd_dt_obj(dt);
2044         struct inode *inode = obj->oo_inode;
2045
2046         LINVRNT(osd_invariant(obj));
2047         LASSERT(dt_object_exists(dt));
2048         LASSERT(osd_write_locked(env, obj));
2049         LASSERT(th != NULL);
2050
2051         cfs_spin_lock(&obj->oo_guard);
2052         LASSERT(inode->i_nlink > 0);
2053         inode->i_nlink--;
2054         cfs_spin_unlock(&obj->oo_guard);
2055         inode->i_sb->s_op->dirty_inode(inode);
2056         LINVRNT(osd_invariant(obj));
2057 }
2058
2059 /*
2060  * Concurrency: @dt is read locked.
2061  */
2062 static int osd_xattr_get(const struct lu_env *env,
2063                          struct dt_object *dt,
2064                          struct lu_buf *buf,
2065                          const char *name,
2066                          struct lustre_capa *capa)
2067 {
2068         struct osd_object      *obj    = osd_dt_obj(dt);
2069         struct inode           *inode  = obj->oo_inode;
2070         struct osd_thread_info *info   = osd_oti_get(env);
2071         struct dentry          *dentry = &info->oti_obj_dentry;
2072
2073         LASSERT(dt_object_exists(dt));
2074         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2075         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2076
2077         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2078                 return -EACCES;
2079
2080         dentry->d_inode = inode;
2081         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
2082 }
2083
2084 /*
2085  * Concurrency: @dt is write locked.
2086  */
2087 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2088                          const struct lu_buf *buf, const char *name, int fl,
2089                          struct thandle *handle, struct lustre_capa *capa)
2090 {
2091         LASSERT(handle != NULL);
2092
2093         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2094                 return -EACCES;
2095
2096         return __osd_xattr_set(env, dt, buf, name, fl);
2097 }
2098
2099 /*
2100  * Concurrency: @dt is read locked.
2101  */
2102 static int osd_xattr_list(const struct lu_env *env,
2103                           struct dt_object *dt,
2104                           struct lu_buf *buf,
2105                           struct lustre_capa *capa)
2106 {
2107         struct osd_object      *obj    = osd_dt_obj(dt);
2108         struct inode           *inode  = obj->oo_inode;
2109         struct osd_thread_info *info   = osd_oti_get(env);
2110         struct dentry          *dentry = &info->oti_obj_dentry;
2111
2112         LASSERT(dt_object_exists(dt));
2113         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2114         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2115
2116         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2117                 return -EACCES;
2118
2119         dentry->d_inode = inode;
2120         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2121 }
2122
2123 /*
2124  * Concurrency: @dt is write locked.
2125  */
2126 static int osd_xattr_del(const struct lu_env *env,
2127                          struct dt_object *dt,
2128                          const char *name,
2129                          struct thandle *handle,
2130                          struct lustre_capa *capa)
2131 {
2132         struct osd_object      *obj    = osd_dt_obj(dt);
2133         struct inode           *inode  = obj->oo_inode;
2134         struct osd_thread_info *info   = osd_oti_get(env);
2135         struct dentry          *dentry = &info->oti_obj_dentry;
2136         int                     rc;
2137
2138         LASSERT(dt_object_exists(dt));
2139         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2140         LASSERT(osd_write_locked(env, obj));
2141         LASSERT(handle != NULL);
2142
2143         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2144                 return -EACCES;
2145
2146         dentry->d_inode = inode;
2147         rc = inode->i_op->removexattr(dentry, name);
2148         return rc;
2149 }
2150
2151 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2152                                      struct dt_object *dt,
2153                                      struct lustre_capa *old,
2154                                      __u64 opc)
2155 {
2156         struct osd_thread_info *info = osd_oti_get(env);
2157         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2158         struct osd_object *obj = osd_dt_obj(dt);
2159         struct osd_device *dev = osd_obj2dev(obj);
2160         struct lustre_capa_key *key = &info->oti_capa_key;
2161         struct lustre_capa *capa = &info->oti_capa;
2162         struct obd_capa *oc;
2163         struct md_capainfo *ci;
2164         int rc;
2165         ENTRY;
2166
2167         if (!dev->od_fl_capa)
2168                 RETURN(ERR_PTR(-ENOENT));
2169
2170         LASSERT(dt_object_exists(dt));
2171         LINVRNT(osd_invariant(obj));
2172
2173         /* renewal sanity check */
2174         if (old && osd_object_auth(env, dt, old, opc))
2175                 RETURN(ERR_PTR(-EACCES));
2176
2177         ci = md_capainfo(env);
2178         if (unlikely(!ci))
2179                 RETURN(ERR_PTR(-ENOENT));
2180
2181         switch (ci->mc_auth) {
2182         case LC_ID_NONE:
2183                 RETURN(NULL);
2184         case LC_ID_PLAIN:
2185                 capa->lc_uid = obj->oo_inode->i_uid;
2186                 capa->lc_gid = obj->oo_inode->i_gid;
2187                 capa->lc_flags = LC_ID_PLAIN;
2188                 break;
2189         case LC_ID_CONVERT: {
2190                 __u32 d[4], s[4];
2191
2192                 s[0] = obj->oo_inode->i_uid;
2193                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2194                 s[2] = obj->oo_inode->i_gid;
2195                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2196                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2197                 if (unlikely(rc))
2198                         RETURN(ERR_PTR(rc));
2199
2200                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2201                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2202                 capa->lc_flags = LC_ID_CONVERT;
2203                 break;
2204         }
2205         default:
2206                 RETURN(ERR_PTR(-EINVAL));
2207         }
2208
2209         capa->lc_fid = *fid;
2210         capa->lc_opc = opc;
2211         capa->lc_flags |= dev->od_capa_alg << 24;
2212         capa->lc_timeout = dev->od_capa_timeout;
2213         capa->lc_expiry = 0;
2214
2215         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2216         if (oc) {
2217                 LASSERT(!capa_is_expired(oc));
2218                 RETURN(oc);
2219         }
2220
2221         cfs_spin_lock(&capa_lock);
2222         *key = dev->od_capa_keys[1];
2223         cfs_spin_unlock(&capa_lock);
2224
2225         capa->lc_keyid = key->lk_keyid;
2226         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2227
2228         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2229         if (rc) {
2230                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2231                 RETURN(ERR_PTR(rc));
2232         }
2233
2234         oc = capa_add(dev->od_capa_hash, capa);
2235         RETURN(oc);
2236 }
2237
2238 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2239 {
2240         int rc;
2241         struct osd_object      *obj    = osd_dt_obj(dt);
2242         struct inode           *inode  = obj->oo_inode;
2243         struct osd_thread_info *info   = osd_oti_get(env);
2244         struct dentry          *dentry = &info->oti_obj_dentry;
2245         struct file            *file   = &info->oti_file;
2246         ENTRY;
2247
2248         dentry->d_inode = inode;
2249         file->f_dentry = dentry;
2250         file->f_mapping = inode->i_mapping;
2251         file->f_op = inode->i_fop;
2252         LOCK_INODE_MUTEX(inode);
2253         rc = file->f_op->fsync(file, dentry, 0);
2254         UNLOCK_INODE_MUTEX(inode);
2255         RETURN(rc);
2256 }
2257
2258 /*
2259  * Get the 64-bit version for an inode.
2260  */
2261 static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
2262                                                struct dt_object *dt)
2263 {
2264         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2265
2266         CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
2267                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2268         return LDISKFS_I(inode)->i_fs_version;
2269 }
2270
2271 /*
2272  * Set the 64-bit version and return the old version.
2273  */
2274 static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
2275                                    dt_obj_version_t new_version)
2276 {
2277         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2278
2279         CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2280                new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2281         LDISKFS_I(inode)->i_fs_version = new_version;
2282         /** Version is set after all inode operations are finished,
2283          *  so we should mark it dirty here */
2284         inode->i_sb->s_op->dirty_inode(inode);
2285 }
2286
2287 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2288                         void **data)
2289 {
2290         struct osd_object *obj = osd_dt_obj(dt);
2291         ENTRY;
2292
2293         *data = (void *)obj->oo_inode;
2294         RETURN(0);
2295 }
2296
2297 /*
2298  * Index operations.
2299  */
2300
2301 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2302                            const struct dt_index_features *feat)
2303 {
2304         struct iam_descr *descr;
2305
2306         if (osd_object_is_root(o))
2307                 return feat == &dt_directory_features;
2308
2309         LASSERT(o->oo_dir != NULL);
2310
2311         descr = o->oo_dir->od_container.ic_descr;
2312         if (feat == &dt_directory_features) {
2313                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2314                         return 1;
2315                 else
2316                         return 0;
2317         } else {
2318                 return
2319                         feat->dif_keysize_min <= descr->id_key_size &&
2320                         descr->id_key_size <= feat->dif_keysize_max &&
2321                         feat->dif_recsize_min <= descr->id_rec_size &&
2322                         descr->id_rec_size <= feat->dif_recsize_max &&
2323                         !(feat->dif_flags & (DT_IND_VARKEY |
2324                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2325                         ergo(feat->dif_flags & DT_IND_UPDATE,
2326                              1 /* XXX check that object (and file system) is
2327                                 * writable */);
2328         }
2329 }
2330
2331 static int osd_iam_container_init(const struct lu_env *env,
2332                                   struct osd_object *obj,
2333                                   struct osd_directory *dir)
2334 {
2335         int result;
2336         struct iam_container *bag;
2337
2338         bag    = &dir->od_container;
2339         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2340         if (result == 0) {
2341                 result = iam_container_setup(bag);
2342                 if (result == 0)
2343                         obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2344                 else
2345                         iam_container_fini(bag);
2346         }
2347         return result;
2348 }
2349
2350
2351 /*
2352  * Concurrency: no external locking is necessary.
2353  */
2354 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2355                          const struct dt_index_features *feat)
2356 {
2357         int result;
2358         int ea_dir = 0;
2359         struct osd_object *obj = osd_dt_obj(dt);
2360         struct osd_device *osd = osd_obj2dev(obj);
2361
2362         LINVRNT(osd_invariant(obj));
2363         LASSERT(dt_object_exists(dt));
2364
2365         if (osd_object_is_root(obj)) {
2366                 dt->do_index_ops = &osd_index_ea_ops;
2367                 result = 0;
2368         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2369                 dt->do_index_ops = &osd_index_ea_ops;
2370                 if (S_ISDIR(obj->oo_inode->i_mode))
2371                         result = 0;
2372                 else
2373                         result = -ENOTDIR;
2374                 ea_dir = 1;
2375         } else if (!osd_has_index(obj)) {
2376                 struct osd_directory *dir;
2377
2378                 OBD_ALLOC_PTR(dir);
2379                 if (dir != NULL) {
2380
2381                         cfs_spin_lock(&obj->oo_guard);
2382                         if (obj->oo_dir == NULL)
2383                                 obj->oo_dir = dir;
2384                         else
2385                                 /*
2386                                  * Concurrent thread allocated container data.
2387                                  */
2388                                 OBD_FREE_PTR(dir);
2389                         cfs_spin_unlock(&obj->oo_guard);
2390                         /*
2391                          * Now, that we have container data, serialize its
2392                          * initialization.
2393                          */
2394                         cfs_down_write(&obj->oo_ext_idx_sem);
2395                         /*
2396                          * recheck under lock.
2397                          */
2398                         if (!osd_has_index(obj))
2399                                 result = osd_iam_container_init(env, obj, dir);
2400                         else
2401                                 result = 0;
2402                         cfs_up_write(&obj->oo_ext_idx_sem);
2403                 } else
2404                         result = -ENOMEM;
2405         } else
2406                 result = 0;
2407
2408         if (result == 0 && ea_dir == 0) {
2409                 if (!osd_iam_index_probe(env, obj, feat))
2410                         result = -ENOTDIR;
2411         }
2412         LINVRNT(osd_invariant(obj));
2413
2414         return result;
2415 }
2416
2417 static const struct dt_object_operations osd_obj_ops = {
2418         .do_read_lock    = osd_object_read_lock,
2419         .do_write_lock   = osd_object_write_lock,
2420         .do_read_unlock  = osd_object_read_unlock,
2421         .do_write_unlock = osd_object_write_unlock,
2422         .do_write_locked = osd_object_write_locked,
2423         .do_attr_get     = osd_attr_get,
2424         .do_attr_set     = osd_attr_set,
2425         .do_ah_init      = osd_ah_init,
2426         .do_create       = osd_object_create,
2427         .do_index_try    = osd_index_try,
2428         .do_ref_add      = osd_object_ref_add,
2429         .do_ref_del      = osd_object_ref_del,
2430         .do_xattr_get    = osd_xattr_get,
2431         .do_xattr_set    = osd_xattr_set,
2432         .do_xattr_del    = osd_xattr_del,
2433         .do_xattr_list   = osd_xattr_list,
2434         .do_capa_get     = osd_capa_get,
2435         .do_object_sync  = osd_object_sync,
2436         .do_version_get  = osd_object_version_get,
2437         .do_version_set  = osd_object_version_set,
2438         .do_data_get     = osd_data_get,
2439 };
2440
2441 /**
2442  * dt_object_operations for interoperability mode
2443  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2444  */
2445 static const struct dt_object_operations osd_obj_ea_ops = {
2446         .do_read_lock    = osd_object_read_lock,
2447         .do_write_lock   = osd_object_write_lock,
2448         .do_read_unlock  = osd_object_read_unlock,
2449         .do_write_unlock = osd_object_write_unlock,
2450         .do_write_locked = osd_object_write_locked,
2451         .do_attr_get     = osd_attr_get,
2452         .do_attr_set     = osd_attr_set,
2453         .do_ah_init      = osd_ah_init,
2454         .do_create       = osd_object_ea_create,
2455         .do_index_try    = osd_index_try,
2456         .do_ref_add      = osd_object_ref_add,
2457         .do_ref_del      = osd_object_ref_del,
2458         .do_xattr_get    = osd_xattr_get,
2459         .do_xattr_set    = osd_xattr_set,
2460         .do_xattr_del    = osd_xattr_del,
2461         .do_xattr_list   = osd_xattr_list,
2462         .do_capa_get     = osd_capa_get,
2463         .do_object_sync  = osd_object_sync,
2464         .do_version_get  = osd_object_version_get,
2465         .do_version_set  = osd_object_version_set,
2466         .do_data_get     = osd_data_get,
2467 };
2468
2469 /*
2470  * Body operations.
2471  */
2472
2473 /*
2474  * XXX: Another layering violation for now.
2475  *
2476  * We don't want to use ->f_op->read methods, because generic file write
2477  *
2478  *         - serializes on ->i_sem, and
2479  *
2480  *         - does a lot of extra work like balance_dirty_pages(),
2481  *
2482  * which doesn't work for globally shared files like /last-received.
2483  */
2484 static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
2485 {
2486         struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
2487
2488         memcpy(buffer, (char*)ei->i_data, buflen);
2489
2490         return  buflen;
2491 }
2492
2493 static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
2494                             loff_t *offs)
2495 {
2496         struct buffer_head *bh;
2497         unsigned long block;
2498         int osize = size;
2499         int blocksize;
2500         int csize;
2501         int boffs;
2502         int err;
2503
2504         /* prevent reading after eof */
2505         spin_lock(&inode->i_lock);
2506         if (i_size_read(inode) < *offs + size) {
2507                 size = i_size_read(inode) - *offs;
2508                 spin_unlock(&inode->i_lock);
2509                 if (size < 0) {
2510                         CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
2511                                i_size_read(inode), *offs);
2512                         return -EBADR;
2513                 } else if (size == 0) {
2514                         return 0;
2515                 }
2516         } else {
2517                 spin_unlock(&inode->i_lock);
2518         }
2519
2520         blocksize = 1 << inode->i_blkbits;
2521
2522         while (size > 0) {
2523                 block = *offs >> inode->i_blkbits;
2524                 boffs = *offs & (blocksize - 1);
2525                 csize = min(blocksize - boffs, size);
2526                 bh = ldiskfs_bread(NULL, inode, block, 0, &err);
2527                 if (!bh) {
2528                         CERROR("can't read block: %d\n", err);
2529                         return err;
2530                 }
2531
2532                 memcpy(buf, bh->b_data + boffs, csize);
2533                 brelse(bh);
2534
2535                 *offs += csize;
2536                 buf += csize;
2537                 size -= csize;
2538         }
2539         return osize;
2540 }
2541
2542 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
2543                         struct lu_buf *buf, loff_t *pos,
2544                         struct lustre_capa *capa)
2545 {
2546         struct osd_object      *obj    = osd_dt_obj(dt);
2547         struct inode           *inode  = obj->oo_inode;
2548         int rc;
2549
2550         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2551                 RETURN(-EACCES);
2552
2553         /* Read small symlink from inode body as we need to maintain correct
2554          * on-disk symlinks for ldiskfs.
2555          */
2556         if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2557             (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
2558                 rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
2559         else
2560                 rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
2561
2562         return rc;
2563 }
2564
2565 static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
2566 {
2567
2568         memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
2569                buflen);
2570         LDISKFS_I(inode)->i_disksize = buflen;
2571         i_size_write(inode, buflen);
2572         inode->i_sb->s_op->dirty_inode(inode);
2573
2574         return 0;
2575 }
2576
2577 static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
2578                                     loff_t *offs, handle_t *handle)
2579 {
2580         struct buffer_head *bh = NULL;
2581         loff_t offset = *offs;
2582         loff_t new_size = i_size_read(inode);
2583         unsigned long block;
2584         int blocksize = 1 << inode->i_blkbits;
2585         int err = 0;
2586         int size;
2587         int boffs;
2588         int dirty_inode = 0;
2589
2590         while (bufsize > 0) {
2591                 if (bh != NULL)
2592                         brelse(bh);
2593
2594                 block = offset >> inode->i_blkbits;
2595                 boffs = offset & (blocksize - 1);
2596                 size = min(blocksize - boffs, bufsize);
2597                 bh = ldiskfs_bread(handle, inode, block, 1, &err);
2598                 if (!bh) {
2599                         CERROR("can't read/create block: %d\n", err);
2600                         break;
2601                 }
2602
2603                 err = ldiskfs_journal_get_write_access(handle, bh);
2604                 if (err) {
2605                         CERROR("journal_get_write_access() returned error %d\n",
2606                                err);
2607                         break;
2608                 }
2609                 LASSERTF(boffs + size <= bh->b_size,
2610                          "boffs %d size %d bh->b_size %lu",
2611                          boffs, size, (unsigned long)bh->b_size);
2612                 memcpy(bh->b_data + boffs, buf, size);
2613                 err = ldiskfs_journal_dirty_metadata(handle, bh);
2614                 if (err)
2615                         break;
2616
2617                 if (offset + size > new_size)
2618                         new_size = offset + size;
2619                 offset += size;
2620                 bufsize -= size;
2621                 buf += size;
2622         }
2623         if (bh)
2624                 brelse(bh);
2625
2626         /* correct in-core and on-disk sizes */
2627         if (new_size > i_size_read(inode)) {
2628                 spin_lock(&inode->i_lock);
2629                 if (new_size > i_size_read(inode))
2630                         i_size_write(inode, new_size);
2631                 if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
2632                         LDISKFS_I(inode)->i_disksize = i_size_read(inode);
2633                         dirty_inode = 1;
2634                 }
2635                 spin_unlock(&inode->i_lock);
2636                 if (dirty_inode)
2637                         inode->i_sb->s_op->dirty_inode(inode);
2638         }
2639
2640         if (err == 0)
2641                 *offs = offset;
2642         return err;
2643 }
2644
2645 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
2646                          const struct lu_buf *buf, loff_t *pos,
2647                          struct thandle *handle, struct lustre_capa *capa,
2648                          int ignore_quota)
2649 {
2650         struct osd_object  *obj   = osd_dt_obj(dt);
2651         struct inode       *inode = obj->oo_inode;
2652         struct osd_thandle *oh;
2653         ssize_t            result = 0;
2654 #ifdef HAVE_QUOTA_SUPPORT
2655         cfs_cap_t           save = cfs_curproc_cap_pack();
2656 #endif
2657
2658         LASSERT(handle != NULL);
2659
2660         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
2661                 RETURN(-EACCES);
2662
2663         oh = container_of(handle, struct osd_thandle, ot_super);
2664         LASSERT(oh->ot_handle->h_transaction != NULL);
2665 #ifdef HAVE_QUOTA_SUPPORT
2666         if (ignore_quota)
2667                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2668         else
2669                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2670 #endif
2671         /* Write small symlink to inode body as we need to maintain correct
2672          * on-disk symlinks for ldiskfs.
2673          */
2674         if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2675            (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
2676                 result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
2677         else
2678                 result = osd_ldiskfs_write_record(inode, buf->lb_buf,
2679                                                   buf->lb_len, pos,
2680                                                   oh->ot_handle);
2681 #ifdef HAVE_QUOTA_SUPPORT
2682         cfs_curproc_cap_unpack(save);
2683 #endif
2684         if (result == 0)
2685                 result = buf->lb_len;
2686         return result;
2687 }
2688
2689 static const struct dt_body_operations osd_body_ops = {
2690         .dbo_read  = osd_read,
2691         .dbo_write = osd_write
2692 };
2693
2694
2695 /**
2696  *      delete a (key, value) pair from index \a dt specified by \a key
2697  *
2698  *      \param  dt      osd index object
2699  *      \param  key     key for index
2700  *      \param  rec     record reference
2701  *      \param  handle  transaction handler
2702  *
2703  *      \retval  0  success
2704  *      \retval -ve   failure
2705  */
2706
2707 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2708                                 const struct dt_key *key, struct thandle *handle,
2709                                 struct lustre_capa *capa)
2710 {
2711         struct osd_object     *obj = osd_dt_obj(dt);
2712         struct osd_thandle    *oh;
2713         struct iam_path_descr *ipd;
2714         struct iam_container  *bag = &obj->oo_dir->od_container;
2715         int rc;
2716
2717         ENTRY;
2718
2719         LINVRNT(osd_invariant(obj));
2720         LASSERT(dt_object_exists(dt));
2721         LASSERT(bag->ic_object == obj->oo_inode);
2722         LASSERT(handle != NULL);
2723
2724         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2725                 RETURN(-EACCES);
2726
2727         ipd = osd_idx_ipd_get(env, bag);
2728         if (unlikely(ipd == NULL))
2729                 RETURN(-ENOMEM);
2730
2731         oh = container_of0(handle, struct osd_thandle, ot_super);
2732         LASSERT(oh->ot_handle != NULL);
2733         LASSERT(oh->ot_handle->h_transaction != NULL);
2734
2735         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2736         osd_ipd_put(env, bag, ipd);
2737         LINVRNT(osd_invariant(obj));
2738         RETURN(rc);
2739 }
2740
2741 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
2742                                           struct dt_rec *fid)
2743 {
2744         struct osd_fid_pack *rec;
2745         int rc = -ENODATA;
2746
2747         if (de->file_type & LDISKFS_DIRENT_LUFID) {
2748                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
2749                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
2750         }
2751         RETURN(rc);
2752 }
2753
2754 /**
2755  * Index delete function for interoperability mode (b11826).
2756  * It will remove the directory entry added by osd_index_ea_insert().
2757  * This entry is needed to maintain name->fid mapping.
2758  *
2759  * \param key,  key i.e. file entry to be deleted
2760  *
2761  * \retval   0, on success
2762  * \retval -ve, on error
2763  */
2764 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
2765                                const struct dt_key *key, struct thandle *handle,
2766                                struct lustre_capa *capa)
2767 {
2768         struct osd_object          *obj    = osd_dt_obj(dt);
2769         struct inode               *dir    = obj->oo_inode;
2770         struct dentry              *dentry;
2771         struct osd_thandle         *oh;
2772         struct ldiskfs_dir_entry_2 *de;
2773         struct buffer_head         *bh;
2774
2775         int rc;
2776
2777         ENTRY;
2778
2779         LINVRNT(osd_invariant(obj));
2780         LASSERT(dt_object_exists(dt));
2781         LASSERT(handle != NULL);
2782
2783         oh = container_of(handle, struct osd_thandle, ot_super);
2784         LASSERT(oh->ot_handle != NULL);
2785         LASSERT(oh->ot_handle->h_transaction != NULL);
2786
2787         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2788                 RETURN(-EACCES);
2789
2790         dentry = osd_child_dentry_get(env, obj,
2791                                       (char *)key, strlen((char *)key));
2792
2793         cfs_down_write(&obj->oo_ext_idx_sem);
2794         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
2795         if (bh) {
2796                 rc = ldiskfs_delete_entry(oh->ot_handle,
2797                                 dir, de, bh);
2798                 brelse(bh);
2799         } else
2800                 rc = -ENOENT;
2801
2802         cfs_up_write(&obj->oo_ext_idx_sem);
2803         LASSERT(osd_invariant(obj));
2804         RETURN(rc);
2805 }
2806
2807 /**
2808  *      Lookup index for \a key and copy record to \a rec.
2809  *
2810  *      \param  dt      osd index object
2811  *      \param  key     key for index
2812  *      \param  rec     record reference
2813  *
2814  *      \retval  +ve  success : exact mach
2815  *      \retval  0    return record with key not greater than \a key
2816  *      \retval -ve   failure
2817  */
2818 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
2819                                 struct dt_rec *rec, const struct dt_key *key,
2820                                 struct lustre_capa *capa)
2821 {
2822         struct osd_object     *obj = osd_dt_obj(dt);
2823         struct iam_path_descr *ipd;
2824         struct iam_container  *bag = &obj->oo_dir->od_container;
2825         struct osd_thread_info *oti = osd_oti_get(env);
2826         struct iam_iterator    *it = &oti->oti_idx_it;
2827         struct iam_rec *iam_rec;
2828         int rc;
2829         ENTRY;
2830
2831         LASSERT(osd_invariant(obj));
2832         LASSERT(dt_object_exists(dt));
2833         LASSERT(bag->ic_object == obj->oo_inode);
2834
2835         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2836                 RETURN(-EACCES);
2837
2838         ipd = osd_idx_ipd_get(env, bag);
2839         if (IS_ERR(ipd))
2840                 RETURN(-ENOMEM);
2841
2842         /* got ipd now we can start iterator. */
2843         iam_it_init(it, bag, 0, ipd);
2844
2845         rc = iam_it_get(it, (struct iam_key *)key);
2846         if (rc >= 0) {
2847                 if (S_ISDIR(obj->oo_inode->i_mode))
2848                         iam_rec = (struct iam_rec *)oti->oti_ldp;
2849                 else
2850                         iam_rec = (struct iam_rec *) rec;
2851
2852                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
2853                 if (S_ISDIR(obj->oo_inode->i_mode))
2854                         osd_fid_unpack((struct lu_fid *) rec,
2855                                        (struct osd_fid_pack *)iam_rec);
2856         }
2857         iam_it_put(it);
2858         iam_it_fini(it);
2859         osd_ipd_put(env, bag, ipd);
2860
2861         LINVRNT(osd_invariant(obj));
2862
2863         RETURN(rc);
2864 }
2865
2866 /**
2867  *      Inserts (key, value) pair in \a dt index object.
2868  *
2869  *      \param  dt      osd index object
2870  *      \param  key     key for index
2871  *      \param  rec     record reference
2872  *      \param  th      transaction handler
2873  *
2874  *      \retval  0  success
2875  *      \retval -ve failure
2876  */
2877 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
2878                                 const struct dt_rec *rec, const struct dt_key *key,
2879                                 struct thandle *th, struct lustre_capa *capa,
2880                                 int ignore_quota)
2881 {
2882         struct osd_object     *obj = osd_dt_obj(dt);
2883         struct iam_path_descr *ipd;
2884         struct osd_thandle    *oh;
2885         struct iam_container  *bag = &obj->oo_dir->od_container;
2886 #ifdef HAVE_QUOTA_SUPPORT
2887         cfs_cap_t              save = cfs_curproc_cap_pack();
2888 #endif
2889         struct osd_thread_info *oti = osd_oti_get(env);
2890         struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
2891         int rc;
2892
2893         ENTRY;
2894
2895         LINVRNT(osd_invariant(obj));
2896         LASSERT(dt_object_exists(dt));
2897         LASSERT(bag->ic_object == obj->oo_inode);
2898         LASSERT(th != NULL);
2899
2900         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2901                 return -EACCES;
2902
2903         ipd = osd_idx_ipd_get(env, bag);
2904         if (unlikely(ipd == NULL))
2905                 RETURN(-ENOMEM);
2906
2907         oh = container_of0(th, struct osd_thandle, ot_super);
2908         LASSERT(oh->ot_handle != NULL);
2909         LASSERT(oh->ot_handle->h_transaction != NULL);
2910 #ifdef HAVE_QUOTA_SUPPORT
2911         if (ignore_quota)
2912                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2913         else
2914                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2915 #endif
2916         if (S_ISDIR(obj->oo_inode->i_mode))
2917                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
2918         else
2919                 iam_rec = (struct iam_rec *) rec;
2920         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2921                         iam_rec, ipd);
2922 #ifdef HAVE_QUOTA_SUPPORT
2923         cfs_curproc_cap_unpack(save);
2924 #endif
2925         osd_ipd_put(env, bag, ipd);
2926         LINVRNT(osd_invariant(obj));
2927         RETURN(rc);
2928 }
2929
2930 /**
2931  * Calls ldiskfs_add_entry() to add directory entry
2932  * into the directory. This is required for
2933  * interoperability mode (b11826)
2934  *
2935  * \retval   0, on success
2936  * \retval -ve, on error
2937  */
2938 static int __osd_ea_add_rec(struct osd_thread_info *info,
2939                             struct osd_object *pobj,
2940                             struct inode  *cinode,
2941                             const char *name,
2942                             const struct dt_rec *fid,
2943                             struct thandle *th)
2944 {
2945         struct ldiskfs_dentry_param *ldp;
2946         struct dentry      *child;
2947         struct osd_thandle *oth;
2948         int rc;
2949
2950         oth = container_of(th, struct osd_thandle, ot_super);
2951         LASSERT(oth->ot_handle != NULL);
2952         LASSERT(oth->ot_handle->h_transaction != NULL);
2953
2954         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
2955
2956         if (fid_is_igif((struct lu_fid *)fid) ||
2957             fid_is_norm((struct lu_fid *)fid)) {
2958                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2959                 osd_get_ldiskfs_dirent_param(ldp, fid);
2960                 child->d_fsdata = (void*) ldp;
2961         } else
2962                 child->d_fsdata = NULL;
2963         rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
2964
2965         RETURN(rc);
2966 }
2967
2968 /**
2969  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
2970  * into the directory.Also sets flags into osd object to
2971  * indicate dot and dotdot are created. This is required for
2972  * interoperability mode (b11826)
2973  *
2974  * \param dir   directory for dot and dotdot fixup.
2975  * \param obj   child object for linking
2976  *
2977  * \retval   0, on success
2978  * \retval -ve, on error
2979  */
2980 static int osd_add_dot_dotdot(struct osd_thread_info *info,
2981                               struct osd_object *dir,
2982                               struct inode  *parent_dir, const char *name,
2983                               const struct dt_rec *dot_fid,
2984                               const struct dt_rec *dot_dot_fid,
2985                               struct thandle *th)
2986 {
2987         struct inode            *inode  = dir->oo_inode;
2988         struct ldiskfs_dentry_param *dot_ldp;
2989         struct ldiskfs_dentry_param *dot_dot_ldp;
2990         struct osd_thandle      *oth;
2991         int result = 0;
2992
2993         oth = container_of(th, struct osd_thandle, ot_super);
2994         LASSERT(oth->ot_handle->h_transaction != NULL);
2995         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
2996
2997         if (strcmp(name, dot) == 0) {
2998                 if (dir->oo_compat_dot_created) {
2999                         result = -EEXIST;
3000                 } else {
3001                         LASSERT(inode == parent_dir);
3002                         dir->oo_compat_dot_created = 1;
3003                         result = 0;
3004                 }
3005         } else if(strcmp(name, dotdot) == 0) {
3006                 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
3007                 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
3008
3009                 if (!dir->oo_compat_dot_created)
3010                         return -EINVAL;
3011                 if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
3012                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
3013                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
3014                 } else {
3015                         dot_ldp = NULL;
3016                         dot_dot_ldp = NULL;
3017                 }
3018                 /* in case of rename, dotdot is already created */
3019                 if (dir->oo_compat_dotdot_created) {
3020                         return __osd_ea_add_rec(info, dir, parent_dir, name,
3021                                                 dot_dot_fid, th);
3022                 }
3023
3024                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
3025                                                 dot_ldp, dot_dot_ldp);
3026                 if (result == 0)
3027                        dir->oo_compat_dotdot_created = 1;
3028         }
3029
3030         return result;
3031 }
3032
3033
3034 /**
3035  * It will call the appropriate osd_add* function and return the
3036  * value, return by respective functions.
3037  */
3038 static int osd_ea_add_rec(const struct lu_env *env,
3039                           struct osd_object *pobj,
3040                           struct inode *cinode,
3041                           const char *name,
3042                           const struct dt_rec *fid,
3043                           struct thandle *th)
3044 {
3045         struct osd_thread_info    *info   = osd_oti_get(env);
3046         int rc;
3047
3048         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3049                                                    name[2] =='\0')))
3050                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3051                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3052                                         fid, th);
3053         else
3054                 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
3055
3056         return rc;
3057 }
3058
3059 /**
3060  * Calls ->lookup() to find dentry. From dentry get inode and
3061  * read inode's ea to get fid. This is required for  interoperability
3062  * mode (b11826)
3063  *
3064  * \retval   0, on success
3065  * \retval -ve, on error
3066  */
3067 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3068                              struct dt_rec *rec, const struct dt_key *key)
3069 {
3070         struct inode               *dir    = obj->oo_inode;
3071         struct dentry              *dentry;
3072         struct ldiskfs_dir_entry_2 *de;
3073         struct buffer_head         *bh;
3074         struct lu_fid              *fid = (struct lu_fid *) rec;
3075         int ino;
3076         int rc;
3077
3078         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3079
3080         dentry = osd_child_dentry_get(env, obj,
3081                                       (char *)key, strlen((char *)key));
3082
3083         cfs_down_read(&obj->oo_ext_idx_sem);
3084         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
3085         if (bh) {
3086                 ino = le32_to_cpu(de->inode);
3087                 rc = osd_get_fid_from_dentry(de, rec);
3088
3089                 /* done with de, release bh */
3090                 brelse(bh);
3091                 if (rc != 0)
3092                         rc = osd_ea_fid_get(env, obj, ino, fid);
3093         } else
3094                 rc = -ENOENT;
3095
3096         cfs_up_read(&obj->oo_ext_idx_sem);
3097         RETURN (rc);
3098 }
3099
3100 /**
3101  * Find the osd object for given fid.
3102  *
3103  * \param fid need to find the osd object having this fid
3104  *
3105  * \retval osd_object on success
3106  * \retval        -ve on error
3107  */
3108 struct osd_object *osd_object_find(const struct lu_env *env,
3109                                    struct dt_object *dt,
3110                                    const struct lu_fid *fid)
3111 {
3112         struct lu_device         *ludev = dt->do_lu.lo_dev;
3113         struct osd_object        *child = NULL;
3114         struct lu_object         *luch;
3115         struct lu_object         *lo;
3116
3117         luch = lu_object_find(env, ludev, fid, NULL);
3118         if (!IS_ERR(luch)) {
3119                 if (lu_object_exists(luch)) {
3120                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3121                         if (lo != NULL)
3122                                 child = osd_obj(lo);
3123                         else
3124                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3125                                                 "lu_object can't be located"
3126                                                 ""DFID"\n", PFID(fid));
3127
3128                         if (child == NULL) {
3129                                 lu_object_put(env, luch);
3130                                 CERROR("Unable to get osd_object\n");
3131                                 child = ERR_PTR(-ENOENT);
3132                         }
3133                 } else {
3134                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
3135                                         "lu_object does not exists "DFID"\n",
3136                                         PFID(fid));
3137                         child = ERR_PTR(-ENOENT);
3138                 }
3139         } else
3140                 child = (void *)luch;
3141
3142         return child;
3143 }
3144
3145 /**
3146  * Put the osd object once done with it.
3147  *
3148  * \param obj osd object that needs to be put
3149  */
3150 static inline void osd_object_put(const struct lu_env *env,
3151                                   struct osd_object *obj)
3152 {
3153         lu_object_put(env, &obj->oo_dt.do_lu);
3154 }
3155
3156 /**
3157  * Index add function for interoperability mode (b11826).
3158  * It will add the directory entry.This entry is needed to
3159  * maintain name->fid mapping.
3160  *
3161  * \param key it is key i.e. file entry to be inserted
3162  * \param rec it is value of given key i.e. fid
3163  *
3164  * \retval   0, on success
3165  * \retval -ve, on error
3166  */
3167 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3168                                const struct dt_rec *rec,
3169                                const struct dt_key *key, struct thandle *th,
3170                                struct lustre_capa *capa, int ignore_quota)
3171 {
3172         struct osd_object        *obj   = osd_dt_obj(dt);
3173         struct lu_fid            *fid   = (struct lu_fid *) rec;
3174         const char               *name  = (const char *)key;
3175         struct osd_object        *child;
3176 #ifdef HAVE_QUOTA_SUPPORT
3177         cfs_cap_t                 save  = cfs_curproc_cap_pack();
3178 #endif
3179         int rc;
3180
3181         ENTRY;
3182
3183         LASSERT(osd_invariant(obj));
3184         LASSERT(dt_object_exists(dt));
3185         LASSERT(th != NULL);
3186
3187         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3188                 RETURN(-EACCES);
3189
3190         child = osd_object_find(env, dt, fid);
3191         if (!IS_ERR(child)) {
3192 #ifdef HAVE_QUOTA_SUPPORT
3193                 if (ignore_quota)
3194                         cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
3195                 else
3196                         cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
3197 #endif
3198                 cfs_down_write(&obj->oo_ext_idx_sem);
3199                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
3200                 cfs_up_write(&obj->oo_ext_idx_sem);
3201 #ifdef HAVE_QUOTA_SUPPORT
3202                 cfs_curproc_cap_unpack(save);
3203 #endif
3204                 osd_object_put(env, child);
3205         } else {
3206                 rc = PTR_ERR(child);
3207         }
3208
3209         LASSERT(osd_invariant(obj));
3210         RETURN(rc);
3211 }
3212
3213 /**
3214  *  Initialize osd Iterator for given osd index object.
3215  *
3216  *  \param  dt      osd index object
3217  */
3218
3219 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3220                                      struct dt_object *dt,
3221                                      __u32 unused,
3222                                      struct lustre_capa *capa)
3223 {
3224         struct osd_it_iam         *it;
3225         struct osd_thread_info *oti = osd_oti_get(env);
3226         struct osd_object     *obj = osd_dt_obj(dt);
3227         struct lu_object      *lo  = &dt->do_lu;
3228         struct iam_path_descr *ipd;
3229         struct iam_container  *bag = &obj->oo_dir->od_container;
3230
3231         LASSERT(lu_object_exists(lo));
3232
3233         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3234                 return ERR_PTR(-EACCES);
3235
3236         it = &oti->oti_it;
3237         ipd = osd_it_ipd_get(env, bag);
3238         if (likely(ipd != NULL)) {
3239                 it->oi_obj = obj;
3240                 it->oi_ipd = ipd;
3241                 lu_object_get(lo);
3242                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3243                 return (struct dt_it *)it;
3244         }
3245         return ERR_PTR(-ENOMEM);
3246 }
3247
3248 /**
3249  * free given Iterator.
3250  */
3251
3252 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3253 {
3254         struct osd_it_iam     *it = (struct osd_it_iam *)di;
3255         struct osd_object *obj = it->oi_obj;
3256
3257         iam_it_fini(&it->oi_it);
3258         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3259         lu_object_put(env, &obj->oo_dt.do_lu);
3260 }
3261
3262 /**
3263  *  Move Iterator to record specified by \a key
3264  *
3265  *  \param  di      osd iterator
3266  *  \param  key     key for index
3267  *
3268  *  \retval +ve  di points to record with least key not larger than key
3269  *  \retval  0   di points to exact matched key
3270  *  \retval -ve  failure
3271  */
3272
3273 static int osd_it_iam_get(const struct lu_env *env,
3274                       struct dt_it *di, const struct dt_key *key)
3275 {
3276         struct osd_it_iam *it = (struct osd_it_iam *)di;
3277
3278         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
3279 }
3280
3281 /**
3282  *  Release Iterator
3283  *
3284  *  \param  di      osd iterator
3285  */
3286
3287 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
3288 {
3289         struct osd_it_iam *it = (struct osd_it_iam *)di;
3290
3291         iam_it_put(&it->oi_it);
3292 }
3293
3294 /**
3295  *  Move iterator by one record
3296  *
3297  *  \param  di      osd iterator
3298  *
3299  *  \retval +1   end of container reached
3300  *  \retval  0   success
3301  *  \retval -ve  failure
3302  */
3303
3304 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
3305 {
3306         struct osd_it_iam *it = (struct osd_it_iam *)di;
3307
3308         return iam_it_next(&it->oi_it);
3309 }
3310
3311 /**
3312  * Return pointer to the key under iterator.
3313  */
3314
3315 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
3316                                  const struct dt_it *di)
3317 {
3318         struct osd_it_iam *it = (struct osd_it_iam *)di;
3319
3320         return (struct dt_key *)iam_it_key_get(&it->oi_it);
3321 }
3322
3323 /**
3324  * Return size of key under iterator (in bytes)
3325  */
3326
3327 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
3328 {
3329         struct osd_it_iam *it = (struct osd_it_iam *)di;
3330
3331         return iam_it_key_size(&it->oi_it);
3332 }
3333
3334 static inline void osd_it_append_attrs(struct lu_dirent*ent,
3335                                        __u32 attr,
3336                                        int len,
3337                                        __u16 type)
3338 {
3339         struct luda_type        *lt;
3340         const unsigned           align = sizeof(struct luda_type) - 1;
3341
3342         /* check if file type is required */
3343         if (attr & LUDA_TYPE) {
3344                         len = (len + align) & ~align;
3345
3346                         lt = (void *) ent->lde_name + len;
3347                         lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
3348                         ent->lde_attrs |= LUDA_TYPE;
3349         }
3350
3351         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
3352 }
3353
3354 /**
3355  * build lu direct from backend fs dirent.
3356  */
3357
3358 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
3359                                       struct lu_fid *fid,
3360                                       __u64 offset,
3361                                       char *name,
3362                                       __u16 namelen,
3363                                       __u16 type,
3364                                       __u32 attr)
3365 {
3366         fid_cpu_to_le(&ent->lde_fid, fid);
3367         ent->lde_attrs = LUDA_FID;
3368
3369         ent->lde_hash = cpu_to_le64(offset);
3370         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
3371
3372         strncpy(ent->lde_name, name, namelen);
3373         ent->lde_namelen = cpu_to_le16(namelen);
3374
3375         /* append lustre attributes */
3376         osd_it_append_attrs(ent, attr, namelen, type);
3377 }
3378
3379 /**
3380  * Return pointer to the record under iterator.
3381  */
3382 static int osd_it_iam_rec(const struct lu_env *env,
3383                           const struct dt_it *di,
3384                           struct lu_dirent *lde,
3385                           __u32 attr)
3386 {
3387         struct osd_it_iam *it        = (struct osd_it_iam *)di;
3388         struct osd_thread_info *info = osd_oti_get(env);
3389         struct lu_fid     *fid       = &info->oti_fid;
3390         const struct osd_fid_pack *rec;
3391         char *name;
3392         int namelen;
3393         __u64 hash;
3394         int rc;
3395
3396         name = (char *)iam_it_key_get(&it->oi_it);
3397         if (IS_ERR(name))
3398                 RETURN(PTR_ERR(name));
3399
3400         namelen = iam_it_key_size(&it->oi_it);
3401
3402         rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it);
3403         if (IS_ERR(rec))
3404                 RETURN(PTR_ERR(rec));
3405
3406         rc = osd_fid_unpack(fid, rec);
3407         if (rc)
3408                 RETURN(rc);
3409
3410         hash = iam_it_store(&it->oi_it);
3411
3412         /* IAM does not store object type in IAM index (dir) */
3413         osd_it_pack_dirent(lde, fid, hash, name, namelen,
3414                            0, LUDA_FID);
3415
3416         return 0;
3417 }
3418
3419 /**
3420  * Returns cookie for current Iterator position.
3421  */
3422 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
3423 {
3424         struct osd_it_iam *it = (struct osd_it_iam *)di;
3425
3426         return iam_it_store(&it->oi_it);
3427 }
3428
3429 /**
3430  * Restore iterator from cookie.
3431  *
3432  * \param  di      osd iterator
3433  * \param  hash    Iterator location cookie
3434  *
3435  * \retval +ve  di points to record with least key not larger than key.
3436  * \retval  0   di points to exact matched key
3437  * \retval -ve  failure
3438  */
3439
3440 static int osd_it_iam_load(const struct lu_env *env,
3441                        const struct dt_it *di, __u64 hash)
3442 {
3443         struct osd_it_iam *it = (struct osd_it_iam *)di;
3444
3445         return iam_it_load(&it->oi_it, hash);
3446 }
3447
3448 static const struct dt_index_operations osd_index_iam_ops = {
3449         .dio_lookup = osd_index_iam_lookup,
3450         .dio_insert = osd_index_iam_insert,
3451         .dio_delete = osd_index_iam_delete,
3452         .dio_it     = {
3453                 .init     = osd_it_iam_init,
3454                 .fini     = osd_it_iam_fini,
3455                 .get      = osd_it_iam_get,
3456                 .put      = osd_it_iam_put,
3457                 .next     = osd_it_iam_next,
3458                 .key      = osd_it_iam_key,
3459                 .key_size = osd_it_iam_key_size,
3460                 .rec      = osd_it_iam_rec,
3461                 .store    = osd_it_iam_store,
3462                 .load     = osd_it_iam_load
3463         }
3464 };
3465
3466 /**
3467  * Creates or initializes iterator context.
3468  *
3469  * \retval struct osd_it_ea, iterator structure on success
3470  *
3471  */
3472 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
3473                                     struct dt_object *dt,
3474                                     __u32 attr,
3475                                     struct lustre_capa *capa)
3476 {
3477         struct osd_object       *obj  = osd_dt_obj(dt);
3478         struct osd_thread_info  *info = osd_oti_get(env);
3479         struct osd_it_ea        *it   = &info->oti_it_ea;
3480         struct lu_object        *lo   = &dt->do_lu;
3481         struct dentry           *obj_dentry = &info->oti_it_dentry;
3482         ENTRY;
3483         LASSERT(lu_object_exists(lo));
3484
3485         obj_dentry->d_inode = obj->oo_inode;
3486         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3487         obj_dentry->d_name.hash = 0;
3488
3489         it->oie_rd_dirent       = 0;
3490         it->oie_it_dirent       = 0;
3491         it->oie_dirent          = NULL;
3492         it->oie_buf             = info->oti_it_ea_buf;
3493         it->oie_obj             = obj;
3494         it->oie_file.f_pos      = 0;
3495         it->oie_file.f_dentry   = obj_dentry;
3496         if (attr & LUDA_64BITHASH)
3497                 it->oie_file.f_flags = O_64BITHASH;
3498         else
3499                 it->oie_file.f_flags = O_32BITHASH;
3500         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
3501         it->oie_file.f_op         = obj->oo_inode->i_fop;
3502         it->oie_file.private_data = NULL;
3503         lu_object_get(lo);
3504         RETURN((struct dt_it *) it);
3505 }
3506
3507 /**
3508  * Destroy or finishes iterator context.
3509  *
3510  * \param di iterator structure to be destroyed
3511  */
3512 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
3513 {
3514         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3515         struct osd_object    *obj  = it->oie_obj;
3516         struct inode       *inode  = obj->oo_inode;
3517
3518         ENTRY;
3519         it->oie_file.f_op->release(inode, &it->oie_file);
3520         lu_object_put(env, &obj->oo_dt.do_lu);
3521         EXIT;
3522 }
3523
3524 /**
3525  * It position the iterator at given key, so that next lookup continues from
3526  * that key Or it is similar to dio_it->load() but based on a key,
3527  * rather than file position.
3528  *
3529  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
3530  * to the beginning.
3531  *
3532  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
3533  */
3534 static int osd_it_ea_get(const struct lu_env *env,
3535                          struct dt_it *di, const struct dt_key *key)
3536 {
3537         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3538
3539         ENTRY;
3540         LASSERT(((const char *)key)[0] == '\0');
3541         it->oie_file.f_pos      = 0;
3542         it->oie_rd_dirent       = 0;
3543         it->oie_it_dirent       = 0;
3544         it->oie_dirent          = NULL;
3545
3546         RETURN(+1);
3547 }
3548
/**
 * Counterpart of osd_it_ea_get(); intentionally does nothing.
 *
 * \param env execution environment (not used)
 * \param di  iterator (not used)
 */
static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
{
        /* nothing to release */
}
3555
3556 /**
3557  * It is called internally by ->readdir(). It fills the
3558  * iterator's in-memory data structure with required
3559  * information i.e. name, namelen, rec_size etc.
3560  *
3561  * \param buf in which information to be filled in.
3562  * \param name name of the file in given dir
3563  *
3564  * \retval 0 on success
3565  * \retval 1 on buffer full
3566  */
3567 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
3568                                loff_t offset, __u64 ino,
3569                                unsigned d_type)
3570 {
3571         struct osd_it_ea        *it   = (struct osd_it_ea *)buf;
3572         struct osd_it_ea_dirent *ent  = it->oie_dirent;
3573         struct lu_fid           *fid  = &ent->oied_fid;
3574         struct osd_fid_pack     *rec;
3575         ENTRY;
3576
3577         /* this should never happen */
3578         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
3579                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
3580                 RETURN(-EIO);
3581         }
3582
3583         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
3584             OSD_IT_EA_BUFSIZE)
3585                 RETURN(1);
3586
3587         if (d_type & LDISKFS_DIRENT_LUFID) {
3588                 rec = (struct osd_fid_pack*) (name + namelen + 1);
3589
3590                 if (osd_fid_unpack(fid, rec) != 0)
3591                         fid_zero(fid);
3592
3593                 d_type &= ~LDISKFS_DIRENT_LUFID;
3594         } else {
3595                 fid_zero(fid);
3596         }
3597
3598         ent->oied_ino     = ino;
3599         ent->oied_off     = offset;
3600         ent->oied_namelen = namelen;
3601         ent->oied_type    = d_type;
3602
3603         memcpy(ent->oied_name, name, namelen);
3604
3605         it->oie_rd_dirent++;
3606         it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
3607         RETURN(0);
3608 }
3609
3610 /**
3611  * Calls ->readdir() to load a directory entry at a time
3612  * and stored it in iterator's in-memory data structure.
3613  *
3614  * \param di iterator's in memory structure
3615  *
3616  * \retval   0 on success
3617  * \retval -ve on error
3618  */
3619 static int osd_ldiskfs_it_fill(const struct dt_it *di)
3620 {
3621         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
3622         struct osd_object  *obj   = it->oie_obj;
3623         struct inode       *inode = obj->oo_inode;
3624         int                result = 0;
3625
3626         ENTRY;
3627         it->oie_dirent = it->oie_buf;
3628         it->oie_rd_dirent = 0;
3629
3630         cfs_down_read(&obj->oo_ext_idx_sem);
3631         result = inode->i_fop->readdir(&it->oie_file, it,
3632                                        (filldir_t) osd_ldiskfs_filldir);
3633
3634         cfs_up_read(&obj->oo_ext_idx_sem);
3635
3636         if (it->oie_rd_dirent == 0) {
3637                 result = -EIO;
3638         } else {
3639                 it->oie_dirent = it->oie_buf;
3640                 it->oie_it_dirent = 1;
3641         }
3642
3643         RETURN(result);
3644 }
3645
3646 /**
3647  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3648  * to load a directory entry at a time and stored it in
3649  * iterator's in-memory data structure.
3650  *
3651  * \param di iterator's in memory structure
3652  *
3653  * \retval +ve iterator reached to end
3654  * \retval   0 iterator not reached to end
3655  * \retval -ve on error
3656  */
3657 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
3658 {
3659         struct osd_it_ea *it = (struct osd_it_ea *)di;
3660         int rc;
3661
3662         ENTRY;
3663
3664         if (it->oie_it_dirent < it->oie_rd_dirent) {
3665                 it->oie_dirent =
3666                         (void *) it->oie_dirent +
3667                         cfs_size_round(sizeof(struct osd_it_ea_dirent) +
3668                                        it->oie_dirent->oied_namelen);
3669                 it->oie_it_dirent++;
3670                 RETURN(0);
3671         } else {
3672                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
3673                         rc = +1;
3674                 else
3675                         rc = osd_ldiskfs_it_fill(di);
3676         }
3677
3678         RETURN(rc);
3679 }
3680
3681 /**
3682  * Returns the key at current position from iterator's in memory structure.
3683  *
3684  * \param di iterator's in memory structure
3685  *
3686  * \retval key i.e. struct dt_key on success
3687  */
3688 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
3689                                     const struct dt_it *di)
3690 {
3691         struct osd_it_ea *it = (struct osd_it_ea *)di;
3692         ENTRY;
3693         RETURN((struct dt_key *)it->oie_dirent->oied_name);
3694 }
3695
3696 /**
3697  * Returns the key's size at current position from iterator's in memory structure.
3698  *
3699  * \param di iterator's in memory structure
3700  *
3701  * \retval key_size i.e. struct dt_key on success
3702  */
3703 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
3704 {
3705         struct osd_it_ea *it = (struct osd_it_ea *)di;
3706         ENTRY;
3707         RETURN(it->oie_dirent->oied_namelen);
3708 }
3709
3710
3711 /**
3712  * Returns the value (i.e. fid/igif) at current position from iterator's
3713  * in memory structure.
3714  *
3715  * \param di struct osd_it_ea, iterator's in memory structure
3716  * \param attr attr requested for dirent.
3717  * \param lde lustre dirent
3718  *
3719  * \retval   0 no error and \param lde has correct lustre dirent.
3720  * \retval -ve on error
3721  */
3722 static inline int osd_it_ea_rec(const struct lu_env *env,
3723                                 const struct dt_it *di,
3724                                 struct lu_dirent *lde,
3725                                 __u32 attr)
3726 {
3727         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
3728         struct osd_object       *obj    = it->oie_obj;
3729         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
3730         int    rc = 0;
3731
3732         ENTRY;
3733
3734         if (!fid_is_sane(fid))
3735                 rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
3736
3737         if (rc == 0)
3738                 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
3739                                    it->oie_dirent->oied_name,
3740                                    it->oie_dirent->oied_namelen,
3741                                    it->oie_dirent->oied_type,
3742                                    attr);
3743         RETURN(rc);
3744 }
3745
3746 /**
3747  * Returns a cookie for current position of the iterator head, so that
3748  * user can use this cookie to load/start the iterator next time.
3749  *
3750  * \param di iterator's in memory structure
3751  *
3752  * \retval cookie for current position, on success
3753  */
3754 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
3755 {
3756         struct osd_it_ea *it = (struct osd_it_ea *)di;
3757         ENTRY;
3758         RETURN(it->oie_dirent->oied_off);
3759 }
3760
3761 /**
3762  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3763  * to load a directory entry at a time and stored it i inn,
3764  * in iterator's in-memory data structure.
3765  *
3766  * \param di struct osd_it_ea, iterator's in memory structure
3767  *
3768  * \retval +ve on success
3769  * \retval -ve on error
3770  */
3771 static int osd_it_ea_load(const struct lu_env *env,
3772                           const struct dt_it *di, __u64 hash)
3773 {
3774         struct osd_it_ea *it = (struct osd_it_ea *)di;
3775         int rc;
3776
3777         ENTRY;
3778         it->oie_file.f_pos = hash;
3779
3780         rc =  osd_ldiskfs_it_fill(di);
3781         if (rc == 0)
3782                 rc = +1;
3783
3784         RETURN(rc);
3785 }
3786
3787 /**
3788  * Index lookup function for interoperability mode (b11826).
3789  *
3790  * \param key,  key i.e. file name to be searched
3791  *
3792  * \retval +ve, on success
3793  * \retval -ve, on error
3794  */
3795 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
3796                                struct dt_rec *rec, const struct dt_key *key,
3797                                struct lustre_capa *capa)
3798 {
3799         struct osd_object *obj = osd_dt_obj(dt);
3800         int rc = 0;
3801
3802         ENTRY;
3803
3804         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
3805         LINVRNT(osd_invariant(obj));
3806
3807         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3808                 return -EACCES;
3809
3810         rc = osd_ea_lookup_rec(env, obj, rec, key);
3811
3812         if (rc == 0)
3813                 rc = +1;
3814         RETURN(rc);
3815 }
3816
3817 /**
3818  * Index and Iterator operations for interoperability
3819  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
3820  */
3821 static const struct dt_index_operations osd_index_ea_ops = {
3822         .dio_lookup = osd_index_ea_lookup,
3823         .dio_insert = osd_index_ea_insert,
3824         .dio_delete = osd_index_ea_delete,
3825         .dio_it     = {
3826                 .init     = osd_it_ea_init,
3827                 .fini     = osd_it_ea_fini,
3828                 .get      = osd_it_ea_get,
3829                 .put      = osd_it_ea_put,
3830                 .next     = osd_it_ea_next,
3831                 .key      = osd_it_ea_key,
3832                 .key_size = osd_it_ea_key_size,
3833                 .rec      = osd_it_ea_rec,
3834                 .store    = osd_it_ea_store,
3835                 .load     = osd_it_ea_load
3836         }
3837 };
3838
3839 static void *osd_key_init(const struct lu_context *ctx,
3840                           struct lu_context_key *key)
3841 {
3842         struct osd_thread_info *info;
3843
3844         OBD_ALLOC_PTR(info);
3845         if (info != NULL) {
3846                 OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3847                 if (info->oti_it_ea_buf != NULL) {
3848                         info->oti_env = container_of(ctx, struct lu_env,
3849                                                      le_ctx);
3850                 } else {
3851                         OBD_FREE_PTR(info);
3852                         info = ERR_PTR(-ENOMEM);
3853                 }
3854         } else {
3855                 info = ERR_PTR(-ENOMEM);
3856         }
3857         return info;
3858 }
3859
3860 static void osd_key_fini(const struct lu_context *ctx,
3861                          struct lu_context_key *key, void* data)
3862 {
3863         struct osd_thread_info *info = data;
3864
3865         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3866         OBD_FREE_PTR(info);
3867 }
3868
3869 static void osd_key_exit(const struct lu_context *ctx,
3870                          struct lu_context_key *key, void *data)
3871 {
3872         struct osd_thread_info *info = data;
3873
3874         LASSERT(info->oti_r_locks == 0);
3875         LASSERT(info->oti_w_locks == 0);
3876         LASSERT(info->oti_txns    == 0);
3877 }
3878
3879 /* type constructor/destructor: osd_type_init, osd_type_fini */
3880 LU_TYPE_INIT_FINI(osd, &osd_key);
3881
3882 static struct lu_context_key osd_key = {
3883         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
3884         .lct_init = osd_key_init,
3885         .lct_fini = osd_key_fini,
3886         .lct_exit = osd_key_exit
3887 };
3888
3889
/**
 * Device init method: only sets up the procfs entries for the device.
 *
 * \param env   execution environment (not used here)
 * \param d     the osd device
 * \param name  name passed on to osd_procfs_init()
 * \param next  not used here
 *
 * \retval result of osd_procfs_init()
 */
static int osd_device_init(const struct lu_env *env, struct lu_device *d,
                           const char *name, struct lu_device *next)
{
        struct osd_device *osd = osd_dev(d);

        return osd_procfs_init(osd, name);
}
3895
3896 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
3897 {
3898         struct osd_thread_info *info = osd_oti_get(env);
3899         ENTRY;
3900         if (o->od_obj_area != NULL) {
3901                 lu_object_put(env, &o->od_obj_area->do_lu);
3902                 o->od_obj_area = NULL;
3903         }
3904         osd_oi_fini(info, &o->od_oi);
3905
3906         RETURN(0);
3907 }
3908
3909 static int osd_mount(const struct lu_env *env,
3910                      struct osd_device *o, struct lustre_cfg *cfg)
3911 {
3912         struct lustre_mount_info *lmi;
3913         const char               *dev  = lustre_cfg_string(cfg, 0);
3914         struct lustre_disk_data  *ldd;
3915         struct lustre_sb_info    *lsi;
3916
3917         ENTRY;
3918         if (o->od_mount != NULL) {
3919                 CERROR("Already mounted (%s)\n", dev);
3920                 RETURN(-EEXIST);
3921         }
3922
3923         /* get mount */
3924         lmi = server_get_mount(dev);
3925         if (lmi == NULL) {
3926                 CERROR("Cannot get mount info for %s!\n", dev);
3927                 RETURN(-EFAULT);
3928         }
3929
3930         LASSERT(lmi != NULL);
3931         /* save lustre_mount_info in dt_device */
3932         o->od_mount = lmi;
3933
3934         lsi = s2lsi(lmi->lmi_sb);
3935         ldd = lsi->lsi_ldd;
3936
3937         if (ldd->ldd_flags & LDD_F_IAM_DIR) {
3938                 o->od_iop_mode = 0;
3939                 LCONSOLE_WARN("OSD: IAM mode enabled\n");
3940         } else
3941                 o->od_iop_mode = 1;
3942
3943         o->od_obj_area = NULL;
3944         RETURN(0);
3945 }
3946
3947 static struct lu_device *osd_device_fini(const struct lu_env *env,
3948                                          struct lu_device *d)
3949 {
3950         int rc;
3951         ENTRY;
3952
3953         shrink_dcache_sb(osd_sb(osd_dev(d)));
3954         osd_sync(env, lu2dt_dev(d));
3955
3956         rc = osd_procfs_fini(osd_dev(d));
3957         if (rc) {
3958                 CERROR("proc fini error %d \n", rc);
3959                 RETURN (ERR_PTR(rc));
3960         }
3961
3962         if (osd_dev(d)->od_mount)
3963                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
3964                                  osd_dev(d)->od_mount->lmi_mnt);
3965         osd_dev(d)->od_mount = NULL;
3966
3967         RETURN(NULL);
3968 }
3969
3970 static struct lu_device *osd_device_alloc(const struct lu_env *env,
3971                                           struct lu_device_type *t,
3972                                           struct lustre_cfg *cfg)
3973 {
3974         struct lu_device  *l;
3975         struct osd_device *o;
3976
3977         OBD_ALLOC_PTR(o);
3978         if (o != NULL) {
3979                 int result;
3980
3981                 result = dt_device_init(&o->od_dt_dev, t);
3982                 if (result == 0) {
3983                         l = osd2lu_dev(o);
3984                         l->ld_ops = &osd_lu_ops;
3985                         o->od_dt_dev.dd_ops = &osd_dt_ops;
3986                         cfs_spin_lock_init(&o->od_osfs_lock);
3987                         o->od_osfs_age = cfs_time_shift_64(-1000);
3988                         o->od_capa_hash = init_capa_hash();
3989                         if (o->od_capa_hash == NULL) {
3990                                 dt_device_fini(&o->od_dt_dev);
3991                                 l = ERR_PTR(-ENOMEM);
3992                         }
3993                 } else
3994                         l = ERR_PTR(result);
3995
3996                 if (IS_ERR(l))
3997                         OBD_FREE_PTR(o);
3998         } else
3999                 l = ERR_PTR(-ENOMEM);
4000         return l;
4001 }
4002
4003 static struct lu_device *osd_device_free(const struct lu_env *env,
4004                                          struct lu_device *d)
4005 {
4006         struct osd_device *o = osd_dev(d);
4007         ENTRY;
4008
4009         cleanup_capa_hash(o->od_capa_hash);
4010         dt_device_fini(&o->od_dt_dev);
4011         OBD_FREE_PTR(o);
4012         RETURN(NULL);
4013 }
4014
4015 static int osd_process_config(const struct lu_env *env,
4016                               struct lu_device *d, struct lustre_cfg *cfg)
4017 {
4018         struct osd_device *o = osd_dev(d);
4019         int err;
4020         ENTRY;
4021
4022         switch(cfg->lcfg_command) {
4023         case LCFG_SETUP:
4024                 err = osd_mount(env, o, cfg);
4025                 break;
4026         case LCFG_CLEANUP:
4027                 err = osd_shutdown(env, o);
4028                 break;
4029         default:
4030                 err = -ENOSYS;
4031         }
4032
4033         RETURN(err);
4034 }
4035
4036 static int osd_recovery_complete(const struct lu_env *env,
4037                                  struct lu_device *d)
4038 {
4039         RETURN(0);
4040 }
4041
4042 static int osd_prepare(const struct lu_env *env,
4043                        struct lu_device *pdev,
4044                        struct lu_device *dev)
4045 {
4046         struct osd_device *osd = osd_dev(dev);
4047         struct lustre_sb_info *lsi;
4048         struct lustre_disk_data *ldd;
4049         struct lustre_mount_info  *lmi;
4050         struct osd_thread_info *oti = osd_oti_get(env);
4051         struct dt_object *d;
4052         int result;
4053
4054         ENTRY;
4055         /* 1. initialize oi before any file create or file open */
4056         result = osd_oi_init(oti, &osd->od_oi,
4057                              &osd->od_dt_dev, lu2md_dev(pdev));
4058         if (result != 0)
4059                 RETURN(result);
4060
4061         lmi = osd->od_mount;
4062         lsi = s2lsi(lmi->lmi_sb);
4063         ldd = lsi->lsi_ldd;
4064
4065         /* 2. setup local objects */
4066         result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
4067         if (result)
4068                 goto out;
4069
4070         /* 3. open remote object dir */
4071         d = dt_store_open(env, lu2dt_dev(dev), "",
4072                           remote_obj_dir, &oti->oti_fid);
4073         if (!IS_ERR(d)) {
4074                 osd->od_obj_area = d;
4075                 result = 0;
4076         } else {
4077                 result = PTR_ERR(d);
4078                 osd->od_obj_area = NULL;
4079         }
4080
4081 out:
4082         RETURN(result);
4083 }
4084
4085 static const struct lu_object_operations osd_lu_obj_ops = {
4086         .loo_object_init      = osd_object_init,
4087         .loo_object_delete    = osd_object_delete,
4088         .loo_object_release   = osd_object_release,
4089         .loo_object_free      = osd_object_free,
4090         .loo_object_print     = osd_object_print,
4091         .loo_object_invariant = osd_object_invariant
4092 };
4093
4094 static const struct lu_device_operations osd_lu_ops = {
4095         .ldo_object_alloc      = osd_object_alloc,
4096         .ldo_process_config    = osd_process_config,
4097         .ldo_recovery_complete = osd_recovery_complete,
4098         .ldo_prepare           = osd_prepare,
4099 };
4100
4101 static const struct lu_device_type_operations osd_device_type_ops = {
4102         .ldto_init = osd_type_init,
4103         .ldto_fini = osd_type_fini,
4104
4105         .ldto_start = osd_type_start,
4106         .ldto_stop  = osd_type_stop,
4107
4108         .ldto_device_alloc = osd_device_alloc,
4109         .ldto_device_free  = osd_device_free,
4110
4111         .ldto_device_init    = osd_device_init,
4112         .ldto_device_fini    = osd_device_fini
4113 };
4114
4115 static struct lu_device_type osd_device_type = {
4116         .ldt_tags     = LU_DEVICE_DT,
4117         .ldt_name     = LUSTRE_OSD_NAME,
4118         .ldt_ops      = &osd_device_type_ops,
4119         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
4120 };
4121
4122 /*
4123  * lprocfs legacy support.
4124  */
4125 static struct obd_ops osd_obd_device_ops = {
4126         .o_owner = THIS_MODULE
4127 };
4128
4129 static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
4130         .llod_name      = remote_obj_dir,
4131         .llod_oid       = OSD_REM_OBJ_DIR_OID,
4132         .llod_is_index  = 1,
4133         .llod_feat      = &dt_directory_features,
4134 };
4135
4136 static int __init osd_mod_init(void)
4137 {
4138         struct lprocfs_static_vars lvars;
4139
4140         osd_oi_mod_init();
4141         llo_local_obj_register(&llod_osd_rem_obj_dir);
4142         lprocfs_osd_init_vars(&lvars);
4143         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
4144                                    LUSTRE_OSD_NAME, &osd_device_type);
4145 }
4146
4147 static void __exit osd_mod_exit(void)
4148 {
4149         llo_local_obj_unregister(&llod_osd_rem_obj_dir);
4150         class_unregister_type(LUSTRE_OSD_NAME);
4151 }
4152
4153 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4154 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
4155 MODULE_LICENSE("GPL");
4156
4157 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);