Whamcloud - gitweb
ORNL-3 mntopt: consider low-layer options for MDT ACL flags
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * Copyright (c) 2011 Whamcloud, Inc.
37  */
38 /*
39  * This file is part of Lustre, http://www.lustre.org/
40  * Lustre is a trademark of Sun Microsystems, Inc.
41  *
42  * lustre/osd/osd_handler.c
43  *
44  * Top-level entry points into osd module
45  *
46  * Author: Nikita Danilov <nikita@clusterfs.com>
47  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
48  */
49
50 #ifndef EXPORT_SYMTAB
51 # define EXPORT_SYMTAB
52 #endif
53 #define DEBUG_SUBSYSTEM S_MDS
54
55 #include <linux/module.h>
56
57 /* LUSTRE_VERSION_CODE */
58 #include <lustre_ver.h>
59 /* prerequisite for linux/xattr.h */
60 #include <linux/types.h>
61 /* prerequisite for linux/xattr.h */
62 #include <linux/fs.h>
63 /* XATTR_{REPLACE,CREATE} */
64 #include <linux/xattr.h>
65 /* simple_mkdir() */
66 #include <lvfs.h>
67
68 /*
69  * struct OBD_{ALLOC,FREE}*()
70  * OBD_FAIL_CHECK
71  */
72 #include <obd_support.h>
73 /* struct ptlrpc_thread */
74 #include <lustre_net.h>
75
76 /* fid_is_local() */
77 #include <lustre_fid.h>
78
79 #include "osd_internal.h"
80 #include "osd_igif.h"
81
82 /* llo_* api support */
83 #include <md_object.h>
84
/* Entry-name literals "." and ".." used by this file. */
static const char dot[]            = ".";
static const char dotdot[]         = "..";
/* Name literal "REM_OBJ_DIR"; its users are not visible in this part. */
static const char remote_obj_dir[] = "REM_OBJ_DIR";
88
89 struct osd_directory {
90         struct iam_container od_container;
91         struct iam_descr     od_descr;
92 };
93
94 struct osd_object {
95         struct dt_object       oo_dt;
96         /**
97          * Inode for file system object represented by this osd_object. This
98          * inode is pinned for the whole duration of lu_object life.
99          *
100          * Not modified concurrently (either setup early during object
101          * creation, or assigned by osd_object_create() under write lock).
102          */
103         struct inode          *oo_inode;
104         /**
105          * to protect index ops.
106          */
107         cfs_rw_semaphore_t     oo_ext_idx_sem;
108         cfs_rw_semaphore_t     oo_sem;
109         struct osd_directory  *oo_dir;
110         /** protects inode attributes. */
111         cfs_spinlock_t         oo_guard;
112         /**
113          * Following two members are used to indicate the presence of dot and
114          * dotdot in the given directory. This is required for interop mode
115          * (b11826).
116          */
117         int                    oo_compat_dot_created;
118         int                    oo_compat_dotdot_created;
119
120         const struct lu_env   *oo_owner;
121 #ifdef CONFIG_LOCKDEP
122         struct lockdep_map     oo_dep_map;
123 #endif
124 };
125
126 static const struct lu_object_operations      osd_lu_obj_ops;
127 static const struct lu_device_operations      osd_lu_ops;
128 static       struct lu_context_key            osd_key;
129 static const struct dt_object_operations      osd_obj_ops;
130 static const struct dt_object_operations      osd_obj_ea_ops;
131 static const struct dt_body_operations        osd_body_ops;
132 static const struct dt_index_operations       osd_index_iam_ops;
133 static const struct dt_index_operations       osd_index_ea_ops;
134
135 struct osd_thandle {
136         struct thandle          ot_super;
137         handle_t               *ot_handle;
138         struct journal_callback ot_jcb;
139         /* Link to the device, for debugging. */
140         struct lu_ref_link     *ot_dev_link;
141
142 #if OSD_THANDLE_STATS
143         /** time when this handle was allocated */
144         cfs_time_t oth_alloced;
145
146         /** time when this thanle was started */
147         cfs_time_t oth_started;
148 #endif
149 };
150
151 /*
152  * Helpers.
153  */
154 static int lu_device_is_osd(const struct lu_device *d)
155 {
156         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
157 }
158
159 static struct osd_device *osd_dt_dev(const struct dt_device *d)
160 {
161         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
162         return container_of0(d, struct osd_device, od_dt_dev);
163 }
164
165 static struct osd_device *osd_dev(const struct lu_device *d)
166 {
167         LASSERT(lu_device_is_osd(d));
168         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
169 }
170
171 static struct osd_device *osd_obj2dev(const struct osd_object *o)
172 {
173         return osd_dev(o->oo_dt.do_lu.lo_dev);
174 }
175
176 static struct super_block *osd_sb(const struct osd_device *dev)
177 {
178         return dev->od_mount->lmi_mnt->mnt_sb;
179 }
180
181 static int osd_object_is_root(const struct osd_object *obj)
182 {
183         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
184 }
185
186 static struct osd_object *osd_obj(const struct lu_object *o)
187 {
188         LASSERT(lu_device_is_osd(o->lo_dev));
189         return container_of0(o, struct osd_object, oo_dt.do_lu);
190 }
191
192 static struct osd_object *osd_dt_obj(const struct dt_object *d)
193 {
194         return osd_obj(&d->do_lu);
195 }
196
197 static struct lu_device *osd2lu_dev(struct osd_device *osd)
198 {
199         return &osd->od_dt_dev.dd_lu_dev;
200 }
201
202 static journal_t *osd_journal(const struct osd_device *dev)
203 {
204         return LDISKFS_SB(osd_sb(dev))->s_journal;
205 }
206
207 static int osd_has_index(const struct osd_object *obj)
208 {
209         return obj->oo_dt.do_index_ops != NULL;
210 }
211
/* Evaluate osd_invariant() on the osd_object behind lu_object \a l. */
static int osd_object_invariant(const struct lu_object *l)
{
        struct osd_object *obj = osd_obj(l);

        return osd_invariant(obj);
}
216
217 #ifdef HAVE_QUOTA_SUPPORT
218 static inline void
219 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
220 {
221         struct md_ucred    *uc = md_ucred(env);
222         struct cred        *tc;
223
224         LASSERT(uc != NULL);
225
226         save->oc_uid = current_fsuid();
227         save->oc_gid = current_fsgid();
228         save->oc_cap = current_cap();
229         if ((tc = prepare_creds())) {
230                 tc->fsuid         = uc->mu_fsuid;
231                 tc->fsgid         = uc->mu_fsgid;
232                 commit_creds(tc);
233         }
234         /* XXX not suboptimal */
235         cfs_curproc_cap_unpack(uc->mu_cap);
236 }
237
238 static inline void
239 osd_pop_ctxt(struct osd_ctxt *save)
240 {
241         struct cred *tc;
242
243         if ((tc = prepare_creds())) {
244                 tc->fsuid         = save->oc_uid;
245                 tc->fsgid         = save->oc_gid;
246                 tc->cap_effective = save->oc_cap;
247                 commit_creds(tc);
248         }
249 }
250 #endif
251
252 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
253 {
254         return lu_context_key_get(&env->le_ctx, &osd_key);
255 }
256
257 /*
258  * Concurrency: doesn't matter
259  */
260 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
261 {
262         return osd_oti_get(env)->oti_r_locks > 0;
263 }
264
265 /*
266  * Concurrency: doesn't matter
267  */
268 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
269 {
270         struct osd_thread_info *oti = osd_oti_get(env);
271         return oti->oti_w_locks > 0 && o->oo_owner == env;
272 }
273
274 /*
275  * Concurrency: doesn't access mutable data
276  */
277 static int osd_root_get(const struct lu_env *env,
278                         struct dt_device *dev, struct lu_fid *f)
279 {
280         struct inode *inode;
281
282         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
283         LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
284         return 0;
285 }
286
287 /*
288  * OSD object methods.
289  */
290
291 /*
292  * Concurrency: no concurrent access is possible that early in object
293  * life-cycle.
294  */
295 static struct lu_object *osd_object_alloc(const struct lu_env *env,
296                                           const struct lu_object_header *hdr,
297                                           struct lu_device *d)
298 {
299         struct osd_object *mo;
300
301         OBD_ALLOC_PTR(mo);
302         if (mo != NULL) {
303                 struct lu_object *l;
304
305                 l = &mo->oo_dt.do_lu;
306                 dt_object_init(&mo->oo_dt, NULL, d);
307                 if (osd_dev(d)->od_iop_mode)
308                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
309                 else
310                         mo->oo_dt.do_ops = &osd_obj_ops;
311
312                 l->lo_ops = &osd_lu_obj_ops;
313                 cfs_init_rwsem(&mo->oo_sem);
314                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
315                 cfs_spin_lock_init(&mo->oo_guard);
316                 return l;
317         } else
318                 return NULL;
319 }
320
321 /*
322  * retrieve object from backend ext fs.
323  **/
324 static struct inode *osd_iget(struct osd_thread_info *info,
325                               struct osd_device *dev,
326                               const struct osd_inode_id *id)
327 {
328         struct inode *inode = NULL;
329
330 #ifdef HAVE_EXT4_LDISKFS
331         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
332         if (IS_ERR(inode))
333         /* Newer kernels return an error instead of a NULL pointer */
334                 inode = NULL;
335 #else
336         inode = iget(osd_sb(dev), id->oii_ino);
337 #endif
338         if (inode == NULL) {
339                 CERROR("no inode\n");
340                 inode = ERR_PTR(-EACCES);
341         } else if (id->oii_gen != OSD_OII_NOGEN &&
342                    inode->i_generation != id->oii_gen) {
343                 iput(inode);
344                 inode = ERR_PTR(-ESTALE);
345         } else if (inode->i_nlink == 0) {
346                 /* due to parallel readdir and unlink,
347                 * we can have dead inode here. */
348                 CWARN("stale inode\n");
349                 make_bad_inode(inode);
350                 iput(inode);
351                 inode = ERR_PTR(-ESTALE);
352         } else if (is_bad_inode(inode)) {
353                 CERROR("bad inode %lx\n",inode->i_ino);
354                 iput(inode);
355                 inode = ERR_PTR(-ENOENT);
356         } else {
357                 /* Do not update file c/mtime in ldiskfs.
358                  * NB: we don't have any lock to protect this because we don't
359                  * have reference on osd_object now, but contention with
360                  * another lookup + attr_set can't happen in the tiny window
361                  * between if (...) and set S_NOCMTIME. */
362                 if (!(inode->i_flags & S_NOCMTIME))
363                         inode->i_flags |= S_NOCMTIME;
364         }
365         return inode;
366 }
367
368 static int osd_fid_lookup(const struct lu_env *env,
369                           struct osd_object *obj, const struct lu_fid *fid)
370 {
371         struct osd_thread_info *info;
372         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
373         struct osd_device      *dev;
374         struct osd_inode_id    *id;
375         struct osd_oi          *oi;
376         struct inode           *inode;
377         int                     result;
378
379         LINVRNT(osd_invariant(obj));
380         LASSERT(obj->oo_inode == NULL);
381         LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
382         /*
383          * This assertion checks that osd layer sees only local
384          * fids. Unfortunately it is somewhat expensive (does a
385          * cache-lookup). Disabling it for production/acceptance-testing.
386          */
387         LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
388
389         ENTRY;
390
391         info = osd_oti_get(env);
392         dev  = osd_dev(ldev);
393         id   = &info->oti_id;
394         oi   = &dev->od_oi;
395
396         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
397                 RETURN(-ENOENT);
398
399         result = osd_oi_lookup(info, oi, fid, id);
400         if (result == 0) {
401                 inode = osd_iget(info, dev, id);
402                 if (!IS_ERR(inode)) {
403                         obj->oo_inode = inode;
404                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
405                         if (dev->od_iop_mode) {
406                                 obj->oo_compat_dot_created = 1;
407                                 obj->oo_compat_dotdot_created = 1;
408                         }
409                         result = 0;
410                 } else
411                         /*
412                          * If fid wasn't found in oi, inode-less object is
413                          * created, for which lu_object_exists() returns
414                          * false. This is used in a (frequent) case when
415                          * objects are created as locking anchors or
416                          * place holders for objects yet to be created.
417                          */
418                         result = PTR_ERR(inode);
419         } else if (result == -ENOENT)
420                 result = 0;
421         LINVRNT(osd_invariant(obj));
422
423         RETURN(result);
424 }
425
426 /*
427  * Concurrency: shouldn't matter.
428  */
429 static void osd_object_init0(struct osd_object *obj)
430 {
431         LASSERT(obj->oo_inode != NULL);
432         obj->oo_dt.do_body_ops = &osd_body_ops;
433         obj->oo_dt.do_lu.lo_header->loh_attr |=
434                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
435 }
436
437 /*
438  * Concurrency: no concurrent access is possible that early in object
439  * life-cycle.
440  */
441 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
442                            const struct lu_object_conf *unused)
443 {
444         struct osd_object *obj = osd_obj(l);
445         int result;
446
447         LINVRNT(osd_invariant(obj));
448
449         result = osd_fid_lookup(env, obj, lu_object_fid(l));
450         if (result == 0) {
451                 if (obj->oo_inode != NULL)
452                         osd_object_init0(obj);
453         }
454         LINVRNT(osd_invariant(obj));
455         return result;
456 }
457
458 /*
459  * Concurrency: no concurrent access is possible that late in object
460  * life-cycle.
461  */
462 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
463 {
464         struct osd_object *obj = osd_obj(l);
465
466         LINVRNT(osd_invariant(obj));
467
468         dt_object_fini(&obj->oo_dt);
469         OBD_FREE_PTR(obj);
470 }
471
472 /**
473  * IAM Iterator
474  */
475 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
476                                              const struct iam_container *bag)
477 {
478         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
479                                            osd_oti_get(env)->oti_it_ipd);
480 }
481
482 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
483                                               const struct iam_container *bag)
484 {
485         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
486                                            osd_oti_get(env)->oti_idx_ipd);
487 }
488
489 static void osd_ipd_put(const struct lu_env *env,
490                         const struct iam_container *bag,
491                         struct iam_path_descr *ipd)
492 {
493         bag->ic_descr->id_ops->id_ipd_free(ipd);
494 }
495
496 /*
497  * Concurrency: no concurrent access is possible that late in object
498  * life-cycle.
499  */
500 static void osd_index_fini(struct osd_object *o)
501 {
502         struct iam_container *bag;
503
504         if (o->oo_dir != NULL) {
505                 bag = &o->oo_dir->od_container;
506                 if (o->oo_inode != NULL) {
507                         if (bag->ic_object == o->oo_inode)
508                                 iam_container_fini(bag);
509                 }
510                 OBD_FREE_PTR(o->oo_dir);
511                 o->oo_dir = NULL;
512         }
513 }
514
515 /*
516  * Concurrency: no concurrent access is possible that late in object
517  * life-cycle (for all existing callers, that is. New callers have to provide
518  * their own locking.)
519  */
520 static int osd_inode_unlinked(const struct inode *inode)
521 {
522         return inode->i_nlink == 0;
523 }
524
/* Credit counts summed by osd_inode_remove() for its delete transaction. */
enum {
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        OSD_TXN_INODE_DELETE_CREDITS = 20,
};
529
530 /*
531  * Journal
532  */
533
534 #if OSD_THANDLE_STATS
535 /**
536  * Set time when the handle is allocated
537  */
538 static void osd_th_alloced(struct osd_thandle *oth)
539 {
540         oth->oth_alloced = cfs_time_current();
541 }
542
543 /**
544  * Set time when the handle started
545  */
546 static void osd_th_started(struct osd_thandle *oth)
547 {
548         oth->oth_started = cfs_time_current();
549 }
550
551 /**
552  * Helper function to convert time interval to microseconds packed in
553  * long int (default time units for the counter in "stats" initialized
554  * by lu_time_init() )
555  */
556 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
557 {
558         struct timeval val;
559
560         cfs_duration_usec(cfs_time_sub(end, start), &val);
561         return val.tv_sec * 1000000 + val.tv_usec;
562 }
563
564 /**
565  * Check whether the we deal with this handle for too long.
566  */
567 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
568                                 cfs_time_t alloced, cfs_time_t started,
569                                 cfs_time_t closed)
570 {
571         cfs_time_t now = cfs_time_current();
572
573         LASSERT(dev != NULL);
574
575         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
576                             interval_to_usec(alloced, started));
577         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
578                             interval_to_usec(started, closed));
579         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
580                             interval_to_usec(closed, now));
581
582         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
583                 CWARN("transaction handle %p was open for too long: "
584                       "now "CFS_TIME_T" ,"
585                       "alloced "CFS_TIME_T" ,"
586                       "started "CFS_TIME_T" ,"
587                       "closed "CFS_TIME_T"\n",
588                       oth, now, alloced, started, closed);
589                 libcfs_debug_dumpstack(NULL);
590         }
591 }
592
/*
 * Evaluate \a expr and then report handle timings via __osd_th_check_slow().
 *
 * The time stamps are copied out of \a oth before \a expr is evaluated;
 * after \a expr only the pointer value of \a oth is passed on.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement
 * (safe in un-braced if/else); invoke it with a trailing semicolon.
 */
#define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
do {                                                                    \
        cfs_time_t __closed = cfs_time_current();                       \
        cfs_time_t __alloced = (oth)->oth_alloced;                      \
        cfs_time_t __started = (oth)->oth_started;                      \
                                                                        \
        expr;                                                           \
        __osd_th_check_slow((oth), (dev),                               \
                            __alloced, __started, __closed);            \
} while (0)
602
603 #else /* OSD_THANDLE_STATS */
604
/* Statistics are compiled out: the stamps are no-ops, only expr remains. */
#define osd_th_alloced(h)                 do { } while (0)
#define osd_th_started(h)                 do { } while (0)
#define OSD_CHECK_SLOW_TH(oth, dev, expr) expr
608
609 #endif /* OSD_THANDLE_STATS */
610
611 /*
612  * Concurrency: doesn't access mutable data.
613  */
614 static int osd_param_is_sane(const struct osd_device *dev,
615                              const struct txn_param *param)
616 {
617         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
618 }
619
620 /*
621  * Concurrency: shouldn't matter.
622  */
623 #ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
624 static void osd_trans_commit_cb(struct super_block *sb,
625                                 struct journal_callback *jcb, int error)
626 #else
627 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
628 #endif
629 {
630         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
631         struct thandle     *th  = &oh->ot_super;
632         struct dt_device   *dev = th->th_dev;
633         struct lu_device   *lud = &dev->dd_lu_dev;
634
635         LASSERT(dev != NULL);
636         LASSERT(oh->ot_handle == NULL);
637
638         if (error) {
639                 CERROR("transaction @0x%p commit error: %d\n", th, error);
640         } else {
641                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
642                 /*
643                  * This od_env_for_commit is only for commit usage.  see
644                  * "struct dt_device"
645                  */
646                 lu_context_enter(&env->le_ctx);
647                 dt_txn_hook_commit(env, th);
648                 lu_context_exit(&env->le_ctx);
649         }
650
651         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
652         lu_device_put(lud);
653         th->th_dev = NULL;
654
655         lu_context_exit(&th->th_ctx);
656         lu_context_fini(&th->th_ctx);
657         OBD_FREE_PTR(oh);
658 }
659
660 /*
661  * Concurrency: shouldn't matter.
662  */
663 static struct thandle *osd_trans_start(const struct lu_env *env,
664                                        struct dt_device *d,
665                                        struct txn_param *p)
666 {
667         struct osd_device  *dev = osd_dt_dev(d);
668         handle_t           *jh;
669         struct osd_thandle *oh;
670         struct thandle     *th;
671         int hook_res;
672
673         ENTRY;
674
675         hook_res = dt_txn_hook_start(env, d, p);
676         if (hook_res != 0)
677                 RETURN(ERR_PTR(hook_res));
678
679         if (osd_param_is_sane(dev, p)) {
680                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
681                 if (oh != NULL) {
682                         struct osd_thread_info *oti = osd_oti_get(env);
683
684                         /*
685                          * XXX temporary stuff. Some abstraction layer should
686                          * be used.
687                          */
688                         oti->oti_dev = dev;
689                         osd_th_alloced(oh);
690                         jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
691                         osd_th_started(oh);
692                         if (!IS_ERR(jh)) {
693                                 oh->ot_handle = jh;
694                                 th = &oh->ot_super;
695                                 th->th_dev = d;
696                                 th->th_result = 0;
697                                 jh->h_sync = p->tp_sync;
698                                 lu_device_get(&d->dd_lu_dev);
699                                 oh->ot_dev_link = lu_ref_add
700                                         (&d->dd_lu_dev.ld_reference,
701                                          "osd-tx", th);
702                                 /* add commit callback */
703                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
704                                 lu_context_enter(&th->th_ctx);
705                                 osd_journal_callback_set(jh,osd_trans_commit_cb,
706                                                          &oh->ot_jcb);
707                                 LASSERT(oti->oti_txns == 0);
708                                 LASSERT(oti->oti_r_locks == 0);
709                                 LASSERT(oti->oti_w_locks == 0);
710                                 oti->oti_txns++;
711                         } else {
712                                 OBD_FREE_PTR(oh);
713                                 th = (void *)jh;
714                         }
715                 } else
716                         th = ERR_PTR(-ENOMEM);
717         } else {
718                 CERROR("Invalid transaction parameters\n");
719                 th = ERR_PTR(-EINVAL);
720         }
721
722         RETURN(th);
723 }
724
725 /*
726  * Concurrency: shouldn't matter.
727  */
728 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
729 {
730         int result;
731         struct osd_thandle *oh;
732         struct osd_thread_info *oti = osd_oti_get(env);
733
734         ENTRY;
735
736         oh = container_of0(th, struct osd_thandle, ot_super);
737         if (oh->ot_handle != NULL) {
738                 handle_t *hdl = oh->ot_handle;
739
740                 LASSERT(oti->oti_txns == 1);
741                 oti->oti_txns--;
742                 LASSERT(oti->oti_r_locks == 0);
743                 LASSERT(oti->oti_w_locks == 0);
744                 result = dt_txn_hook_stop(env, th);
745                 if (result != 0)
746                         CERROR("Failure in transaction hook: %d\n", result);
747                 oh->ot_handle = NULL;
748                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
749                                   result = ldiskfs_journal_stop(hdl));
750                 if (result != 0)
751                         CERROR("Failure to stop transaction: %d\n", result);
752         }
753         EXIT;
754 }
755
756 /*
757  * Concurrency: no concurrent access is possible that late in object
758  * life-cycle.
759  */
760 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
761 {
762         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
763         struct osd_device      *osd = osd_obj2dev(obj);
764         struct osd_thread_info *oti = osd_oti_get(env);
765         struct txn_param       *prm = &oti->oti_txn;
766         struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
767         struct thandle         *th;
768         int result;
769
770         lu_env_init(env_del_obj, LCT_DT_THREAD);
771         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
772                             OSD_TXN_INODE_DELETE_CREDITS);
773         th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
774         if (!IS_ERR(th)) {
775                 result = osd_oi_delete(osd_oti_get(env_del_obj),
776                                        &osd->od_oi, fid, th);
777                 osd_trans_stop(env_del_obj, th);
778         } else
779                 result = PTR_ERR(th);
780
781         lu_env_fini(env_del_obj);
782         return result;
783 }
784
785 /*
786  * Called just before object is freed. Releases all resources except for
787  * object itself (that is released by osd_object_free()).
788  *
789  * Concurrency: no concurrent access is possible that late in object
790  * life-cycle.
791  */
792 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
793 {
794         struct osd_object *obj   = osd_obj(l);
795         struct inode      *inode = obj->oo_inode;
796
797         LINVRNT(osd_invariant(obj));
798
799         /*
800          * If object is unlinked remove fid->ino mapping from object index.
801          */
802
803         osd_index_fini(obj);
804         if (inode != NULL) {
805                 int result;
806
807                 if (osd_inode_unlinked(inode)) {
808                         result = osd_inode_remove(env, obj);
809                         if (result != 0)
810                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
811                                                 "Failed to cleanup: %d\n",
812                                                 result);
813                 }
814
815                 iput(inode);
816                 obj->oo_inode = NULL;
817         }
818 }
819
820 /*
821  * Concurrency: ->loo_object_release() is called under site spin-lock.
822  */
823 static void osd_object_release(const struct lu_env *env,
824                                struct lu_object *l)
825 {
826         struct osd_object *o = osd_obj(l);
827
828         LASSERT(!lu_object_is_dying(l->lo_header));
829         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
830                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
831 }
832
833 /*
834  * Concurrency: shouldn't matter.
835  */
836 static int osd_object_print(const struct lu_env *env, void *cookie,
837                             lu_printer_t p, const struct lu_object *l)
838 {
839         struct osd_object *o = osd_obj(l);
840         struct iam_descr  *d;
841
842         if (o->oo_dir != NULL)
843                 d = o->oo_dir->od_container.ic_descr;
844         else
845                 d = NULL;
846         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
847                     o, o->oo_inode,
848                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
849                     o->oo_inode ? o->oo_inode->i_generation : 0,
850                     d ? d->id_ops->id_name : "plain");
851 }
852
853 /*
854  * Concurrency: shouldn't matter.
855  */
856 int osd_statfs(const struct lu_env *env, struct dt_device *d,
857                cfs_kstatfs_t *sfs)
858 {
859         struct osd_device *osd = osd_dt_dev(d);
860         struct super_block *sb = osd_sb(osd);
861         int result = 0;
862
863         cfs_spin_lock(&osd->od_osfs_lock);
864         /* cache 1 second */
865         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
866                 result = ll_do_statfs(sb, &osd->od_kstatfs);
867                 if (likely(result == 0)) /* N.B. statfs can't really fail */
868                         osd->od_osfs_age = cfs_time_current_64();
869         }
870
871         if (likely(result == 0))
872                 *sfs = osd->od_kstatfs;
873         cfs_spin_unlock(&osd->od_osfs_lock);
874
875         return result;
876 }
877
878 /*
879  * Concurrency: doesn't access mutable data.
880  */
881 static void osd_conf_get(const struct lu_env *env,
882                          const struct dt_device *dev,
883                          struct dt_device_param *param)
884 {
885         struct super_block *sb = osd_sb(osd_dt_dev(dev));
886
887         /*
888          * XXX should be taken from not-yet-existing fs abstraction layer.
889          */
890         param->ddp_max_name_len = LDISKFS_NAME_LEN;
891         param->ddp_max_nlink    = LDISKFS_LINK_MAX;
892         param->ddp_block_shift  = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
893         param->ddp_mntopts      = 0;
894         if (test_opt(sb, XATTR_USER))
895                 param->ddp_mntopts |= MNTOPT_USERXATTR;
896         if (test_opt(sb, POSIX_ACL))
897                 param->ddp_mntopts |= MNTOPT_ACL;
898 }
899
900 /**
901  * Helper function to get and fill the buffer with input values.
902  */
903 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
904 {
905         struct lu_buf *buf;
906
907         buf = &osd_oti_get(env)->oti_buf;
908         buf->lb_buf = area;
909         buf->lb_len = len;
910         return buf;
911 }
912
913 /*
914  * Concurrency: shouldn't matter.
915  */
916 static int osd_sync(const struct lu_env *env, struct dt_device *d)
917 {
918         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
919         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
920 }
921
922 /**
923  * Start commit for OSD device.
924  *
925  * An implementation of dt_commit_async method for OSD device.
926  * Asychronously starts underlayng fs sync and thereby a transaction
927  * commit.
928  *
929  * \param env environment
930  * \param d dt device
931  *
932  * \see dt_device_operations
933  */
934 static int osd_commit_async(const struct lu_env *env,
935                             struct dt_device *d)
936 {
937         struct super_block *s = osd_sb(osd_dt_dev(d));
938         ENTRY;
939
940         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
941         RETURN(s->s_op->sync_fs(s, 0));
942 }
943
944 /*
945  * Concurrency: shouldn't matter.
946  */
947 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
948
949 static void osd_ro(const struct lu_env *env, struct dt_device *d)
950 {
951         ENTRY;
952
953         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
954
955         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
956                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
957         EXIT;
958 }
959
960
961 /*
962  * Concurrency: serialization provided by callers.
963  */
964 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
965                               int mode, unsigned long timeout, __u32 alg,
966                               struct lustre_capa_key *keys)
967 {
968         struct osd_device *dev = osd_dt_dev(d);
969         ENTRY;
970
971         dev->od_fl_capa = mode;
972         dev->od_capa_timeout = timeout;
973         dev->od_capa_alg = alg;
974         dev->od_capa_keys = keys;
975         RETURN(0);
976 }
977
978 /**
979  * Concurrency: serialization provided by callers.
980  */
981 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
982                                struct dt_quota_ctxt *ctxt, void *data)
983 {
984         struct obd_device *obd = (void *)ctxt;
985         struct vfsmount *mnt = (struct vfsmount *)data;
986         ENTRY;
987
988         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
989         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
990         obd->obd_lvfs_ctxt.pwdmnt = mnt;
991         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
992         obd->obd_lvfs_ctxt.fs = get_ds();
993
994         EXIT;
995 }
996
997 /**
998  * Note: we do not count into QUOTA here.
999  * If we mount with --data_journal we may need more.
1000  */
1001 static const int osd_dto_credits_noquota[DTO_NR] = {
1002         /**
1003          * Insert/Delete.
1004          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1005          * SINGLEDATA_TRANS_BLOCKS(8)
1006          * XXX Note: maybe iam need more, since iam have more level than
1007          *           EXT3 htree.
1008          */
1009         [DTO_INDEX_INSERT]  = 16,
1010         [DTO_INDEX_DELETE]  = 16,
1011         /**
1012          * Unused now
1013          */
1014         [DTO_IDNEX_UPDATE]  = 16,
1015         /**
1016          * Create a object. The same as create object in EXT3.
1017          * DATA_TRANS_BLOCKS(14) +
1018          * INDEX_EXTRA_BLOCKS(8) +
1019          * 3(inode bits, groups, GDT)
1020          */
1021         [DTO_OBJECT_CREATE] = 25,
1022         /**
1023          * Unused now
1024          */
1025         [DTO_OBJECT_DELETE] = 25,
1026         /**
1027          * Attr set credits.
1028          * 3(inode bits, group, GDT)
1029          */
1030         [DTO_ATTR_SET_BASE] = 3,
1031         /**
1032          * Xattr set. The same as xattr of EXT3.
1033          * DATA_TRANS_BLOCKS(14)
1034          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1035          * are also counted in. Do not know why?
1036          */
1037         [DTO_XATTR_SET]     = 14,
1038         [DTO_LOG_REC]       = 14,
1039         /**
1040          * creadits for inode change during write.
1041          */
1042         [DTO_WRITE_BASE]    = 3,
1043         /**
1044          * credits for single block write.
1045          */
1046         [DTO_WRITE_BLOCK]   = 14,
1047         /**
1048          * Attr set credits for chown.
1049          * This is extra credits for setattr, and it is null without quota
1050          */
1051         [DTO_ATTR_SET_CHOWN]= 0
1052 };
1053
1054 /**
1055  * Note: we count into QUOTA here.
1056  * If we mount with --data_journal we may need more.
1057  */
1058 static const int osd_dto_credits_quota[DTO_NR] = {
1059         /**
1060          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1061          * SINGLEDATA_TRANS_BLOCKS(8) +
1062          * 2 * QUOTA_TRANS_BLOCKS(2)
1063          */
1064         [DTO_INDEX_INSERT]  = 20,
1065         /**
1066          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1067          * SINGLEDATA_TRANS_BLOCKS(8) +
1068          * 2 * QUOTA_TRANS_BLOCKS(2)
1069          */
1070         [DTO_INDEX_DELETE]  = 20,
1071         /**
1072          * Unused now.
1073          */
1074         [DTO_IDNEX_UPDATE]  = 16,
1075         /*
1076          * Create a object. Same as create object in EXT3 filesystem.
1077          * DATA_TRANS_BLOCKS(16) +
1078          * INDEX_EXTRA_BLOCKS(8) +
1079          * 3(inode bits, groups, GDT) +
1080          * 2 * QUOTA_INIT_BLOCKS(25)
1081          */
1082         [DTO_OBJECT_CREATE] = 77,
1083         /*
1084          * Unused now.
1085          * DATA_TRANS_BLOCKS(16) +
1086          * INDEX_EXTRA_BLOCKS(8) +
1087          * 3(inode bits, groups, GDT) +
1088          * QUOTA(?)
1089          */
1090         [DTO_OBJECT_DELETE] = 27,
1091         /**
1092          * Attr set credits.
1093          * 3 (inode bit, group, GDT) +
1094          */
1095         [DTO_ATTR_SET_BASE] = 3,
1096         /**
1097          * Xattr set. The same as xattr of EXT3.
1098          * DATA_TRANS_BLOCKS(16)
1099          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
1100          *           also counted in. Do not know why?
1101          */
1102         [DTO_XATTR_SET]     = 16,
1103         [DTO_LOG_REC]       = 16,
1104         /**
1105          * creadits for inode change during write.
1106          */
1107         [DTO_WRITE_BASE]    = 3,
1108         /**
1109          * credits for single block write.
1110          */
1111         [DTO_WRITE_BLOCK]   = 16,
1112         /**
1113          * Attr set credits for chown.
1114          * It is added to already set setattr credits
1115          * 2 * QUOTA_INIT_BLOCKS(25) +
1116          * 2 * QUOTA_DEL_BLOCKS(9)
1117          */
1118         [DTO_ATTR_SET_CHOWN]= 68,
1119 };
1120
1121 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
1122                           enum dt_txn_op op)
1123 {
1124         LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
1125                 ARRAY_SIZE(osd_dto_credits_quota));
1126         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
1127 #ifdef HAVE_QUOTA_SUPPORT
1128         if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
1129                 return osd_dto_credits_quota[op];
1130         else
1131 #endif
1132                 return osd_dto_credits_noquota[op];
1133 }
1134
1135 static const struct dt_device_operations osd_dt_ops = {
1136         .dt_root_get       = osd_root_get,
1137         .dt_statfs         = osd_statfs,
1138         .dt_trans_start    = osd_trans_start,
1139         .dt_trans_stop     = osd_trans_stop,
1140         .dt_conf_get       = osd_conf_get,
1141         .dt_sync           = osd_sync,
1142         .dt_ro             = osd_ro,
1143         .dt_commit_async   = osd_commit_async,
1144         .dt_credit_get     = osd_credit_get,
1145         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1146         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1147 };
1148
1149 static void osd_object_read_lock(const struct lu_env *env,
1150                                  struct dt_object *dt, unsigned role)
1151 {
1152         struct osd_object *obj = osd_dt_obj(dt);
1153         struct osd_thread_info *oti = osd_oti_get(env);
1154
1155         LINVRNT(osd_invariant(obj));
1156
1157         LASSERT(obj->oo_owner != env);
1158         cfs_down_read_nested(&obj->oo_sem, role);
1159
1160         LASSERT(obj->oo_owner == NULL);
1161         oti->oti_r_locks++;
1162 }
1163
1164 static void osd_object_write_lock(const struct lu_env *env,
1165                                   struct dt_object *dt, unsigned role)
1166 {
1167         struct osd_object *obj = osd_dt_obj(dt);
1168         struct osd_thread_info *oti = osd_oti_get(env);
1169
1170         LINVRNT(osd_invariant(obj));
1171
1172         LASSERT(obj->oo_owner != env);
1173         cfs_down_write_nested(&obj->oo_sem, role);
1174
1175         LASSERT(obj->oo_owner == NULL);
1176         obj->oo_owner = env;
1177         oti->oti_w_locks++;
1178 }
1179
1180 static void osd_object_read_unlock(const struct lu_env *env,
1181                                    struct dt_object *dt)
1182 {
1183         struct osd_object *obj = osd_dt_obj(dt);
1184         struct osd_thread_info *oti = osd_oti_get(env);
1185
1186         LINVRNT(osd_invariant(obj));
1187
1188         LASSERT(oti->oti_r_locks > 0);
1189         oti->oti_r_locks--;
1190         cfs_up_read(&obj->oo_sem);
1191 }
1192
1193 static void osd_object_write_unlock(const struct lu_env *env,
1194                                     struct dt_object *dt)
1195 {
1196         struct osd_object *obj = osd_dt_obj(dt);
1197         struct osd_thread_info *oti = osd_oti_get(env);
1198
1199         LINVRNT(osd_invariant(obj));
1200
1201         LASSERT(obj->oo_owner == env);
1202         LASSERT(oti->oti_w_locks > 0);
1203         oti->oti_w_locks--;
1204         obj->oo_owner = NULL;
1205         cfs_up_write(&obj->oo_sem);
1206 }
1207
1208 static int osd_object_write_locked(const struct lu_env *env,
1209                                    struct dt_object *dt)
1210 {
1211         struct osd_object *obj = osd_dt_obj(dt);
1212
1213         LINVRNT(osd_invariant(obj));
1214
1215         return obj->oo_owner == env;
1216 }
1217
1218 static int capa_is_sane(const struct lu_env *env,
1219                         struct osd_device *dev,
1220                         struct lustre_capa *capa,
1221                         struct lustre_capa_key *keys)
1222 {
1223         struct osd_thread_info *oti = osd_oti_get(env);
1224         struct lustre_capa *tcapa = &oti->oti_capa;
1225         struct obd_capa *oc;
1226         int i, rc = 0;
1227         ENTRY;
1228
1229         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1230         if (oc) {
1231                 if (capa_is_expired(oc)) {
1232                         DEBUG_CAPA(D_ERROR, capa, "expired");
1233                         rc = -ESTALE;
1234                 }
1235                 capa_put(oc);
1236                 RETURN(rc);
1237         }
1238
1239         if (capa_is_expired_sec(capa)) {
1240                 DEBUG_CAPA(D_ERROR, capa, "expired");
1241                 RETURN(-ESTALE);
1242         }
1243
1244         cfs_spin_lock(&capa_lock);
1245         for (i = 0; i < 2; i++) {
1246                 if (keys[i].lk_keyid == capa->lc_keyid) {
1247                         oti->oti_capa_key = keys[i];
1248                         break;
1249                 }
1250         }
1251         cfs_spin_unlock(&capa_lock);
1252
1253         if (i == 2) {
1254                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1255                 RETURN(-ESTALE);
1256         }
1257
1258         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1259         if (rc)
1260                 RETURN(rc);
1261
1262         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1263                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1264                 RETURN(-EACCES);
1265         }
1266
1267         oc = capa_add(dev->od_capa_hash, capa);
1268         capa_put(oc);
1269
1270         RETURN(0);
1271 }
1272
1273 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1274                            struct lustre_capa *capa, __u64 opc)
1275 {
1276         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1277         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1278         struct md_capainfo *ci;
1279         int rc;
1280
1281         if (!dev->od_fl_capa)
1282                 return 0;
1283
1284         if (capa == BYPASS_CAPA)
1285                 return 0;
1286
1287         ci = md_capainfo(env);
1288         if (unlikely(!ci))
1289                 return 0;
1290
1291         if (ci->mc_auth == LC_ID_NONE)
1292                 return 0;
1293
1294         if (!capa) {
1295                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1296                 return -EACCES;
1297         }
1298
1299         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1300                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1301                            PFID(fid));
1302                 return -EACCES;
1303         }
1304
1305         if (!capa_opc_supported(capa, opc)) {
1306                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1307                 return -EACCES;
1308         }
1309
1310         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1311                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1312                 return -EACCES;
1313         }
1314
1315         return 0;
1316 }
1317
1318 static struct timespec *osd_inode_time(const struct lu_env *env,
1319                                        struct inode *inode, __u64 seconds)
1320 {
1321         struct osd_thread_info *oti = osd_oti_get(env);
1322         struct timespec        *t   = &oti->oti_time;
1323
1324         t->tv_sec  = seconds;
1325         t->tv_nsec = 0;
1326         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1327         return t;
1328 }
1329
1330
1331 static void osd_inode_getattr(const struct lu_env *env,
1332                               struct inode *inode, struct lu_attr *attr)
1333 {
1334         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1335                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1336                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1337
1338         attr->la_atime      = LTIME_S(inode->i_atime);
1339         attr->la_mtime      = LTIME_S(inode->i_mtime);
1340         attr->la_ctime      = LTIME_S(inode->i_ctime);
1341         attr->la_mode       = inode->i_mode;
1342         attr->la_size       = i_size_read(inode);
1343         attr->la_blocks     = inode->i_blocks;
1344         attr->la_uid        = inode->i_uid;
1345         attr->la_gid        = inode->i_gid;
1346         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1347         attr->la_nlink      = inode->i_nlink;
1348         attr->la_rdev       = inode->i_rdev;
1349         attr->la_blksize    = ll_inode_blksize(inode);
1350         attr->la_blkbits    = inode->i_blkbits;
1351 }
1352
1353 static int osd_attr_get(const struct lu_env *env,
1354                         struct dt_object *dt,
1355                         struct lu_attr *attr,
1356                         struct lustre_capa *capa)
1357 {
1358         struct osd_object *obj = osd_dt_obj(dt);
1359
1360         LASSERT(dt_object_exists(dt));
1361         LINVRNT(osd_invariant(obj));
1362
1363         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1364                 return -EACCES;
1365
1366         cfs_spin_lock(&obj->oo_guard);
1367         osd_inode_getattr(env, obj->oo_inode, attr);
1368         cfs_spin_unlock(&obj->oo_guard);
1369         return 0;
1370 }
1371
1372 static int osd_inode_setattr(const struct lu_env *env,
1373                              struct inode *inode, const struct lu_attr *attr)
1374 {
1375         __u64 bits;
1376
1377         bits = attr->la_valid;
1378
1379         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1380
1381 #ifdef HAVE_QUOTA_SUPPORT
1382         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1383             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1384                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1385                 struct iattr iattr;
1386                 int rc;
1387
1388                 iattr.ia_valid = 0;
1389                 if (bits & LA_UID)
1390                         iattr.ia_valid |= ATTR_UID;
1391                 if (bits & LA_GID)
1392                         iattr.ia_valid |= ATTR_GID;
1393                 iattr.ia_uid = attr->la_uid;
1394                 iattr.ia_gid = attr->la_gid;
1395                 osd_push_ctxt(env, save);
1396                 rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
1397                 osd_pop_ctxt(save);
1398                 if (rc != 0)
1399                         return rc;
1400         }
1401 #endif
1402
1403         if (bits & LA_ATIME)
1404                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1405         if (bits & LA_CTIME)
1406                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1407         if (bits & LA_MTIME)
1408                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1409         if (bits & LA_SIZE) {
1410                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1411                 i_size_write(inode, attr->la_size);
1412         }
1413
1414 #if 0
1415         /* OSD should not change "i_blocks" which is used by quota.
1416          * "i_blocks" should be changed by ldiskfs only. */
1417         if (bits & LA_BLOCKS)
1418                 inode->i_blocks = attr->la_blocks;
1419 #endif
1420         if (bits & LA_MODE)
1421                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1422                         (attr->la_mode & ~S_IFMT);
1423         if (bits & LA_UID)
1424                 inode->i_uid    = attr->la_uid;
1425         if (bits & LA_GID)
1426                 inode->i_gid    = attr->la_gid;
1427         if (bits & LA_NLINK)
1428                 inode->i_nlink  = attr->la_nlink;
1429         if (bits & LA_RDEV)
1430                 inode->i_rdev   = attr->la_rdev;
1431
1432         if (bits & LA_FLAGS) {
1433                 /* always keep S_NOCMTIME */
1434                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
1435                                  S_NOCMTIME;
1436         }
1437         return 0;
1438 }
1439
1440 static int osd_attr_set(const struct lu_env *env,
1441                         struct dt_object *dt,
1442                         const struct lu_attr *attr,
1443                         struct thandle *handle,
1444                         struct lustre_capa *capa)
1445 {
1446         struct osd_object *obj = osd_dt_obj(dt);
1447         int rc;
1448
1449         LASSERT(handle != NULL);
1450         LASSERT(dt_object_exists(dt));
1451         LASSERT(osd_invariant(obj));
1452
1453         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1454                 return -EACCES;
1455
1456         cfs_spin_lock(&obj->oo_guard);
1457         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1458         cfs_spin_unlock(&obj->oo_guard);
1459
1460         if (!rc)
1461                 obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
1462         return rc;
1463 }
1464
/*
 * Object creation.
 *
 * Pre-creation hook; currently a stub that always succeeds.
 *
 * XXX temporary solution.
 */
static int osd_create_pre(struct osd_thread_info *info,
                          struct osd_object *obj,
                          struct lu_attr *attr,
                          struct thandle *th)
{
        /* nothing to prepare yet */
        return 0;
}
1475
1476 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1477                            struct lu_attr *attr, struct thandle *th)
1478 {
1479         osd_object_init0(obj);
1480         if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1481                 unlock_new_inode(obj->oo_inode);
1482         return 0;
1483 }
1484
1485 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1486                                             struct osd_object *obj,
1487                                             const char *name,
1488                                             const int namelen)
1489 {
1490         struct osd_thread_info *info   = osd_oti_get(env);
1491         struct dentry *child_dentry = &info->oti_child_dentry;
1492         struct dentry *obj_dentry = &info->oti_obj_dentry;
1493
1494         obj_dentry->d_inode = obj->oo_inode;
1495         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1496         obj_dentry->d_name.hash = 0;
1497
1498         child_dentry->d_name.hash = 0;
1499         child_dentry->d_parent = obj_dentry;
1500         child_dentry->d_name.name = name;
1501         child_dentry->d_name.len = namelen;
1502         return child_dentry;
1503 }
1504
1505
1506 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1507                       cfs_umode_t mode,
1508                       struct dt_allocation_hint *hint,
1509                       struct thandle *th)
1510 {
1511         int result;
1512         struct osd_device  *osd = osd_obj2dev(obj);
1513         struct osd_thandle *oth;
1514         struct dt_object   *parent;
1515         struct inode       *inode;
1516 #ifdef HAVE_QUOTA_SUPPORT
1517         struct osd_ctxt    *save = &info->oti_ctxt;
1518 #endif
1519
1520         LINVRNT(osd_invariant(obj));
1521         LASSERT(obj->oo_inode == NULL);
1522
1523         oth = container_of(th, struct osd_thandle, ot_super);
1524         LASSERT(oth->ot_handle->h_transaction != NULL);
1525
1526         if (hint && hint->dah_parent)
1527                 parent = hint->dah_parent;
1528         else
1529                 parent = osd->od_obj_area;
1530
1531         LASSERT(parent != NULL);
1532         LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1533
1534 #ifdef HAVE_QUOTA_SUPPORT
1535         osd_push_ctxt(info->oti_env, save);
1536 #endif
1537         inode = ldiskfs_create_inode(oth->ot_handle,
1538                                      osd_dt_obj(parent)->oo_inode, mode);
1539 #ifdef HAVE_QUOTA_SUPPORT
1540         osd_pop_ctxt(save);
1541 #endif
1542         if (!IS_ERR(inode)) {
1543                 /* Do not update file c/mtime in ldiskfs.
1544                  * NB: don't need any lock because no contention at this
1545                  * early stage */
1546                 inode->i_flags |= S_NOCMTIME;
1547                 obj->oo_inode = inode;
1548                 result = 0;
1549         } else
1550                 result = PTR_ERR(inode);
1551         LINVRNT(osd_invariant(obj));
1552         return result;
1553 }
1554
1555 enum {
1556         OSD_NAME_LEN = 255
1557 };
1558
1559 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1560                      struct lu_attr *attr,
1561                      struct dt_allocation_hint *hint,
1562                      struct dt_object_format *dof,
1563                      struct thandle *th)
1564 {
1565         int result;
1566         struct osd_thandle *oth;
1567         struct osd_device *osd = osd_obj2dev(obj);
1568         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1569
1570         LASSERT(S_ISDIR(attr->la_mode));
1571
1572         oth = container_of(th, struct osd_thandle, ot_super);
1573         LASSERT(oth->ot_handle->h_transaction != NULL);
1574         result = osd_mkfile(info, obj, mode, hint, th);
1575         if (result == 0 && osd->od_iop_mode == 0) {
1576                 LASSERT(obj->oo_inode != NULL);
1577                 /*
1578                  * XXX uh-oh... call low-level iam function directly.
1579                  */
1580
1581                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1582                                          sizeof (struct osd_fid_pack),
1583                                          oth->ot_handle);
1584         }
1585         return result;
1586 }
1587
1588 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1589                         struct lu_attr *attr,
1590                         struct dt_allocation_hint *hint,
1591                         struct dt_object_format *dof,
1592                         struct thandle *th)
1593 {
1594         int result;
1595         struct osd_thandle *oth;
1596         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1597
1598         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1599
1600         LASSERT(S_ISREG(attr->la_mode));
1601
1602         oth = container_of(th, struct osd_thandle, ot_super);
1603         LASSERT(oth->ot_handle->h_transaction != NULL);
1604
1605         result = osd_mkfile(info, obj, mode, hint, th);
1606         if (result == 0) {
1607                 LASSERT(obj->oo_inode != NULL);
1608                 if (feat->dif_flags & DT_IND_VARKEY)
1609                         result = iam_lvar_create(obj->oo_inode,
1610                                                  feat->dif_keysize_max,
1611                                                  feat->dif_ptrsize,
1612                                                  feat->dif_recsize_max,
1613                                                  oth->ot_handle);
1614                 else
1615                         result = iam_lfix_create(obj->oo_inode,
1616                                                  feat->dif_keysize_max,
1617                                                  feat->dif_ptrsize,
1618                                                  feat->dif_recsize_max,
1619                                                  oth->ot_handle);
1620
1621         }
1622         return result;
1623 }
1624
1625 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1626                      struct lu_attr *attr,
1627                      struct dt_allocation_hint *hint,
1628                      struct dt_object_format *dof,
1629                      struct thandle *th)
1630 {
1631         LASSERT(S_ISREG(attr->la_mode));
1632         return osd_mkfile(info, obj, (attr->la_mode &
1633                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1634 }
1635
1636 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1637                      struct lu_attr *attr,
1638                      struct dt_allocation_hint *hint,
1639                      struct dt_object_format *dof,
1640                      struct thandle *th)
1641 {
1642         LASSERT(S_ISLNK(attr->la_mode));
1643         return osd_mkfile(info, obj, (attr->la_mode &
1644                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1645 }
1646
1647 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1648                      struct lu_attr *attr,
1649                      struct dt_allocation_hint *hint,
1650                      struct dt_object_format *dof,
1651                      struct thandle *th)
1652 {
1653         cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1654         int result;
1655
1656         LINVRNT(osd_invariant(obj));
1657         LASSERT(obj->oo_inode == NULL);
1658         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1659                 S_ISFIFO(mode) || S_ISSOCK(mode));
1660
1661         result = osd_mkfile(info, obj, mode, hint, th);
1662         if (result == 0) {
1663                 LASSERT(obj->oo_inode != NULL);
1664                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1665         }
1666         LINVRNT(osd_invariant(obj));
1667         return result;
1668 }
1669
1670 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1671                               struct lu_attr *,
1672                               struct dt_allocation_hint *hint,
1673                               struct dt_object_format *dof,
1674                               struct thandle *);
1675
1676 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1677 {
1678         osd_obj_type_f result;
1679
1680         switch (type) {
1681         case DFT_DIR:
1682                 result = osd_mkdir;
1683                 break;
1684         case DFT_REGULAR:
1685                 result = osd_mkreg;
1686                 break;
1687         case DFT_SYM:
1688                 result = osd_mksym;
1689                 break;
1690         case DFT_NODE:
1691                 result = osd_mknod;
1692                 break;
1693         case DFT_INDEX:
1694                 result = osd_mk_index;
1695                 break;
1696
1697         default:
1698                 LBUG();
1699                 break;
1700         }
1701         return result;
1702 }
1703
1704
1705 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1706                         struct dt_object *parent, cfs_umode_t child_mode)
1707 {
1708         LASSERT(ah);
1709
1710         memset(ah, 0, sizeof(*ah));
1711         ah->dah_parent = parent;
1712         ah->dah_mode = child_mode;
1713 }
1714
1715 /**
1716  * Helper function for osd_object_create()
1717  *
1718  * \retval 0, on success
1719  */
1720 static int __osd_object_create(struct osd_thread_info *info,
1721                                struct osd_object *obj, struct lu_attr *attr,
1722                                struct dt_allocation_hint *hint,
1723                                struct dt_object_format *dof,
1724                                struct thandle *th)
1725 {
1726
1727         int result;
1728
1729         result = osd_create_pre(info, obj, attr, th);
1730         if (result == 0) {
1731                 result = osd_create_type_f(dof->dof_type)(info, obj,
1732                                            attr, hint, dof, th);
1733                 if (result == 0)
1734                         result = osd_create_post(info, obj, attr, th);
1735         }
1736         return result;
1737 }
1738
1739 /**
1740  * Helper function for osd_object_create()
1741  *
1742  * \retval 0, on success
1743  */
1744 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1745                            const struct lu_fid *fid, struct thandle *th)
1746 {
1747         struct osd_thread_info *info = osd_oti_get(env);
1748         struct osd_inode_id    *id   = &info->oti_id;
1749         struct osd_device      *osd  = osd_obj2dev(obj);
1750         struct md_ucred        *uc   = md_ucred(env);
1751
1752         LASSERT(obj->oo_inode != NULL);
1753         LASSERT(uc != NULL);
1754
1755         id->oii_ino = obj->oo_inode->i_ino;
1756         id->oii_gen = obj->oo_inode->i_generation;
1757
1758         return osd_oi_insert(info, &osd->od_oi, fid, id, th,
1759                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1760 }
1761
1762 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1763                              struct lu_attr *attr,
1764                              struct dt_allocation_hint *hint,
1765                              struct dt_object_format *dof,
1766                              struct thandle *th)
1767 {
1768         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1769         struct osd_object      *obj    = osd_dt_obj(dt);
1770         struct osd_thread_info *info   = osd_oti_get(env);
1771         int result;
1772
1773         ENTRY;
1774
1775         LINVRNT(osd_invariant(obj));
1776         LASSERT(!dt_object_exists(dt));
1777         LASSERT(osd_write_locked(env, obj));
1778         LASSERT(th != NULL);
1779
1780         result = __osd_object_create(info, obj, attr, hint, dof, th);
1781         if (result == 0)
1782                 result = __osd_oi_insert(env, obj, fid, th);
1783
1784         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1785         LASSERT(osd_invariant(obj));
1786         RETURN(result);
1787 }
1788
1789 /**
1790  * Helper function for osd_xattr_set()
1791  */
1792 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1793                            const struct lu_buf *buf, const char *name, int fl)
1794 {
1795         struct osd_object      *obj      = osd_dt_obj(dt);
1796         struct inode           *inode    = obj->oo_inode;
1797         struct osd_thread_info *info     = osd_oti_get(env);
1798         struct dentry          *dentry   = &info->oti_child_dentry;
1799         int                     fs_flags = 0;
1800         int  rc;
1801
1802         LASSERT(dt_object_exists(dt));
1803         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1804         LASSERT(osd_write_locked(env, obj));
1805
1806         if (fl & LU_XATTR_REPLACE)
1807                 fs_flags |= XATTR_REPLACE;
1808
1809         if (fl & LU_XATTR_CREATE)
1810                 fs_flags |= XATTR_CREATE;
1811
1812         dentry->d_inode = inode;
1813         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
1814                                    buf->lb_len, fs_flags);
1815         return rc;
1816 }
1817
1818 /**
1819  * Put the fid into lustre_mdt_attrs, and then place the structure
1820  * inode's ea. This fid should not be altered during the life time
1821  * of the inode.
1822  *
1823  * \retval +ve, on success
1824  * \retval -ve, on error
1825  *
1826  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
1827  */
1828 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
1829                           const struct lu_fid *fid)
1830 {
1831         struct osd_thread_info  *info      = osd_oti_get(env);
1832         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1833
1834         lustre_lma_init(mdt_attrs, fid);
1835         lustre_lma_swab(mdt_attrs);
1836         return __osd_xattr_set(env, dt,
1837                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
1838                                XATTR_NAME_LMA, LU_XATTR_CREATE);
1839
1840 }
1841
1842 /**
1843  * Helper function to form igif
1844  */
1845 static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
1846                                 struct lu_fid *fid)
1847 {
1848         LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
1849 }
1850
1851 /**
1852  * Helper function to pack the fid, ldiskfs stores fid in packed format.
1853  */
1854 void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
1855                   struct lu_fid *befider)
1856 {
1857         fid_cpu_to_be(befider, (struct lu_fid *)fid);
1858         memcpy(pack->fp_area, befider, sizeof(*befider));
1859         pack->fp_len =  sizeof(*befider) + 1;
1860 }
1861
1862 /**
1863  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
1864  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
1865  * To have compatilibility with 1.8 ldiskfs driver we need to have
1866  * magic number at start of fid data.
1867  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
1868  * its inmemory API.
1869  */
1870 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
1871                                   const struct dt_rec *fid)
1872 {
1873         param->edp_magic = LDISKFS_LUFID_MAGIC;
1874         param->edp_len =  sizeof(struct lu_fid) + 1;
1875
1876         fid_cpu_to_be((struct lu_fid *)param->edp_data,
1877                       (struct lu_fid *)fid);
1878 }
1879
1880 int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
1881 {
1882         int result;
1883
1884         result = 0;
1885         switch (pack->fp_len) {
1886         case sizeof *fid + 1:
1887                 memcpy(fid, pack->fp_area, sizeof *fid);
1888                 fid_be_to_cpu(fid, fid);
1889                 break;
1890         default:
1891                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1892                 result = -EIO;
1893         }
1894         return result;
1895 }
1896
1897 /**
1898  * Try to read the fid from inode ea into dt_rec, if return value
1899  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
1900  *
1901  * \param fid object fid.
1902  *
1903  * \retval 0 on success
1904  */
1905 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
1906                           __u32 ino, struct lu_fid *fid)
1907 {
1908         struct osd_thread_info  *info      = osd_oti_get(env);
1909         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1910         struct lu_device        *ldev   = obj->oo_dt.do_lu.lo_dev;
1911         struct dentry           *dentry = &info->oti_child_dentry;
1912         struct osd_inode_id     *id     = &info->oti_id;
1913         struct osd_device       *dev;
1914         struct inode            *inode;
1915         int                      rc;
1916
1917         ENTRY;
1918         dev  = osd_dev(ldev);
1919
1920         id->oii_ino = ino;
1921         id->oii_gen = OSD_OII_NOGEN;
1922
1923         inode = osd_iget(info, dev, id);
1924         if (IS_ERR(inode)) {
1925                 rc = PTR_ERR(inode);
1926                 GOTO(out,rc);
1927         }
1928         dentry->d_inode = inode;
1929
1930         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1931         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
1932                                    sizeof *mdt_attrs);
1933
1934         /* Check LMA compatibility */
1935         if (rc > 0 &&
1936             (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
1937                 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
1938                       inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
1939                       ~LMA_INCOMPAT_SUPP);
1940                 return -ENOSYS;
1941         }
1942
1943         if (rc > 0) {
1944                 lustre_lma_swab(mdt_attrs);
1945                 memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
1946                 rc = 0;
1947         } else if (rc == -ENODATA) {
1948                 osd_igif_get(env, inode, fid);
1949                 rc = 0;
1950         }
1951         iput(inode);
1952 out:
1953         RETURN(rc);
1954 }
1955
1956 /**
1957  * OSD layer object create function for interoperability mode (b11826).
1958  * This is mostly similar to osd_object_create(). Only difference being, fid is
1959  * inserted into inode ea here.
1960  *
1961  * \retval   0, on success
1962  * \retval -ve, on error
1963  */
1964 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
1965                              struct lu_attr *attr,
1966                              struct dt_allocation_hint *hint,
1967                              struct dt_object_format *dof,
1968                              struct thandle *th)
1969 {
1970         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1971         struct osd_object      *obj    = osd_dt_obj(dt);
1972         struct osd_thread_info *info   = osd_oti_get(env);
1973         int result;
1974
1975         ENTRY;
1976
1977         LASSERT(osd_invariant(obj));
1978         LASSERT(!dt_object_exists(dt));
1979         LASSERT(osd_write_locked(env, obj));
1980         LASSERT(th != NULL);
1981
1982         result = __osd_object_create(info, obj, attr, hint, dof, th);
1983
1984         /* objects under osd root shld have igif fid, so dont add fid EA */
1985         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
1986                 result = osd_ea_fid_set(env, dt, fid);
1987
1988         if (result == 0)
1989                 result = __osd_oi_insert(env, obj, fid, th);
1990
1991         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1992         LINVRNT(osd_invariant(obj));
1993         RETURN(result);
1994 }
1995
1996 /*
1997  * Concurrency: @dt is write locked.
1998  */
1999 static void osd_object_ref_add(const struct lu_env *env,
2000                                struct dt_object *dt,
2001                                struct thandle *th)
2002 {
2003         struct osd_object *obj = osd_dt_obj(dt);
2004         struct inode *inode = obj->oo_inode;
2005
2006         LINVRNT(osd_invariant(obj));
2007         LASSERT(dt_object_exists(dt));
2008         LASSERT(osd_write_locked(env, obj));
2009         LASSERT(th != NULL);
2010
2011         cfs_spin_lock(&obj->oo_guard);
2012         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
2013         inode->i_nlink++;
2014         cfs_spin_unlock(&obj->oo_guard);
2015         inode->i_sb->s_op->dirty_inode(inode);
2016         LINVRNT(osd_invariant(obj));
2017 }
2018
2019 /*
2020  * Concurrency: @dt is write locked.
2021  */
2022 static void osd_object_ref_del(const struct lu_env *env,
2023                                struct dt_object *dt,
2024                                struct thandle *th)
2025 {
2026         struct osd_object *obj = osd_dt_obj(dt);
2027         struct inode *inode = obj->oo_inode;
2028
2029         LINVRNT(osd_invariant(obj));
2030         LASSERT(dt_object_exists(dt));
2031         LASSERT(osd_write_locked(env, obj));
2032         LASSERT(th != NULL);
2033
2034         cfs_spin_lock(&obj->oo_guard);
2035         LASSERT(inode->i_nlink > 0);
2036         inode->i_nlink--;
2037         cfs_spin_unlock(&obj->oo_guard);
2038         inode->i_sb->s_op->dirty_inode(inode);
2039         LINVRNT(osd_invariant(obj));
2040 }
2041
2042 /*
2043  * Concurrency: @dt is read locked.
2044  */
2045 static int osd_xattr_get(const struct lu_env *env,
2046                          struct dt_object *dt,
2047                          struct lu_buf *buf,
2048                          const char *name,
2049                          struct lustre_capa *capa)
2050 {
2051         struct osd_object      *obj    = osd_dt_obj(dt);
2052         struct inode           *inode  = obj->oo_inode;
2053         struct osd_thread_info *info   = osd_oti_get(env);
2054         struct dentry          *dentry = &info->oti_obj_dentry;
2055
2056         LASSERT(dt_object_exists(dt));
2057         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2058         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2059
2060         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2061                 return -EACCES;
2062
2063         dentry->d_inode = inode;
2064         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
2065 }
2066
2067 /*
2068  * Concurrency: @dt is write locked.
2069  */
2070 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2071                          const struct lu_buf *buf, const char *name, int fl,
2072                          struct thandle *handle, struct lustre_capa *capa)
2073 {
2074         LASSERT(handle != NULL);
2075
2076         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2077                 return -EACCES;
2078
2079         return __osd_xattr_set(env, dt, buf, name, fl);
2080 }
2081
2082 /*
2083  * Concurrency: @dt is read locked.
2084  */
2085 static int osd_xattr_list(const struct lu_env *env,
2086                           struct dt_object *dt,
2087                           struct lu_buf *buf,
2088                           struct lustre_capa *capa)
2089 {
2090         struct osd_object      *obj    = osd_dt_obj(dt);
2091         struct inode           *inode  = obj->oo_inode;
2092         struct osd_thread_info *info   = osd_oti_get(env);
2093         struct dentry          *dentry = &info->oti_obj_dentry;
2094
2095         LASSERT(dt_object_exists(dt));
2096         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2097         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2098
2099         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2100                 return -EACCES;
2101
2102         dentry->d_inode = inode;
2103         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2104 }
2105
2106 /*
2107  * Concurrency: @dt is write locked.
2108  */
2109 static int osd_xattr_del(const struct lu_env *env,
2110                          struct dt_object *dt,
2111                          const char *name,
2112                          struct thandle *handle,
2113                          struct lustre_capa *capa)
2114 {
2115         struct osd_object      *obj    = osd_dt_obj(dt);
2116         struct inode           *inode  = obj->oo_inode;
2117         struct osd_thread_info *info   = osd_oti_get(env);
2118         struct dentry          *dentry = &info->oti_obj_dentry;
2119         int                     rc;
2120
2121         LASSERT(dt_object_exists(dt));
2122         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2123         LASSERT(osd_write_locked(env, obj));
2124         LASSERT(handle != NULL);
2125
2126         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2127                 return -EACCES;
2128
2129         dentry->d_inode = inode;
2130         rc = inode->i_op->removexattr(dentry, name);
2131         return rc;
2132 }
2133
2134 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2135                                      struct dt_object *dt,
2136                                      struct lustre_capa *old,
2137                                      __u64 opc)
2138 {
2139         struct osd_thread_info *info = osd_oti_get(env);
2140         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2141         struct osd_object *obj = osd_dt_obj(dt);
2142         struct osd_device *dev = osd_obj2dev(obj);
2143         struct lustre_capa_key *key = &info->oti_capa_key;
2144         struct lustre_capa *capa = &info->oti_capa;
2145         struct obd_capa *oc;
2146         struct md_capainfo *ci;
2147         int rc;
2148         ENTRY;
2149
2150         if (!dev->od_fl_capa)
2151                 RETURN(ERR_PTR(-ENOENT));
2152
2153         LASSERT(dt_object_exists(dt));
2154         LINVRNT(osd_invariant(obj));
2155
2156         /* renewal sanity check */
2157         if (old && osd_object_auth(env, dt, old, opc))
2158                 RETURN(ERR_PTR(-EACCES));
2159
2160         ci = md_capainfo(env);
2161         if (unlikely(!ci))
2162                 RETURN(ERR_PTR(-ENOENT));
2163
2164         switch (ci->mc_auth) {
2165         case LC_ID_NONE:
2166                 RETURN(NULL);
2167         case LC_ID_PLAIN:
2168                 capa->lc_uid = obj->oo_inode->i_uid;
2169                 capa->lc_gid = obj->oo_inode->i_gid;
2170                 capa->lc_flags = LC_ID_PLAIN;
2171                 break;
2172         case LC_ID_CONVERT: {
2173                 __u32 d[4], s[4];
2174
2175                 s[0] = obj->oo_inode->i_uid;
2176                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2177                 s[2] = obj->oo_inode->i_gid;
2178                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2179                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2180                 if (unlikely(rc))
2181                         RETURN(ERR_PTR(rc));
2182
2183                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2184                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2185                 capa->lc_flags = LC_ID_CONVERT;
2186                 break;
2187         }
2188         default:
2189                 RETURN(ERR_PTR(-EINVAL));
2190         }
2191
2192         capa->lc_fid = *fid;
2193         capa->lc_opc = opc;
2194         capa->lc_flags |= dev->od_capa_alg << 24;
2195         capa->lc_timeout = dev->od_capa_timeout;
2196         capa->lc_expiry = 0;
2197
2198         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2199         if (oc) {
2200                 LASSERT(!capa_is_expired(oc));
2201                 RETURN(oc);
2202         }
2203
2204         cfs_spin_lock(&capa_lock);
2205         *key = dev->od_capa_keys[1];
2206         cfs_spin_unlock(&capa_lock);
2207
2208         capa->lc_keyid = key->lk_keyid;
2209         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2210
2211         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2212         if (rc) {
2213                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2214                 RETURN(ERR_PTR(rc));
2215         }
2216
2217         oc = capa_add(dev->od_capa_hash, capa);
2218         RETURN(oc);
2219 }
2220
2221 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2222 {
2223         int rc;
2224         struct osd_object      *obj    = osd_dt_obj(dt);
2225         struct inode           *inode  = obj->oo_inode;
2226         struct osd_thread_info *info   = osd_oti_get(env);
2227         struct dentry          *dentry = &info->oti_obj_dentry;
2228         struct file            *file   = &info->oti_file;
2229         ENTRY;
2230
2231         dentry->d_inode = inode;
2232         file->f_dentry = dentry;
2233         file->f_mapping = inode->i_mapping;
2234         file->f_op = inode->i_fop;
2235         LOCK_INODE_MUTEX(inode);
2236         rc = file->f_op->fsync(file, dentry, 0);
2237         UNLOCK_INODE_MUTEX(inode);
2238         RETURN(rc);
2239 }
2240
2241 /*
2242  * Get the 64-bit version for an inode.
2243  */
2244 static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
2245                                                struct dt_object *dt)
2246 {
2247         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2248
2249         CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
2250                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2251         return LDISKFS_I(inode)->i_fs_version;
2252 }
2253
2254 /*
2255  * Set the 64-bit version and return the old version.
2256  */
2257 static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
2258                                    dt_obj_version_t new_version)
2259 {
2260         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2261
2262         CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2263                new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2264         LDISKFS_I(inode)->i_fs_version = new_version;
2265         /** Version is set after all inode operations are finished,
2266          *  so we should mark it dirty here */
2267         inode->i_sb->s_op->dirty_inode(inode);
2268 }
2269
2270 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2271                         void **data)
2272 {
2273         struct osd_object *obj = osd_dt_obj(dt);
2274         ENTRY;
2275
2276         *data = (void *)obj->oo_inode;
2277         RETURN(0);
2278 }
2279
2280 /*
2281  * Index operations.
2282  */
2283
2284 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2285                            const struct dt_index_features *feat)
2286 {
2287         struct iam_descr *descr;
2288
2289         if (osd_object_is_root(o))
2290                 return feat == &dt_directory_features;
2291
2292         LASSERT(o->oo_dir != NULL);
2293
2294         descr = o->oo_dir->od_container.ic_descr;
2295         if (feat == &dt_directory_features) {
2296                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2297                         return 1;
2298                 else
2299                         return 0;
2300         } else {
2301                 return
2302                         feat->dif_keysize_min <= descr->id_key_size &&
2303                         descr->id_key_size <= feat->dif_keysize_max &&
2304                         feat->dif_recsize_min <= descr->id_rec_size &&
2305                         descr->id_rec_size <= feat->dif_recsize_max &&
2306                         !(feat->dif_flags & (DT_IND_VARKEY |
2307                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2308                         ergo(feat->dif_flags & DT_IND_UPDATE,
2309                              1 /* XXX check that object (and file system) is
2310                                 * writable */);
2311         }
2312 }
2313
2314 static int osd_iam_container_init(const struct lu_env *env,
2315                                   struct osd_object *obj,
2316                                   struct osd_directory *dir)
2317 {
2318         int result;
2319         struct iam_container *bag;
2320
2321         bag    = &dir->od_container;
2322         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2323         if (result == 0) {
2324                 result = iam_container_setup(bag);
2325                 if (result == 0)
2326                         obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2327                 else
2328                         iam_container_fini(bag);
2329         }
2330         return result;
2331 }
2332
2333
2334 /*
2335  * Concurrency: no external locking is necessary.
2336  */
2337 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2338                          const struct dt_index_features *feat)
2339 {
2340         int result;
2341         int ea_dir = 0;
2342         struct osd_object *obj = osd_dt_obj(dt);
2343         struct osd_device *osd = osd_obj2dev(obj);
2344
2345         LINVRNT(osd_invariant(obj));
2346         LASSERT(dt_object_exists(dt));
2347
2348         if (osd_object_is_root(obj)) {
2349                 dt->do_index_ops = &osd_index_ea_ops;
2350                 result = 0;
2351         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2352                 dt->do_index_ops = &osd_index_ea_ops;
2353                 if (S_ISDIR(obj->oo_inode->i_mode))
2354                         result = 0;
2355                 else
2356                         result = -ENOTDIR;
2357                 ea_dir = 1;
2358         } else if (!osd_has_index(obj)) {
2359                 struct osd_directory *dir;
2360
2361                 OBD_ALLOC_PTR(dir);
2362                 if (dir != NULL) {
2363
2364                         cfs_spin_lock(&obj->oo_guard);
2365                         if (obj->oo_dir == NULL)
2366                                 obj->oo_dir = dir;
2367                         else
2368                                 /*
2369                                  * Concurrent thread allocated container data.
2370                                  */
2371                                 OBD_FREE_PTR(dir);
2372                         cfs_spin_unlock(&obj->oo_guard);
2373                         /*
2374                          * Now, that we have container data, serialize its
2375                          * initialization.
2376                          */
2377                         cfs_down_write(&obj->oo_ext_idx_sem);
2378                         /*
2379                          * recheck under lock.
2380                          */
2381                         if (!osd_has_index(obj))
2382                                 result = osd_iam_container_init(env, obj, dir);
2383                         else
2384                                 result = 0;
2385                         cfs_up_write(&obj->oo_ext_idx_sem);
2386                 } else
2387                         result = -ENOMEM;
2388         } else
2389                 result = 0;
2390
2391         if (result == 0 && ea_dir == 0) {
2392                 if (!osd_iam_index_probe(env, obj, feat))
2393                         result = -ENOTDIR;
2394         }
2395         LINVRNT(osd_invariant(obj));
2396
2397         return result;
2398 }
2399
2400 static const struct dt_object_operations osd_obj_ops = {
2401         .do_read_lock    = osd_object_read_lock,
2402         .do_write_lock   = osd_object_write_lock,
2403         .do_read_unlock  = osd_object_read_unlock,
2404         .do_write_unlock = osd_object_write_unlock,
2405         .do_write_locked = osd_object_write_locked,
2406         .do_attr_get     = osd_attr_get,
2407         .do_attr_set     = osd_attr_set,
2408         .do_ah_init      = osd_ah_init,
2409         .do_create       = osd_object_create,
2410         .do_index_try    = osd_index_try,
2411         .do_ref_add      = osd_object_ref_add,
2412         .do_ref_del      = osd_object_ref_del,
2413         .do_xattr_get    = osd_xattr_get,
2414         .do_xattr_set    = osd_xattr_set,
2415         .do_xattr_del    = osd_xattr_del,
2416         .do_xattr_list   = osd_xattr_list,
2417         .do_capa_get     = osd_capa_get,
2418         .do_object_sync  = osd_object_sync,
2419         .do_version_get  = osd_object_version_get,
2420         .do_version_set  = osd_object_version_set,
2421         .do_data_get     = osd_data_get,
2422 };
2423
2424 /**
2425  * dt_object_operations for interoperability mode
2426  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2427  */
2428 static const struct dt_object_operations osd_obj_ea_ops = {
2429         .do_read_lock    = osd_object_read_lock,
2430         .do_write_lock   = osd_object_write_lock,
2431         .do_read_unlock  = osd_object_read_unlock,
2432         .do_write_unlock = osd_object_write_unlock,
2433         .do_write_locked = osd_object_write_locked,
2434         .do_attr_get     = osd_attr_get,
2435         .do_attr_set     = osd_attr_set,
2436         .do_ah_init      = osd_ah_init,
2437         .do_create       = osd_object_ea_create,
2438         .do_index_try    = osd_index_try,
2439         .do_ref_add      = osd_object_ref_add,
2440         .do_ref_del      = osd_object_ref_del,
2441         .do_xattr_get    = osd_xattr_get,
2442         .do_xattr_set    = osd_xattr_set,
2443         .do_xattr_del    = osd_xattr_del,
2444         .do_xattr_list   = osd_xattr_list,
2445         .do_capa_get     = osd_capa_get,
2446         .do_object_sync  = osd_object_sync,
2447         .do_version_get  = osd_object_version_get,
2448         .do_version_set  = osd_object_version_set,
2449         .do_data_get     = osd_data_get,
2450 };
2451
2452 /*
2453  * Body operations.
2454  */
2455
2456 /*
2457  * XXX: Another layering violation for now.
2458  *
2459  * We don't want to use ->f_op->read methods, because generic file write
2460  *
2461  *         - serializes on ->i_sem, and
2462  *
2463  *         - does a lot of extra work like balance_dirty_pages(),
2464  *
2465  * which doesn't work for globally shared files like /last-received.
2466  */
2467 static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
2468 {
2469         struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
2470
2471         memcpy(buffer, (char*)ei->i_data, buflen);
2472
2473         return  buflen;
2474 }
2475
2476 static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
2477                             loff_t *offs)
2478 {
2479         struct buffer_head *bh;
2480         unsigned long block;
2481         int osize = size;
2482         int blocksize;
2483         int csize;
2484         int boffs;
2485         int err;
2486
2487         /* prevent reading after eof */
2488         spin_lock(&inode->i_lock);
2489         if (i_size_read(inode) < *offs + size) {
2490                 size = i_size_read(inode) - *offs;
2491                 spin_unlock(&inode->i_lock);
2492                 if (size < 0) {
2493                         CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
2494                                i_size_read(inode), *offs);
2495                         return -EBADR;
2496                 } else if (size == 0) {
2497                         return 0;
2498                 }
2499         } else {
2500                 spin_unlock(&inode->i_lock);
2501         }
2502
2503         blocksize = 1 << inode->i_blkbits;
2504
2505         while (size > 0) {
2506                 block = *offs >> inode->i_blkbits;
2507                 boffs = *offs & (blocksize - 1);
2508                 csize = min(blocksize - boffs, size);
2509                 bh = ldiskfs_bread(NULL, inode, block, 0, &err);
2510                 if (!bh) {
2511                         CERROR("can't read block: %d\n", err);
2512                         return err;
2513                 }
2514
2515                 memcpy(buf, bh->b_data + boffs, csize);
2516                 brelse(bh);
2517
2518                 *offs += csize;
2519                 buf += csize;
2520                 size -= csize;
2521         }
2522         return osize;
2523 }
2524
2525 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
2526                         struct lu_buf *buf, loff_t *pos,
2527                         struct lustre_capa *capa)
2528 {
2529         struct osd_object      *obj    = osd_dt_obj(dt);
2530         struct inode           *inode  = obj->oo_inode;
2531         int rc;
2532
2533         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2534                 RETURN(-EACCES);
2535
2536         /* Read small symlink from inode body as we need to maintain correct
2537          * on-disk symlinks for ldiskfs.
2538          */
2539         if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2540             (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
2541                 rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
2542         else
2543                 rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
2544
2545         return rc;
2546 }
2547
2548 static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
2549 {
2550
2551         memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
2552                buflen);
2553         LDISKFS_I(inode)->i_disksize = buflen;
2554         i_size_write(inode, buflen);
2555         inode->i_sb->s_op->dirty_inode(inode);
2556
2557         return 0;
2558 }
2559
2560 static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
2561                                     loff_t *offs, handle_t *handle)
2562 {
2563         struct buffer_head *bh = NULL;
2564         loff_t offset = *offs;
2565         loff_t new_size = i_size_read(inode);
2566         unsigned long block;
2567         int blocksize = 1 << inode->i_blkbits;
2568         int err = 0;
2569         int size;
2570         int boffs;
2571         int dirty_inode = 0;
2572
2573         while (bufsize > 0) {
2574                 if (bh != NULL)
2575                         brelse(bh);
2576
2577                 block = offset >> inode->i_blkbits;
2578                 boffs = offset & (blocksize - 1);
2579                 size = min(blocksize - boffs, bufsize);
2580                 bh = ldiskfs_bread(handle, inode, block, 1, &err);
2581                 if (!bh) {
2582                         CERROR("can't read/create block: %d\n", err);
2583                         break;
2584                 }
2585
2586                 err = ldiskfs_journal_get_write_access(handle, bh);
2587                 if (err) {
2588                         CERROR("journal_get_write_access() returned error %d\n",
2589                                err);
2590                         break;
2591                 }
2592                 LASSERTF(boffs + size <= bh->b_size,
2593                          "boffs %d size %d bh->b_size %lu",
2594                          boffs, size, (unsigned long)bh->b_size);
2595                 memcpy(bh->b_data + boffs, buf, size);
2596                 err = ldiskfs_journal_dirty_metadata(handle, bh);
2597                 if (err)
2598                         break;
2599
2600                 if (offset + size > new_size)
2601                         new_size = offset + size;
2602                 offset += size;
2603                 bufsize -= size;
2604                 buf += size;
2605         }
2606         if (bh)
2607                 brelse(bh);
2608
2609         /* correct in-core and on-disk sizes */
2610         if (new_size > i_size_read(inode)) {
2611                 spin_lock(&inode->i_lock);
2612                 if (new_size > i_size_read(inode))
2613                         i_size_write(inode, new_size);
2614                 if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
2615                         LDISKFS_I(inode)->i_disksize = i_size_read(inode);
2616                         dirty_inode = 1;
2617                 }
2618                 spin_unlock(&inode->i_lock);
2619                 if (dirty_inode)
2620                         inode->i_sb->s_op->dirty_inode(inode);
2621         }
2622
2623         if (err == 0)
2624                 *offs = offset;
2625         return err;
2626 }
2627
2628 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
2629                          const struct lu_buf *buf, loff_t *pos,
2630                          struct thandle *handle, struct lustre_capa *capa,
2631                          int ignore_quota)
2632 {
2633         struct osd_object  *obj   = osd_dt_obj(dt);
2634         struct inode       *inode = obj->oo_inode;
2635         struct osd_thandle *oh;
2636         ssize_t            result = 0;
2637 #ifdef HAVE_QUOTA_SUPPORT
2638         cfs_cap_t           save = cfs_curproc_cap_pack();
2639 #endif
2640
2641         LASSERT(handle != NULL);
2642
2643         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
2644                 RETURN(-EACCES);
2645
2646         oh = container_of(handle, struct osd_thandle, ot_super);
2647         LASSERT(oh->ot_handle->h_transaction != NULL);
2648 #ifdef HAVE_QUOTA_SUPPORT
2649         if (ignore_quota)
2650                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2651         else
2652                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2653 #endif
2654         /* Write small symlink to inode body as we need to maintain correct
2655          * on-disk symlinks for ldiskfs.
2656          */
2657         if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2658            (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
2659                 result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
2660         else
2661                 result = osd_ldiskfs_write_record(inode, buf->lb_buf,
2662                                                   buf->lb_len, pos,
2663                                                   oh->ot_handle);
2664 #ifdef HAVE_QUOTA_SUPPORT
2665         cfs_curproc_cap_unpack(save);
2666 #endif
2667         if (result == 0)
2668                 result = buf->lb_len;
2669         return result;
2670 }
2671
/* Body (data) operations of an osd object: the read and write entry points. */
static const struct dt_body_operations osd_body_ops = {
        .dbo_read  = osd_read,
        .dbo_write = osd_write
};
2676
2677
2678 /**
2679  *      delete a (key, value) pair from index \a dt specified by \a key
2680  *
2681  *      \param  dt      osd index object
2682  *      \param  key     key for index
2683  *      \param  rec     record reference
2684  *      \param  handle  transaction handler
2685  *
2686  *      \retval  0  success
2687  *      \retval -ve   failure
2688  */
2689
2690 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2691                                 const struct dt_key *key, struct thandle *handle,
2692                                 struct lustre_capa *capa)
2693 {
2694         struct osd_object     *obj = osd_dt_obj(dt);
2695         struct osd_thandle    *oh;
2696         struct iam_path_descr *ipd;
2697         struct iam_container  *bag = &obj->oo_dir->od_container;
2698         int rc;
2699
2700         ENTRY;
2701
2702         LINVRNT(osd_invariant(obj));
2703         LASSERT(dt_object_exists(dt));
2704         LASSERT(bag->ic_object == obj->oo_inode);
2705         LASSERT(handle != NULL);
2706
2707         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2708                 RETURN(-EACCES);
2709
2710         ipd = osd_idx_ipd_get(env, bag);
2711         if (unlikely(ipd == NULL))
2712                 RETURN(-ENOMEM);
2713
2714         oh = container_of0(handle, struct osd_thandle, ot_super);
2715         LASSERT(oh->ot_handle != NULL);
2716         LASSERT(oh->ot_handle->h_transaction != NULL);
2717
2718         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2719         osd_ipd_put(env, bag, ipd);
2720         LINVRNT(osd_invariant(obj));
2721         RETURN(rc);
2722 }
2723
2724 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
2725                                           struct dt_rec *fid)
2726 {
2727         struct osd_fid_pack *rec;
2728         int rc = -ENODATA;
2729
2730         if (de->file_type & LDISKFS_DIRENT_LUFID) {
2731                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
2732                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
2733         }
2734         RETURN(rc);
2735 }
2736
2737 /**
2738  * Index delete function for interoperability mode (b11826).
2739  * It will remove the directory entry added by osd_index_ea_insert().
2740  * This entry is needed to maintain name->fid mapping.
2741  *
2742  * \param key,  key i.e. file entry to be deleted
2743  *
2744  * \retval   0, on success
2745  * \retval -ve, on error
2746  */
2747 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
2748                                const struct dt_key *key, struct thandle *handle,
2749                                struct lustre_capa *capa)
2750 {
2751         struct osd_object          *obj    = osd_dt_obj(dt);
2752         struct inode               *dir    = obj->oo_inode;
2753         struct dentry              *dentry;
2754         struct osd_thandle         *oh;
2755         struct ldiskfs_dir_entry_2 *de;
2756         struct buffer_head         *bh;
2757
2758         int rc;
2759
2760         ENTRY;
2761
2762         LINVRNT(osd_invariant(obj));
2763         LASSERT(dt_object_exists(dt));
2764         LASSERT(handle != NULL);
2765
2766         oh = container_of(handle, struct osd_thandle, ot_super);
2767         LASSERT(oh->ot_handle != NULL);
2768         LASSERT(oh->ot_handle->h_transaction != NULL);
2769
2770         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2771                 RETURN(-EACCES);
2772
2773         dentry = osd_child_dentry_get(env, obj,
2774                                       (char *)key, strlen((char *)key));
2775
2776         cfs_down_write(&obj->oo_ext_idx_sem);
2777         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
2778         if (bh) {
2779                 rc = ldiskfs_delete_entry(oh->ot_handle,
2780                                 dir, de, bh);
2781                 brelse(bh);
2782         } else
2783                 rc = -ENOENT;
2784
2785         cfs_up_write(&obj->oo_ext_idx_sem);
2786         LASSERT(osd_invariant(obj));
2787         RETURN(rc);
2788 }
2789
2790 /**
2791  *      Lookup index for \a key and copy record to \a rec.
2792  *
2793  *      \param  dt      osd index object
2794  *      \param  key     key for index
2795  *      \param  rec     record reference
2796  *
2797  *      \retval  +ve  success : exact mach
2798  *      \retval  0    return record with key not greater than \a key
2799  *      \retval -ve   failure
2800  */
2801 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
2802                                 struct dt_rec *rec, const struct dt_key *key,
2803                                 struct lustre_capa *capa)
2804 {
2805         struct osd_object     *obj = osd_dt_obj(dt);
2806         struct iam_path_descr *ipd;
2807         struct iam_container  *bag = &obj->oo_dir->od_container;
2808         struct osd_thread_info *oti = osd_oti_get(env);
2809         struct iam_iterator    *it = &oti->oti_idx_it;
2810         struct iam_rec *iam_rec;
2811         int rc;
2812         ENTRY;
2813
2814         LASSERT(osd_invariant(obj));
2815         LASSERT(dt_object_exists(dt));
2816         LASSERT(bag->ic_object == obj->oo_inode);
2817
2818         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2819                 RETURN(-EACCES);
2820
2821         ipd = osd_idx_ipd_get(env, bag);
2822         if (IS_ERR(ipd))
2823                 RETURN(-ENOMEM);
2824
2825         /* got ipd now we can start iterator. */
2826         iam_it_init(it, bag, 0, ipd);
2827
2828         rc = iam_it_get(it, (struct iam_key *)key);
2829         if (rc >= 0) {
2830                 if (S_ISDIR(obj->oo_inode->i_mode))
2831                         iam_rec = (struct iam_rec *)oti->oti_ldp;
2832                 else
2833                         iam_rec = (struct iam_rec *) rec;
2834
2835                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
2836                 if (S_ISDIR(obj->oo_inode->i_mode))
2837                         osd_fid_unpack((struct lu_fid *) rec,
2838                                        (struct osd_fid_pack *)iam_rec);
2839         }
2840         iam_it_put(it);
2841         iam_it_fini(it);
2842         osd_ipd_put(env, bag, ipd);
2843
2844         LINVRNT(osd_invariant(obj));
2845
2846         RETURN(rc);
2847 }
2848
2849 /**
2850  *      Inserts (key, value) pair in \a dt index object.
2851  *
2852  *      \param  dt      osd index object
2853  *      \param  key     key for index
2854  *      \param  rec     record reference
2855  *      \param  th      transaction handler
2856  *
2857  *      \retval  0  success
2858  *      \retval -ve failure
2859  */
2860 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
2861                                 const struct dt_rec *rec, const struct dt_key *key,
2862                                 struct thandle *th, struct lustre_capa *capa,
2863                                 int ignore_quota)
2864 {
2865         struct osd_object     *obj = osd_dt_obj(dt);
2866         struct iam_path_descr *ipd;
2867         struct osd_thandle    *oh;
2868         struct iam_container  *bag = &obj->oo_dir->od_container;
2869 #ifdef HAVE_QUOTA_SUPPORT
2870         cfs_cap_t              save = cfs_curproc_cap_pack();
2871 #endif
2872         struct osd_thread_info *oti = osd_oti_get(env);
2873         struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
2874         int rc;
2875
2876         ENTRY;
2877
2878         LINVRNT(osd_invariant(obj));
2879         LASSERT(dt_object_exists(dt));
2880         LASSERT(bag->ic_object == obj->oo_inode);
2881         LASSERT(th != NULL);
2882
2883         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2884                 return -EACCES;
2885
2886         ipd = osd_idx_ipd_get(env, bag);
2887         if (unlikely(ipd == NULL))
2888                 RETURN(-ENOMEM);
2889
2890         oh = container_of0(th, struct osd_thandle, ot_super);
2891         LASSERT(oh->ot_handle != NULL);
2892         LASSERT(oh->ot_handle->h_transaction != NULL);
2893 #ifdef HAVE_QUOTA_SUPPORT
2894         if (ignore_quota)
2895                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2896         else
2897                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2898 #endif
2899         if (S_ISDIR(obj->oo_inode->i_mode))
2900                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
2901         else
2902                 iam_rec = (struct iam_rec *) rec;
2903         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2904                         iam_rec, ipd);
2905 #ifdef HAVE_QUOTA_SUPPORT
2906         cfs_curproc_cap_unpack(save);
2907 #endif
2908         osd_ipd_put(env, bag, ipd);
2909         LINVRNT(osd_invariant(obj));
2910         RETURN(rc);
2911 }
2912
2913 /**
2914  * Calls ldiskfs_add_entry() to add directory entry
2915  * into the directory. This is required for
2916  * interoperability mode (b11826)
2917  *
2918  * \retval   0, on success
2919  * \retval -ve, on error
2920  */
2921 static int __osd_ea_add_rec(struct osd_thread_info *info,
2922                             struct osd_object *pobj,
2923                             struct inode  *cinode,
2924                             const char *name,
2925                             const struct dt_rec *fid,
2926                             struct thandle *th)
2927 {
2928         struct ldiskfs_dentry_param *ldp;
2929         struct dentry      *child;
2930         struct osd_thandle *oth;
2931         int rc;
2932
2933         oth = container_of(th, struct osd_thandle, ot_super);
2934         LASSERT(oth->ot_handle != NULL);
2935         LASSERT(oth->ot_handle->h_transaction != NULL);
2936
2937         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
2938
2939         if (fid_is_igif((struct lu_fid *)fid) ||
2940             fid_is_norm((struct lu_fid *)fid)) {
2941                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2942                 osd_get_ldiskfs_dirent_param(ldp, fid);
2943                 child->d_fsdata = (void*) ldp;
2944         } else
2945                 child->d_fsdata = NULL;
2946         rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
2947
2948         RETURN(rc);
2949 }
2950
2951 /**
2952  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
2953  * into the directory.Also sets flags into osd object to
2954  * indicate dot and dotdot are created. This is required for
2955  * interoperability mode (b11826)
2956  *
2957  * \param dir   directory for dot and dotdot fixup.
2958  * \param obj   child object for linking
2959  *
2960  * \retval   0, on success
2961  * \retval -ve, on error
2962  */
2963 static int osd_add_dot_dotdot(struct osd_thread_info *info,
2964                               struct osd_object *dir,
2965                               struct inode  *parent_dir, const char *name,
2966                               const struct dt_rec *dot_fid,
2967                               const struct dt_rec *dot_dot_fid,
2968                               struct thandle *th)
2969 {
2970         struct inode            *inode  = dir->oo_inode;
2971         struct ldiskfs_dentry_param *dot_ldp;
2972         struct ldiskfs_dentry_param *dot_dot_ldp;
2973         struct osd_thandle      *oth;
2974         int result = 0;
2975
2976         oth = container_of(th, struct osd_thandle, ot_super);
2977         LASSERT(oth->ot_handle->h_transaction != NULL);
2978         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
2979
2980         if (strcmp(name, dot) == 0) {
2981                 if (dir->oo_compat_dot_created) {
2982                         result = -EEXIST;
2983                 } else {
2984                         LASSERT(inode == parent_dir);
2985                         dir->oo_compat_dot_created = 1;
2986                         result = 0;
2987                 }
2988         } else if(strcmp(name, dotdot) == 0) {
2989                 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2990                 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
2991
2992                 if (!dir->oo_compat_dot_created)
2993                         return -EINVAL;
2994                 if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
2995                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
2996                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
2997                 } else {
2998                         dot_ldp = NULL;
2999                         dot_dot_ldp = NULL;
3000                 }
3001                 /* in case of rename, dotdot is already created */
3002                 if (dir->oo_compat_dotdot_created) {
3003                         return __osd_ea_add_rec(info, dir, parent_dir, name,
3004                                                 dot_dot_fid, th);
3005                 }
3006
3007                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
3008                                                 dot_ldp, dot_dot_ldp);
3009                 if (result == 0)
3010                        dir->oo_compat_dotdot_created = 1;
3011         }
3012
3013         return result;
3014 }
3015
3016
3017 /**
3018  * It will call the appropriate osd_add* function and return the
3019  * value, return by respective functions.
3020  */
3021 static int osd_ea_add_rec(const struct lu_env *env,
3022                           struct osd_object *pobj,
3023                           struct inode *cinode,
3024                           const char *name,
3025                           const struct dt_rec *fid,
3026                           struct thandle *th)
3027 {
3028         struct osd_thread_info    *info   = osd_oti_get(env);
3029         int rc;
3030
3031         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3032                                                    name[2] =='\0')))
3033                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3034                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3035                                         fid, th);
3036         else
3037                 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
3038
3039         return rc;
3040 }
3041
3042 /**
3043  * Calls ->lookup() to find dentry. From dentry get inode and
3044  * read inode's ea to get fid. This is required for  interoperability
3045  * mode (b11826)
3046  *
3047  * \retval   0, on success
3048  * \retval -ve, on error
3049  */
3050 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3051                              struct dt_rec *rec, const struct dt_key *key)
3052 {
3053         struct inode               *dir    = obj->oo_inode;
3054         struct dentry              *dentry;
3055         struct ldiskfs_dir_entry_2 *de;
3056         struct buffer_head         *bh;
3057         struct lu_fid              *fid = (struct lu_fid *) rec;
3058         int ino;
3059         int rc;
3060
3061         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3062
3063         dentry = osd_child_dentry_get(env, obj,
3064                                       (char *)key, strlen((char *)key));
3065
3066         cfs_down_read(&obj->oo_ext_idx_sem);
3067         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
3068         if (bh) {
3069                 ino = le32_to_cpu(de->inode);
3070                 rc = osd_get_fid_from_dentry(de, rec);
3071
3072                 /* done with de, release bh */
3073                 brelse(bh);
3074                 if (rc != 0)
3075                         rc = osd_ea_fid_get(env, obj, ino, fid);
3076         } else
3077                 rc = -ENOENT;
3078
3079         cfs_up_read(&obj->oo_ext_idx_sem);
3080         RETURN (rc);
3081 }
3082
3083 /**
3084  * Find the osd object for given fid.
3085  *
3086  * \param fid need to find the osd object having this fid
3087  *
3088  * \retval osd_object on success
3089  * \retval        -ve on error
3090  */
3091 struct osd_object *osd_object_find(const struct lu_env *env,
3092                                    struct dt_object *dt,
3093                                    const struct lu_fid *fid)
3094 {
3095         struct lu_device         *ludev = dt->do_lu.lo_dev;
3096         struct osd_object        *child = NULL;
3097         struct lu_object         *luch;
3098         struct lu_object         *lo;
3099
3100         luch = lu_object_find(env, ludev, fid, NULL);
3101         if (!IS_ERR(luch)) {
3102                 if (lu_object_exists(luch)) {
3103                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3104                         if (lo != NULL)
3105                                 child = osd_obj(lo);
3106                         else
3107                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3108                                                 "lu_object can't be located"
3109                                                 ""DFID"\n", PFID(fid));
3110
3111                         if (child == NULL) {
3112                                 lu_object_put(env, luch);
3113                                 CERROR("Unable to get osd_object\n");
3114                                 child = ERR_PTR(-ENOENT);
3115                         }
3116                 } else {
3117                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
3118                                         "lu_object does not exists "DFID"\n",
3119                                         PFID(fid));
3120                         child = ERR_PTR(-ENOENT);
3121                 }
3122         } else
3123                 child = (void *)luch;
3124
3125         return child;
3126 }
3127
3128 /**
3129  * Put the osd object once done with it.
3130  *
3131  * \param obj osd object that needs to be put
3132  */
3133 static inline void osd_object_put(const struct lu_env *env,
3134                                   struct osd_object *obj)
3135 {
3136         lu_object_put(env, &obj->oo_dt.do_lu);
3137 }
3138
3139 /**
3140  * Index add function for interoperability mode (b11826).
3141  * It will add the directory entry.This entry is needed to
3142  * maintain name->fid mapping.
3143  *
3144  * \param key it is key i.e. file entry to be inserted
3145  * \param rec it is value of given key i.e. fid
3146  *
3147  * \retval   0, on success
3148  * \retval -ve, on error
3149  */
3150 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3151                                const struct dt_rec *rec,
3152                                const struct dt_key *key, struct thandle *th,
3153                                struct lustre_capa *capa, int ignore_quota)
3154 {
3155         struct osd_object        *obj   = osd_dt_obj(dt);
3156         struct lu_fid            *fid   = (struct lu_fid *) rec;
3157         const char               *name  = (const char *)key;
3158         struct osd_object        *child;
3159 #ifdef HAVE_QUOTA_SUPPORT
3160         cfs_cap_t                 save  = cfs_curproc_cap_pack();
3161 #endif
3162         int rc;
3163
3164         ENTRY;
3165
3166         LASSERT(osd_invariant(obj));
3167         LASSERT(dt_object_exists(dt));
3168         LASSERT(th != NULL);
3169
3170         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3171                 RETURN(-EACCES);
3172
3173         child = osd_object_find(env, dt, fid);
3174         if (!IS_ERR(child)) {
3175 #ifdef HAVE_QUOTA_SUPPORT
3176                 if (ignore_quota)
3177                         cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
3178                 else
3179                         cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
3180 #endif
3181                 cfs_down_write(&obj->oo_ext_idx_sem);
3182                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
3183                 cfs_up_write(&obj->oo_ext_idx_sem);
3184 #ifdef HAVE_QUOTA_SUPPORT
3185                 cfs_curproc_cap_unpack(save);
3186 #endif
3187                 osd_object_put(env, child);
3188         } else {
3189                 rc = PTR_ERR(child);
3190         }
3191
3192         LASSERT(osd_invariant(obj));
3193         RETURN(rc);
3194 }
3195
3196 /**
3197  *  Initialize osd Iterator for given osd index object.
3198  *
3199  *  \param  dt      osd index object
3200  */
3201
3202 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3203                                      struct dt_object *dt,
3204                                      __u32 unused,
3205                                      struct lustre_capa *capa)
3206 {
3207         struct osd_it_iam         *it;
3208         struct osd_thread_info *oti = osd_oti_get(env);
3209         struct osd_object     *obj = osd_dt_obj(dt);
3210         struct lu_object      *lo  = &dt->do_lu;
3211         struct iam_path_descr *ipd;
3212         struct iam_container  *bag = &obj->oo_dir->od_container;
3213
3214         LASSERT(lu_object_exists(lo));
3215
3216         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3217                 return ERR_PTR(-EACCES);
3218
3219         it = &oti->oti_it;
3220         ipd = osd_it_ipd_get(env, bag);
3221         if (likely(ipd != NULL)) {
3222                 it->oi_obj = obj;
3223                 it->oi_ipd = ipd;
3224                 lu_object_get(lo);
3225                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3226                 return (struct dt_it *)it;
3227         }
3228         return ERR_PTR(-ENOMEM);
3229 }
3230
3231 /**
3232  * free given Iterator.
3233  */
3234
3235 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3236 {
3237         struct osd_it_iam     *it = (struct osd_it_iam *)di;
3238         struct osd_object *obj = it->oi_obj;
3239
3240         iam_it_fini(&it->oi_it);
3241         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3242         lu_object_put(env, &obj->oo_dt.do_lu);
3243 }
3244
3245 /**
3246  *  Move Iterator to record specified by \a key
3247  *
3248  *  \param  di      osd iterator
3249  *  \param  key     key for index
3250  *
3251  *  \retval +ve  di points to record with least key not larger than key
3252  *  \retval  0   di points to exact matched key
3253  *  \retval -ve  failure
3254  */
3255
3256 static int osd_it_iam_get(const struct lu_env *env,
3257                       struct dt_it *di, const struct dt_key *key)
3258 {
3259         struct osd_it_iam *it = (struct osd_it_iam *)di;
3260
3261         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
3262 }
3263
3264 /**
3265  *  Release Iterator
3266  *
3267  *  \param  di      osd iterator
3268  */
3269
3270 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
3271 {
3272         struct osd_it_iam *it = (struct osd_it_iam *)di;
3273
3274         iam_it_put(&it->oi_it);
3275 }
3276
3277 /**
3278  *  Move iterator by one record
3279  *
3280  *  \param  di      osd iterator
3281  *
3282  *  \retval +1   end of container reached
3283  *  \retval  0   success
3284  *  \retval -ve  failure
3285  */
3286
3287 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
3288 {
3289         struct osd_it_iam *it = (struct osd_it_iam *)di;
3290
3291         return iam_it_next(&it->oi_it);
3292 }
3293
3294 /**
3295  * Return pointer to the key under iterator.
3296  */
3297
3298 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
3299                                  const struct dt_it *di)
3300 {
3301         struct osd_it_iam *it = (struct osd_it_iam *)di;
3302
3303         return (struct dt_key *)iam_it_key_get(&it->oi_it);
3304 }
3305
3306 /**
3307  * Return size of key under iterator (in bytes)
3308  */
3309
3310 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
3311 {
3312         struct osd_it_iam *it = (struct osd_it_iam *)di;
3313
3314         return iam_it_key_size(&it->oi_it);
3315 }
3316
3317 static inline void osd_it_append_attrs(struct lu_dirent*ent,
3318                                        __u32 attr,
3319                                        int len,
3320                                        __u16 type)
3321 {
3322         struct luda_type        *lt;
3323         const unsigned           align = sizeof(struct luda_type) - 1;
3324
3325         /* check if file type is required */
3326         if (attr & LUDA_TYPE) {
3327                         len = (len + align) & ~align;
3328
3329                         lt = (void *) ent->lde_name + len;
3330                         lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
3331                         ent->lde_attrs |= LUDA_TYPE;
3332         }
3333
3334         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
3335 }
3336
3337 /**
3338  * build lu direct from backend fs dirent.
3339  */
3340
3341 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
3342                                       struct lu_fid *fid,
3343                                       __u64 offset,
3344                                       char *name,
3345                                       __u16 namelen,
3346                                       __u16 type,
3347                                       __u32 attr)
3348 {
3349         fid_cpu_to_le(&ent->lde_fid, fid);
3350         ent->lde_attrs = LUDA_FID;
3351
3352         ent->lde_hash = cpu_to_le64(offset);
3353         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
3354
3355         strncpy(ent->lde_name, name, namelen);
3356         ent->lde_namelen = cpu_to_le16(namelen);
3357
3358         /* append lustre attributes */
3359         osd_it_append_attrs(ent, attr, namelen, type);
3360 }
3361
3362 /**
3363  * Return pointer to the record under iterator.
3364  */
3365 static int osd_it_iam_rec(const struct lu_env *env,
3366                           const struct dt_it *di,
3367                           struct lu_dirent *lde,
3368                           __u32 attr)
3369 {
3370         struct osd_it_iam *it        = (struct osd_it_iam *)di;
3371         struct osd_thread_info *info = osd_oti_get(env);
3372         struct lu_fid     *fid       = &info->oti_fid;
3373         const struct osd_fid_pack *rec;
3374         char *name;
3375         int namelen;
3376         __u64 hash;
3377         int rc;
3378
3379         name = (char *)iam_it_key_get(&it->oi_it);
3380         if (IS_ERR(name))
3381                 RETURN(PTR_ERR(name));
3382
3383         namelen = iam_it_key_size(&it->oi_it);
3384
3385         rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it);
3386         if (IS_ERR(rec))
3387                 RETURN(PTR_ERR(rec));
3388
3389         rc = osd_fid_unpack(fid, rec);
3390         if (rc)
3391                 RETURN(rc);
3392
3393         hash = iam_it_store(&it->oi_it);
3394
3395         /* IAM does not store object type in IAM index (dir) */
3396         osd_it_pack_dirent(lde, fid, hash, name, namelen,
3397                            0, LUDA_FID);
3398
3399         return 0;
3400 }
3401
3402 /**
3403  * Returns cookie for current Iterator position.
3404  */
3405 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
3406 {
3407         struct osd_it_iam *it = (struct osd_it_iam *)di;
3408
3409         return iam_it_store(&it->oi_it);
3410 }
3411
3412 /**
3413  * Restore iterator from cookie.
3414  *
3415  * \param  di      osd iterator
3416  * \param  hash    Iterator location cookie
3417  *
3418  * \retval +ve  di points to record with least key not larger than key.
3419  * \retval  0   di points to exact matched key
3420  * \retval -ve  failure
3421  */
3422
3423 static int osd_it_iam_load(const struct lu_env *env,
3424                        const struct dt_it *di, __u64 hash)
3425 {
3426         struct osd_it_iam *it = (struct osd_it_iam *)di;
3427
3428         return iam_it_load(&it->oi_it, hash);
3429 }
3430
/*
 * Index operations for IAM-backed index objects: lookup, insert and delete
 * plus the iterator methods, all implemented by the osd_index_iam_* and
 * osd_it_iam_* functions above.
 */
static const struct dt_index_operations osd_index_iam_ops = {
        .dio_lookup = osd_index_iam_lookup,
        .dio_insert = osd_index_iam_insert,
        .dio_delete = osd_index_iam_delete,
        /* iterator methods */
        .dio_it     = {
                .init     = osd_it_iam_init,
                .fini     = osd_it_iam_fini,
                .get      = osd_it_iam_get,
                .put      = osd_it_iam_put,
                .next     = osd_it_iam_next,
                .key      = osd_it_iam_key,
                .key_size = osd_it_iam_key_size,
                .rec      = osd_it_iam_rec,
                .store    = osd_it_iam_store,
                .load     = osd_it_iam_load
        }
};
3448
3449 /**
3450  * Creates or initializes iterator context.
3451  *
3452  * \retval struct osd_it_ea, iterator structure on success
3453  *
3454  */
3455 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
3456                                     struct dt_object *dt,
3457                                     __u32 attr,
3458                                     struct lustre_capa *capa)
3459 {
3460         struct osd_object       *obj  = osd_dt_obj(dt);
3461         struct osd_thread_info  *info = osd_oti_get(env);
3462         struct osd_it_ea        *it   = &info->oti_it_ea;
3463         struct lu_object        *lo   = &dt->do_lu;
3464         struct dentry           *obj_dentry = &info->oti_it_dentry;
3465         ENTRY;
3466         LASSERT(lu_object_exists(lo));
3467
3468         obj_dentry->d_inode = obj->oo_inode;
3469         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3470         obj_dentry->d_name.hash = 0;
3471
3472         it->oie_rd_dirent       = 0;
3473         it->oie_it_dirent       = 0;
3474         it->oie_dirent          = NULL;
3475         it->oie_buf             = info->oti_it_ea_buf;
3476         it->oie_obj             = obj;
3477         it->oie_file.f_pos      = 0;
3478         it->oie_file.f_dentry   = obj_dentry;
3479         if (attr & LUDA_64BITHASH)
3480                 it->oie_file.f_flags = O_64BITHASH;
3481         else
3482                 it->oie_file.f_flags = O_32BITHASH;
3483         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
3484         it->oie_file.f_op         = obj->oo_inode->i_fop;
3485         it->oie_file.private_data = NULL;
3486         lu_object_get(lo);
3487         RETURN((struct dt_it *) it);
3488 }
3489
3490 /**
3491  * Destroy or finishes iterator context.
3492  *
3493  * \param di iterator structure to be destroyed
3494  */
3495 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
3496 {
3497         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3498         struct osd_object    *obj  = it->oie_obj;
3499         struct inode       *inode  = obj->oo_inode;
3500
3501         ENTRY;
3502         it->oie_file.f_op->release(inode, &it->oie_file);
3503         lu_object_put(env, &obj->oo_dt.do_lu);
3504         EXIT;
3505 }
3506
3507 /**
3508  * It position the iterator at given key, so that next lookup continues from
3509  * that key Or it is similar to dio_it->load() but based on a key,
3510  * rather than file position.
3511  *
3512  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
3513  * to the beginning.
3514  *
3515  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
3516  */
3517 static int osd_it_ea_get(const struct lu_env *env,
3518                          struct dt_it *di, const struct dt_key *key)
3519 {
3520         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3521
3522         ENTRY;
3523         LASSERT(((const char *)key)[0] == '\0');
3524         it->oie_file.f_pos      = 0;
3525         it->oie_rd_dirent       = 0;
3526         it->oie_it_dirent       = 0;
3527         it->oie_dirent          = NULL;
3528
3529         RETURN(+1);
3530 }
3531
/**
 * Counterpart of osd_it_ea_get(); intentionally empty.
 *
 * \param env execution environment (unused)
 * \param di  iterator (unused)
 */
static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
{
        /* nothing to release */
}
3538
3539 /**
3540  * It is called internally by ->readdir(). It fills the
3541  * iterator's in-memory data structure with required
3542  * information i.e. name, namelen, rec_size etc.
3543  *
3544  * \param buf in which information to be filled in.
3545  * \param name name of the file in given dir
3546  *
3547  * \retval 0 on success
3548  * \retval 1 on buffer full
3549  */
3550 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
3551                                loff_t offset, __u64 ino,
3552                                unsigned d_type)
3553 {
3554         struct osd_it_ea        *it   = (struct osd_it_ea *)buf;
3555         struct osd_it_ea_dirent *ent  = it->oie_dirent;
3556         struct lu_fid           *fid  = &ent->oied_fid;
3557         struct osd_fid_pack     *rec;
3558         ENTRY;
3559
3560         /* this should never happen */
3561         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
3562                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
3563                 RETURN(-EIO);
3564         }
3565
3566         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
3567             OSD_IT_EA_BUFSIZE)
3568                 RETURN(1);
3569
3570         if (d_type & LDISKFS_DIRENT_LUFID) {
3571                 rec = (struct osd_fid_pack*) (name + namelen + 1);
3572
3573                 if (osd_fid_unpack(fid, rec) != 0)
3574                         fid_zero(fid);
3575
3576                 d_type &= ~LDISKFS_DIRENT_LUFID;
3577         } else {
3578                 fid_zero(fid);
3579         }
3580
3581         ent->oied_ino     = ino;
3582         ent->oied_off     = offset;
3583         ent->oied_namelen = namelen;
3584         ent->oied_type    = d_type;
3585
3586         memcpy(ent->oied_name, name, namelen);
3587
3588         it->oie_rd_dirent++;
3589         it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
3590         RETURN(0);
3591 }
3592
3593 /**
3594  * Calls ->readdir() to load a directory entry at a time
3595  * and stored it in iterator's in-memory data structure.
3596  *
3597  * \param di iterator's in memory structure
3598  *
3599  * \retval   0 on success
3600  * \retval -ve on error
3601  */
3602 static int osd_ldiskfs_it_fill(const struct dt_it *di)
3603 {
3604         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
3605         struct osd_object  *obj   = it->oie_obj;
3606         struct inode       *inode = obj->oo_inode;
3607         int                result = 0;
3608
3609         ENTRY;
3610         it->oie_dirent = it->oie_buf;
3611         it->oie_rd_dirent = 0;
3612
3613         cfs_down_read(&obj->oo_ext_idx_sem);
3614         result = inode->i_fop->readdir(&it->oie_file, it,
3615                                        (filldir_t) osd_ldiskfs_filldir);
3616
3617         cfs_up_read(&obj->oo_ext_idx_sem);
3618
3619         if (it->oie_rd_dirent == 0) {
3620                 result = -EIO;
3621         } else {
3622                 it->oie_dirent = it->oie_buf;
3623                 it->oie_it_dirent = 1;
3624         }
3625
3626         RETURN(result);
3627 }
3628
3629 /**
3630  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3631  * to load a directory entry at a time and stored it in
3632  * iterator's in-memory data structure.
3633  *
3634  * \param di iterator's in memory structure
3635  *
3636  * \retval +ve iterator reached to end
3637  * \retval   0 iterator not reached to end
3638  * \retval -ve on error
3639  */
3640 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
3641 {
3642         struct osd_it_ea *it = (struct osd_it_ea *)di;
3643         int rc;
3644
3645         ENTRY;
3646
3647         if (it->oie_it_dirent < it->oie_rd_dirent) {
3648                 it->oie_dirent =
3649                         (void *) it->oie_dirent +
3650                         cfs_size_round(sizeof(struct osd_it_ea_dirent) +
3651                                        it->oie_dirent->oied_namelen);
3652                 it->oie_it_dirent++;
3653                 RETURN(0);
3654         } else {
3655                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
3656                         rc = +1;
3657                 else
3658                         rc = osd_ldiskfs_it_fill(di);
3659         }
3660
3661         RETURN(rc);
3662 }
3663
3664 /**
3665  * Returns the key at current position from iterator's in memory structure.
3666  *
3667  * \param di iterator's in memory structure
3668  *
3669  * \retval key i.e. struct dt_key on success
3670  */
3671 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
3672                                     const struct dt_it *di)
3673 {
3674         struct osd_it_ea *it = (struct osd_it_ea *)di;
3675         ENTRY;
3676         RETURN((struct dt_key *)it->oie_dirent->oied_name);
3677 }
3678
3679 /**
3680  * Returns the key's size at current position from iterator's in memory structure.
3681  *
3682  * \param di iterator's in memory structure
3683  *
3684  * \retval key_size i.e. struct dt_key on success
3685  */
3686 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
3687 {
3688         struct osd_it_ea *it = (struct osd_it_ea *)di;
3689         ENTRY;
3690         RETURN(it->oie_dirent->oied_namelen);
3691 }
3692
3693
3694 /**
3695  * Returns the value (i.e. fid/igif) at current position from iterator's
3696  * in memory structure.
3697  *
3698  * \param di struct osd_it_ea, iterator's in memory structure
3699  * \param attr attr requested for dirent.
3700  * \param lde lustre dirent
3701  *
3702  * \retval   0 no error and \param lde has correct lustre dirent.
3703  * \retval -ve on error
3704  */
3705 static inline int osd_it_ea_rec(const struct lu_env *env,
3706                                 const struct dt_it *di,
3707                                 struct lu_dirent *lde,
3708                                 __u32 attr)
3709 {
3710         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
3711         struct osd_object       *obj    = it->oie_obj;
3712         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
3713         int    rc = 0;
3714
3715         ENTRY;
3716
3717         if (!fid_is_sane(fid))
3718                 rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
3719
3720         if (rc == 0)
3721                 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
3722                                    it->oie_dirent->oied_name,
3723                                    it->oie_dirent->oied_namelen,
3724                                    it->oie_dirent->oied_type,
3725                                    attr);
3726         RETURN(rc);
3727 }
3728
3729 /**
3730  * Returns a cookie for current position of the iterator head, so that
3731  * user can use this cookie to load/start the iterator next time.
3732  *
3733  * \param di iterator's in memory structure
3734  *
3735  * \retval cookie for current position, on success
3736  */
3737 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
3738 {
3739         struct osd_it_ea *it = (struct osd_it_ea *)di;
3740         ENTRY;
3741         RETURN(it->oie_dirent->oied_off);
3742 }
3743
3744 /**
3745  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3746  * to load a directory entry at a time and stored it i inn,
3747  * in iterator's in-memory data structure.
3748  *
3749  * \param di struct osd_it_ea, iterator's in memory structure
3750  *
3751  * \retval +ve on success
3752  * \retval -ve on error
3753  */
3754 static int osd_it_ea_load(const struct lu_env *env,
3755                           const struct dt_it *di, __u64 hash)
3756 {
3757         struct osd_it_ea *it = (struct osd_it_ea *)di;
3758         int rc;
3759
3760         ENTRY;
3761         it->oie_file.f_pos = hash;
3762
3763         rc =  osd_ldiskfs_it_fill(di);
3764         if (rc == 0)
3765                 rc = +1;
3766
3767         RETURN(rc);
3768 }
3769
3770 /**
3771  * Index lookup function for interoperability mode (b11826).
3772  *
3773  * \param key,  key i.e. file name to be searched
3774  *
3775  * \retval +ve, on success
3776  * \retval -ve, on error
3777  */
3778 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
3779                                struct dt_rec *rec, const struct dt_key *key,
3780                                struct lustre_capa *capa)
3781 {
3782         struct osd_object *obj = osd_dt_obj(dt);
3783         int rc = 0;
3784
3785         ENTRY;
3786
3787         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
3788         LINVRNT(osd_invariant(obj));
3789
3790         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3791                 return -EACCES;
3792
3793         rc = osd_ea_lookup_rec(env, obj, rec, key);
3794
3795         if (rc == 0)
3796                 rc = +1;
3797         RETURN(rc);
3798 }
3799
3800 /**
3801  * Index and Iterator operations for interoperability
3802  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
3803  */
3804 static const struct dt_index_operations osd_index_ea_ops = {
3805         .dio_lookup = osd_index_ea_lookup,
3806         .dio_insert = osd_index_ea_insert,
3807         .dio_delete = osd_index_ea_delete,
3808         .dio_it     = {
3809                 .init     = osd_it_ea_init,
3810                 .fini     = osd_it_ea_fini,
3811                 .get      = osd_it_ea_get,
3812                 .put      = osd_it_ea_put,
3813                 .next     = osd_it_ea_next,
3814                 .key      = osd_it_ea_key,
3815                 .key_size = osd_it_ea_key_size,
3816                 .rec      = osd_it_ea_rec,
3817                 .store    = osd_it_ea_store,
3818                 .load     = osd_it_ea_load
3819         }
3820 };
3821
3822 static void *osd_key_init(const struct lu_context *ctx,
3823                           struct lu_context_key *key)
3824 {
3825         struct osd_thread_info *info;
3826
3827         OBD_ALLOC_PTR(info);
3828         if (info != NULL) {
3829                 OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3830                 if (info->oti_it_ea_buf != NULL) {
3831                         info->oti_env = container_of(ctx, struct lu_env,
3832                                                      le_ctx);
3833                 } else {
3834                         OBD_FREE_PTR(info);
3835                         info = ERR_PTR(-ENOMEM);
3836                 }
3837         } else {
3838                 info = ERR_PTR(-ENOMEM);
3839         }
3840         return info;
3841 }
3842
3843 static void osd_key_fini(const struct lu_context *ctx,
3844                          struct lu_context_key *key, void* data)
3845 {
3846         struct osd_thread_info *info = data;
3847
3848         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3849         OBD_FREE_PTR(info);
3850 }
3851
3852 static void osd_key_exit(const struct lu_context *ctx,
3853                          struct lu_context_key *key, void *data)
3854 {
3855         struct osd_thread_info *info = data;
3856
3857         LASSERT(info->oti_r_locks == 0);
3858         LASSERT(info->oti_w_locks == 0);
3859         LASSERT(info->oti_txns    == 0);
3860 }
3861
3862 /* type constructor/destructor: osd_type_init, osd_type_fini */
3863 LU_TYPE_INIT_FINI(osd, &osd_key);
3864
3865 static struct lu_context_key osd_key = {
3866         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
3867         .lct_init = osd_key_init,
3868         .lct_fini = osd_key_fini,
3869         .lct_exit = osd_key_exit
3870 };
3871
3872
3873 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
3874                            const char *name, struct lu_device *next)
3875 {
3876         int rc;
3877         struct lu_context *ctx;
3878
3879         /* context for commit hooks */
3880         ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
3881         rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
3882         if (rc == 0) {
3883                 rc = osd_procfs_init(osd_dev(d), name);
3884                 ctx->lc_cookie = 0x3;
3885         }
3886         return rc;
3887 }
3888
3889 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
3890 {
3891         struct osd_thread_info *info = osd_oti_get(env);
3892         ENTRY;
3893         if (o->od_obj_area != NULL) {
3894                 lu_object_put(env, &o->od_obj_area->do_lu);
3895                 o->od_obj_area = NULL;
3896         }
3897         osd_oi_fini(info, &o->od_oi);
3898
3899         RETURN(0);
3900 }
3901
3902 static int osd_mount(const struct lu_env *env,
3903                      struct osd_device *o, struct lustre_cfg *cfg)
3904 {
3905         struct lustre_mount_info *lmi;
3906         const char               *dev  = lustre_cfg_string(cfg, 0);
3907         struct lustre_disk_data  *ldd;
3908         struct lustre_sb_info    *lsi;
3909
3910         ENTRY;
3911         if (o->od_mount != NULL) {
3912                 CERROR("Already mounted (%s)\n", dev);
3913                 RETURN(-EEXIST);
3914         }
3915
3916         /* get mount */
3917         lmi = server_get_mount(dev);
3918         if (lmi == NULL) {
3919                 CERROR("Cannot get mount info for %s!\n", dev);
3920                 RETURN(-EFAULT);
3921         }
3922
3923         LASSERT(lmi != NULL);
3924         /* save lustre_mount_info in dt_device */
3925         o->od_mount = lmi;
3926
3927         lsi = s2lsi(lmi->lmi_sb);
3928         ldd = lsi->lsi_ldd;
3929
3930         if (ldd->ldd_flags & LDD_F_IAM_DIR) {
3931                 o->od_iop_mode = 0;
3932                 LCONSOLE_WARN("OSD: IAM mode enabled\n");
3933         } else
3934                 o->od_iop_mode = 1;
3935
3936         o->od_obj_area = NULL;
3937         RETURN(0);
3938 }
3939
3940 static struct lu_device *osd_device_fini(const struct lu_env *env,
3941                                          struct lu_device *d)
3942 {
3943         int rc;
3944         ENTRY;
3945
3946         shrink_dcache_sb(osd_sb(osd_dev(d)));
3947         osd_sync(env, lu2dt_dev(d));
3948
3949         rc = osd_procfs_fini(osd_dev(d));
3950         if (rc) {
3951                 CERROR("proc fini error %d \n", rc);
3952                 RETURN (ERR_PTR(rc));
3953         }
3954
3955         if (osd_dev(d)->od_mount)
3956                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
3957                                  osd_dev(d)->od_mount->lmi_mnt);
3958         osd_dev(d)->od_mount = NULL;
3959
3960         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
3961         RETURN(NULL);
3962 }
3963
3964 static struct lu_device *osd_device_alloc(const struct lu_env *env,
3965                                           struct lu_device_type *t,
3966                                           struct lustre_cfg *cfg)
3967 {
3968         struct lu_device  *l;
3969         struct osd_device *o;
3970
3971         OBD_ALLOC_PTR(o);
3972         if (o != NULL) {
3973                 int result;
3974
3975                 result = dt_device_init(&o->od_dt_dev, t);
3976                 if (result == 0) {
3977                         l = osd2lu_dev(o);
3978                         l->ld_ops = &osd_lu_ops;
3979                         o->od_dt_dev.dd_ops = &osd_dt_ops;
3980                         cfs_spin_lock_init(&o->od_osfs_lock);
3981                         o->od_osfs_age = cfs_time_shift_64(-1000);
3982                         o->od_capa_hash = init_capa_hash();
3983                         if (o->od_capa_hash == NULL) {
3984                                 dt_device_fini(&o->od_dt_dev);
3985                                 l = ERR_PTR(-ENOMEM);
3986                         }
3987                 } else
3988                         l = ERR_PTR(result);
3989
3990                 if (IS_ERR(l))
3991                         OBD_FREE_PTR(o);
3992         } else
3993                 l = ERR_PTR(-ENOMEM);
3994         return l;
3995 }
3996
3997 static struct lu_device *osd_device_free(const struct lu_env *env,
3998                                          struct lu_device *d)
3999 {
4000         struct osd_device *o = osd_dev(d);
4001         ENTRY;
4002
4003         cleanup_capa_hash(o->od_capa_hash);
4004         dt_device_fini(&o->od_dt_dev);
4005         OBD_FREE_PTR(o);
4006         RETURN(NULL);
4007 }
4008
4009 static int osd_process_config(const struct lu_env *env,
4010                               struct lu_device *d, struct lustre_cfg *cfg)
4011 {
4012         struct osd_device *o = osd_dev(d);
4013         int err;
4014         ENTRY;
4015
4016         switch(cfg->lcfg_command) {
4017         case LCFG_SETUP:
4018                 err = osd_mount(env, o, cfg);
4019                 break;
4020         case LCFG_CLEANUP:
4021                 err = osd_shutdown(env, o);
4022                 break;
4023         default:
4024                 err = -ENOSYS;
4025         }
4026
4027         RETURN(err);
4028 }
4029
4030 static int osd_recovery_complete(const struct lu_env *env,
4031                                  struct lu_device *d)
4032 {
4033         RETURN(0);
4034 }
4035
4036 static int osd_prepare(const struct lu_env *env,
4037                        struct lu_device *pdev,
4038                        struct lu_device *dev)
4039 {
4040         struct osd_device *osd = osd_dev(dev);
4041         struct lustre_sb_info *lsi;
4042         struct lustre_disk_data *ldd;
4043         struct lustre_mount_info  *lmi;
4044         struct osd_thread_info *oti = osd_oti_get(env);
4045         struct dt_object *d;
4046         int result;
4047
4048         ENTRY;
4049         /* 1. initialize oi before any file create or file open */
4050         result = osd_oi_init(oti, &osd->od_oi,
4051                              &osd->od_dt_dev, lu2md_dev(pdev));
4052         if (result != 0)
4053                 RETURN(result);
4054
4055         lmi = osd->od_mount;
4056         lsi = s2lsi(lmi->lmi_sb);
4057         ldd = lsi->lsi_ldd;
4058
4059         /* 2. setup local objects */
4060         result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
4061         if (result)
4062                 goto out;
4063
4064         /* 3. open remote object dir */
4065         d = dt_store_open(env, lu2dt_dev(dev), "",
4066                           remote_obj_dir, &oti->oti_fid);
4067         if (!IS_ERR(d)) {
4068                 osd->od_obj_area = d;
4069                 result = 0;
4070         } else {
4071                 result = PTR_ERR(d);
4072                 osd->od_obj_area = NULL;
4073         }
4074
4075 out:
4076         RETURN(result);
4077 }
4078
4079 static const struct lu_object_operations osd_lu_obj_ops = {
4080         .loo_object_init      = osd_object_init,
4081         .loo_object_delete    = osd_object_delete,
4082         .loo_object_release   = osd_object_release,
4083         .loo_object_free      = osd_object_free,
4084         .loo_object_print     = osd_object_print,
4085         .loo_object_invariant = osd_object_invariant
4086 };
4087
4088 static const struct lu_device_operations osd_lu_ops = {
4089         .ldo_object_alloc      = osd_object_alloc,
4090         .ldo_process_config    = osd_process_config,
4091         .ldo_recovery_complete = osd_recovery_complete,
4092         .ldo_prepare           = osd_prepare,
4093 };
4094
4095 static const struct lu_device_type_operations osd_device_type_ops = {
4096         .ldto_init = osd_type_init,
4097         .ldto_fini = osd_type_fini,
4098
4099         .ldto_start = osd_type_start,
4100         .ldto_stop  = osd_type_stop,
4101
4102         .ldto_device_alloc = osd_device_alloc,
4103         .ldto_device_free  = osd_device_free,
4104
4105         .ldto_device_init    = osd_device_init,
4106         .ldto_device_fini    = osd_device_fini
4107 };
4108
4109 static struct lu_device_type osd_device_type = {
4110         .ldt_tags     = LU_DEVICE_DT,
4111         .ldt_name     = LUSTRE_OSD_NAME,
4112         .ldt_ops      = &osd_device_type_ops,
4113         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
4114 };
4115
4116 /*
4117  * lprocfs legacy support.
4118  */
4119 static struct obd_ops osd_obd_device_ops = {
4120         .o_owner = THIS_MODULE
4121 };
4122
4123 static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
4124         .llod_name      = remote_obj_dir,
4125         .llod_oid       = OSD_REM_OBJ_DIR_OID,
4126         .llod_is_index  = 1,
4127         .llod_feat      = &dt_directory_features,
4128 };
4129
4130 static int __init osd_mod_init(void)
4131 {
4132         struct lprocfs_static_vars lvars;
4133
4134         osd_oi_mod_init();
4135         llo_local_obj_register(&llod_osd_rem_obj_dir);
4136         lprocfs_osd_init_vars(&lvars);
4137         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
4138                                    LUSTRE_OSD_NAME, &osd_device_type);
4139 }
4140
4141 static void __exit osd_mod_exit(void)
4142 {
4143         llo_local_obj_unregister(&llod_osd_rem_obj_dir);
4144         class_unregister_type(LUSTRE_OSD_NAME);
4145 }
4146
/* module metadata */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
MODULE_LICENSE("GPL");

/* declare the module with osd_mod_init()/osd_mod_exit() as its entry points */
cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);