Whamcloud - gitweb
LU-822 osd: multiple Object Index files
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * Copyright (c) 2011 Whamcloud, Inc.
37  */
38 /*
39  * This file is part of Lustre, http://www.lustre.org/
40  * Lustre is a trademark of Sun Microsystems, Inc.
41  *
42  * lustre/osd/osd_handler.c
43  *
44  * Top-level entry points into osd module
45  *
46  * Author: Nikita Danilov <nikita@clusterfs.com>
47  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
48  */
49
50 #ifndef EXPORT_SYMTAB
51 # define EXPORT_SYMTAB
52 #endif
53 #define DEBUG_SUBSYSTEM S_MDS
54
55 #include <linux/module.h>
56
57 /* LUSTRE_VERSION_CODE */
58 #include <lustre_ver.h>
59 /* prerequisite for linux/xattr.h */
60 #include <linux/types.h>
61 /* prerequisite for linux/xattr.h */
62 #include <linux/fs.h>
63 /* XATTR_{REPLACE,CREATE} */
64 #include <linux/xattr.h>
65 /* simple_mkdir() */
66 #include <lvfs.h>
67
68 /*
69  * struct OBD_{ALLOC,FREE}*()
70  * OBD_FAIL_CHECK
71  */
72 #include <obd_support.h>
73 /* struct ptlrpc_thread */
74 #include <lustre_net.h>
75
76 /* fid_is_local() */
77 #include <lustre_fid.h>
78
79 #include "osd_internal.h"
80 #include "osd_igif.h"
81
82 /* llo_* api support */
83 #include <md_object.h>
84
85 #ifdef HAVE_LDISKFS_PDO
86 int ldiskfs_pdo = 1;
87 CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
88                 "ldiskfs with parallel directory operations");
89 #else
90 int ldiskfs_pdo = 0;
91 #endif
92
93 static const char dot[] = ".";
94 static const char dotdot[] = "..";
95 static const char remote_obj_dir[] = "REM_OBJ_DIR";
96
97 struct osd_directory {
98         struct iam_container od_container;
99         struct iam_descr     od_descr;
100 };
101
102 struct osd_object {
103         struct dt_object       oo_dt;
104         /**
105          * Inode for file system object represented by this osd_object. This
106          * inode is pinned for the whole duration of lu_object life.
107          *
108          * Not modified concurrently (either setup early during object
109          * creation, or assigned by osd_object_create() under write lock).
110          */
111         struct inode          *oo_inode;
112         /**
113          * to protect index ops.
114          */
115         struct htree_lock_head *oo_hl_head;
116         cfs_rw_semaphore_t     oo_ext_idx_sem;
117         cfs_rw_semaphore_t     oo_sem;
118         struct osd_directory  *oo_dir;
119         /** protects inode attributes. */
120         cfs_spinlock_t         oo_guard;
121         /**
122          * Following two members are used to indicate the presence of dot and
123          * dotdot in the given directory. This is required for interop mode
124          * (b11826).
125          */
126         int                    oo_compat_dot_created;
127         int                    oo_compat_dotdot_created;
128
129         const struct lu_env   *oo_owner;
130 #ifdef CONFIG_LOCKDEP
131         struct lockdep_map     oo_dep_map;
132 #endif
133 };
134
135 static const struct lu_object_operations      osd_lu_obj_ops;
136 static const struct lu_device_operations      osd_lu_ops;
137 static       struct lu_context_key            osd_key;
138 static const struct dt_object_operations      osd_obj_ops;
139 static const struct dt_object_operations      osd_obj_ea_ops;
140 static const struct dt_body_operations        osd_body_ops;
141 static const struct dt_body_operations        osd_body_ops_new;
142 static const struct dt_index_operations       osd_index_iam_ops;
143 static const struct dt_index_operations       osd_index_ea_ops;
144
145 #define OSD_TRACK_DECLARES
146 #ifdef OSD_TRACK_DECLARES
147 #define OSD_DECLARE_OP(oh, op)   {                               \
148         LASSERT(oh->ot_handle == NULL);                          \
149         ((oh)->ot_declare_ ##op)++; }
150 #define OSD_EXEC_OP(handle, op)     {                            \
151         struct osd_thandle *oh;                                  \
152         oh = container_of0(handle, struct osd_thandle, ot_super);\
153         LASSERT((oh)->ot_declare_ ##op > 0);                     \
154         ((oh)->ot_declare_ ##op)--; }
155 #else
156 #define OSD_DECLARE_OP(oh, op)
157 #define OSD_EXEC_OP(oh, op)
158 #endif
159
160 struct osd_thandle {
161         struct thandle          ot_super;
162         handle_t               *ot_handle;
163         struct journal_callback ot_jcb;
164         cfs_list_t              ot_dcb_list;
165         /* Link to the device, for debugging. */
166         struct lu_ref_link     *ot_dev_link;
167         int                     ot_credits;
168
169 #ifdef OSD_TRACK_DECLARES
170         unsigned char           ot_declare_attr_set;
171         unsigned char           ot_declare_punch;
172         unsigned char           ot_declare_xattr_set;
173         unsigned char           ot_declare_create;
174         unsigned char           ot_declare_destroy;
175         unsigned char           ot_declare_ref_add;
176         unsigned char           ot_declare_ref_del;
177         unsigned char           ot_declare_write;
178         unsigned char           ot_declare_insert;
179         unsigned char           ot_declare_delete;
180 #endif
181
182 #if OSD_THANDLE_STATS
183         /** time when this handle was allocated */
184         cfs_time_t oth_alloced;
185
186         /** time when this thanle was started */
187         cfs_time_t oth_started;
188 #endif
189 };
190
191 /**
192  * Basic transaction credit op
193  */
194 enum dt_txn_op {
195         DTO_INDEX_INSERT,
196         DTO_INDEX_DELETE,
197         DTO_INDEX_UPDATE,
198         DTO_OBJECT_CREATE,
199         DTO_OBJECT_DELETE,
200         DTO_ATTR_SET_BASE,
201         DTO_XATTR_SET,
202         DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */
203         DTO_WRITE_BASE,
204         DTO_WRITE_BLOCK,
205         DTO_ATTR_SET_CHOWN,
206
207         DTO_NR
208 };
209
210 /*
211  * Helpers.
212  */
213 static int lu_device_is_osd(const struct lu_device *d)
214 {
215         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
216 }
217
218 static struct osd_device *osd_dt_dev(const struct dt_device *d)
219 {
220         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
221         return container_of0(d, struct osd_device, od_dt_dev);
222 }
223
224 static struct osd_device *osd_dev(const struct lu_device *d)
225 {
226         LASSERT(lu_device_is_osd(d));
227         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
228 }
229
230 static struct osd_device *osd_obj2dev(const struct osd_object *o)
231 {
232         return osd_dev(o->oo_dt.do_lu.lo_dev);
233 }
234
235 static struct super_block *osd_sb(const struct osd_device *dev)
236 {
237         return dev->od_mount->lmi_mnt->mnt_sb;
238 }
239
240 static int osd_object_is_root(const struct osd_object *obj)
241 {
242         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
243 }
244
245 static struct osd_object *osd_obj(const struct lu_object *o)
246 {
247         LASSERT(lu_device_is_osd(o->lo_dev));
248         return container_of0(o, struct osd_object, oo_dt.do_lu);
249 }
250
251 static struct osd_object *osd_dt_obj(const struct dt_object *d)
252 {
253         return osd_obj(&d->do_lu);
254 }
255
256 static struct lu_device *osd2lu_dev(struct osd_device *osd)
257 {
258         return &osd->od_dt_dev.dd_lu_dev;
259 }
260
261 static journal_t *osd_journal(const struct osd_device *dev)
262 {
263         return LDISKFS_SB(osd_sb(dev))->s_journal;
264 }
265
266 static int osd_has_index(const struct osd_object *obj)
267 {
268         return obj->oo_dt.do_index_ops != NULL;
269 }
270
271 static int osd_object_invariant(const struct lu_object *l)
272 {
273         return osd_invariant(osd_obj(l));
274 }
275
276 #ifdef HAVE_QUOTA_SUPPORT
277 static inline void
278 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
279 {
280         struct md_ucred    *uc = md_ucred(env);
281         struct cred        *tc;
282
283         LASSERT(uc != NULL);
284
285         save->oc_uid = current_fsuid();
286         save->oc_gid = current_fsgid();
287         save->oc_cap = current_cap();
288         if ((tc = prepare_creds())) {
289                 tc->fsuid         = uc->mu_fsuid;
290                 tc->fsgid         = uc->mu_fsgid;
291                 commit_creds(tc);
292         }
293         /* XXX not suboptimal */
294         cfs_curproc_cap_unpack(uc->mu_cap);
295 }
296
297 static inline void
298 osd_pop_ctxt(struct osd_ctxt *save)
299 {
300         struct cred *tc;
301
302         if ((tc = prepare_creds())) {
303                 tc->fsuid         = save->oc_uid;
304                 tc->fsgid         = save->oc_gid;
305                 tc->cap_effective = save->oc_cap;
306                 commit_creds(tc);
307         }
308 }
309 #endif
310
311 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
312 {
313         return lu_context_key_get(&env->le_ctx, &osd_key);
314 }
315
316 /*
317  * Concurrency: doesn't matter
318  */
319 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
320 {
321         return osd_oti_get(env)->oti_r_locks > 0;
322 }
323
324 /*
325  * Concurrency: doesn't matter
326  */
327 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
328 {
329         struct osd_thread_info *oti = osd_oti_get(env);
330         return oti->oti_w_locks > 0 && o->oo_owner == env;
331 }
332
333 /*
334  * Concurrency: doesn't access mutable data
335  */
336 static int osd_root_get(const struct lu_env *env,
337                         struct dt_device *dev, struct lu_fid *f)
338 {
339         struct inode *inode;
340
341         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
342         LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
343         return 0;
344 }
345
346 /*
347  * OSD object methods.
348  */
349
350 /*
351  * Concurrency: no concurrent access is possible that early in object
352  * life-cycle.
353  */
354 static struct lu_object *osd_object_alloc(const struct lu_env *env,
355                                           const struct lu_object_header *hdr,
356                                           struct lu_device *d)
357 {
358         struct osd_object *mo;
359
360         OBD_ALLOC_PTR(mo);
361         if (mo != NULL) {
362                 struct lu_object *l;
363
364                 l = &mo->oo_dt.do_lu;
365                 dt_object_init(&mo->oo_dt, NULL, d);
366                 if (osd_dev(d)->od_iop_mode)
367                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
368                 else
369                         mo->oo_dt.do_ops = &osd_obj_ops;
370
371                 l->lo_ops = &osd_lu_obj_ops;
372                 cfs_init_rwsem(&mo->oo_sem);
373                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
374                 cfs_spin_lock_init(&mo->oo_guard);
375                 return l;
376         } else {
377                 return NULL;
378         }
379 }
380
381 /*
382  * retrieve object from backend ext fs.
383  **/
384 static struct inode *osd_iget(struct osd_thread_info *info,
385                               struct osd_device *dev,
386                               const struct osd_inode_id *id)
387 {
388         struct inode *inode = NULL;
389
390 #ifdef HAVE_EXT4_LDISKFS
391         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
392         if (IS_ERR(inode))
393         /* Newer kernels return an error instead of a NULL pointer */
394                 inode = NULL;
395 #else
396         inode = iget(osd_sb(dev), id->oii_ino);
397 #endif
398         if (inode == NULL) {
399                 CERROR("no inode\n");
400                 inode = ERR_PTR(-EACCES);
401         } else if (id->oii_gen != OSD_OII_NOGEN &&
402                    inode->i_generation != id->oii_gen) {
403                 iput(inode);
404                 inode = ERR_PTR(-ESTALE);
405         } else if (inode->i_nlink == 0) {
406                 /* due to parallel readdir and unlink,
407                 * we can have dead inode here. */
408                 CWARN("stale inode\n");
409                 make_bad_inode(inode);
410                 iput(inode);
411                 inode = ERR_PTR(-ESTALE);
412         } else if (is_bad_inode(inode)) {
413                 CERROR("bad inode %lx\n",inode->i_ino);
414                 iput(inode);
415                 inode = ERR_PTR(-ENOENT);
416         } else {
417                 /* Do not update file c/mtime in ldiskfs.
418                  * NB: we don't have any lock to protect this because we don't
419                  * have reference on osd_object now, but contention with
420                  * another lookup + attr_set can't happen in the tiny window
421                  * between if (...) and set S_NOCMTIME. */
422                 if (!(inode->i_flags & S_NOCMTIME))
423                         inode->i_flags |= S_NOCMTIME;
424         }
425         return inode;
426 }
427
428 static int osd_fid_lookup(const struct lu_env *env,
429                           struct osd_object *obj, const struct lu_fid *fid)
430 {
431         struct osd_thread_info *info;
432         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
433         struct osd_device      *dev;
434         struct osd_inode_id    *id;
435         struct inode           *inode;
436         int                     result;
437
438         LINVRNT(osd_invariant(obj));
439         LASSERT(obj->oo_inode == NULL);
440         LASSERTF(fid_is_sane(fid) || osd_fid_is_root(fid), DFID, PFID(fid));
441         /*
442          * This assertion checks that osd layer sees only local
443          * fids. Unfortunately it is somewhat expensive (does a
444          * cache-lookup). Disabling it for production/acceptance-testing.
445          */
446         LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
447
448         ENTRY;
449
450         info = osd_oti_get(env);
451         dev  = osd_dev(ldev);
452         id   = &info->oti_id;
453
454         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
455                 RETURN(-ENOENT);
456
457         result = osd_oi_lookup(info, osd_fid2oi(dev, fid), fid, id);
458         if (result != 0) {
459                 if (result == -ENOENT)
460                         result = 0;
461                 goto out;
462         }
463
464         inode = osd_iget(info, dev, id);
465         if (IS_ERR(inode)) {
466                 /*
467                  * If fid wasn't found in oi, inode-less object is
468                  * created, for which lu_object_exists() returns
469                  * false. This is used in a (frequent) case when
470                  * objects are created as locking anchors or
471                  * place holders for objects yet to be created.
472                  */
473                 result = PTR_ERR(inode);
474                 goto out;
475         }
476
477         obj->oo_inode = inode;
478         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
479         if (dev->od_iop_mode) {
480                 obj->oo_compat_dot_created = 1;
481                 obj->oo_compat_dotdot_created = 1;
482         }
483
484         if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
485                 goto out;
486
487         LASSERT(obj->oo_hl_head == NULL);
488         obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
489         if (obj->oo_hl_head == NULL) {
490                 obj->oo_inode = NULL;
491                 iput(inode);
492                 result = -ENOMEM;
493         }
494 out:
495         LINVRNT(osd_invariant(obj));
496
497         RETURN(result);
498 }
499
500 /*
501  * Concurrency: shouldn't matter.
502  */
503 static void osd_object_init0(struct osd_object *obj)
504 {
505         LASSERT(obj->oo_inode != NULL);
506         obj->oo_dt.do_body_ops = &osd_body_ops;
507         obj->oo_dt.do_lu.lo_header->loh_attr |=
508                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
509 }
510
511 /*
512  * Concurrency: no concurrent access is possible that early in object
513  * life-cycle.
514  */
515 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
516                            const struct lu_object_conf *unused)
517 {
518         struct osd_object *obj = osd_obj(l);
519         int result;
520
521         LINVRNT(osd_invariant(obj));
522
523         result = osd_fid_lookup(env, obj, lu_object_fid(l));
524         obj->oo_dt.do_body_ops = &osd_body_ops_new;
525         if (result == 0) {
526                 if (obj->oo_inode != NULL)
527                         osd_object_init0(obj);
528         }
529         LINVRNT(osd_invariant(obj));
530         return result;
531 }
532
533 /*
534  * Concurrency: no concurrent access is possible that late in object
535  * life-cycle.
536  */
537 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
538 {
539         struct osd_object *obj = osd_obj(l);
540
541         LINVRNT(osd_invariant(obj));
542
543         dt_object_fini(&obj->oo_dt);
544         if (obj->oo_hl_head != NULL)
545                 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
546         OBD_FREE_PTR(obj);
547 }
548
549 /**
550  * IAM Iterator
551  */
552 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
553                                              const struct iam_container *bag)
554 {
555         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
556                                            osd_oti_get(env)->oti_it_ipd);
557 }
558
559 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
560                                               const struct iam_container *bag)
561 {
562         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
563                                            osd_oti_get(env)->oti_idx_ipd);
564 }
565
566 static void osd_ipd_put(const struct lu_env *env,
567                         const struct iam_container *bag,
568                         struct iam_path_descr *ipd)
569 {
570         bag->ic_descr->id_ops->id_ipd_free(ipd);
571 }
572
573 /*
574  * Concurrency: no concurrent access is possible that late in object
575  * life-cycle.
576  */
577 static void osd_index_fini(struct osd_object *o)
578 {
579         struct iam_container *bag;
580
581         if (o->oo_dir != NULL) {
582                 bag = &o->oo_dir->od_container;
583                 if (o->oo_inode != NULL) {
584                         if (bag->ic_object == o->oo_inode)
585                                 iam_container_fini(bag);
586                 }
587                 OBD_FREE_PTR(o->oo_dir);
588                 o->oo_dir = NULL;
589         }
590 }
591
592 /*
593  * Concurrency: no concurrent access is possible that late in object
594  * life-cycle (for all existing callers, that is. New callers have to provide
595  * their own locking.)
596  */
597 static int osd_inode_unlinked(const struct inode *inode)
598 {
599         return inode->i_nlink == 0;
600 }
601
602 enum {
603         OSD_TXN_OI_DELETE_CREDITS    = 20,
604         OSD_TXN_INODE_DELETE_CREDITS = 20
605 };
606
607 /*
608  * Journal
609  */
610
611 #if OSD_THANDLE_STATS
612 /**
613  * Set time when the handle is allocated
614  */
615 static void osd_th_alloced(struct osd_thandle *oth)
616 {
617         oth->oth_alloced = cfs_time_current();
618 }
619
620 /**
621  * Set time when the handle started
622  */
623 static void osd_th_started(struct osd_thandle *oth)
624 {
625         oth->oth_started = cfs_time_current();
626 }
627
628 /**
629  * Helper function to convert time interval to microseconds packed in
630  * long int (default time units for the counter in "stats" initialized
631  * by lu_time_init() )
632  */
633 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
634 {
635         struct timeval val;
636
637         cfs_duration_usec(cfs_time_sub(end, start), &val);
638         return val.tv_sec * 1000000 + val.tv_usec;
639 }
640
641 /**
642  * Check whether the we deal with this handle for too long.
643  */
644 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
645                                 cfs_time_t alloced, cfs_time_t started,
646                                 cfs_time_t closed)
647 {
648         cfs_time_t now = cfs_time_current();
649
650         LASSERT(dev != NULL);
651
652         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
653                             interval_to_usec(alloced, started));
654         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
655                             interval_to_usec(started, closed));
656         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
657                             interval_to_usec(closed, now));
658
659         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
660                 CWARN("transaction handle %p was open for too long: "
661                       "now "CFS_TIME_T" ,"
662                       "alloced "CFS_TIME_T" ,"
663                       "started "CFS_TIME_T" ,"
664                       "closed "CFS_TIME_T"\n",
665                       oth, now, alloced, started, closed);
666                 libcfs_debug_dumpstack(NULL);
667         }
668 }
669
670 #define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
671 {                                                                       \
672         cfs_time_t __closed = cfs_time_current();                       \
673         cfs_time_t __alloced = oth->oth_alloced;                        \
674         cfs_time_t __started = oth->oth_started;                        \
675                                                                         \
676         expr;                                                           \
677         __osd_th_check_slow(oth, dev, __alloced, __started, __closed);  \
678 }
679
680 #else /* OSD_THANDLE_STATS */
681
682 #define osd_th_alloced(h)                  do {} while(0)
683 #define osd_th_started(h)                  do {} while(0)
684 #define OSD_CHECK_SLOW_TH(oth, dev, expr)  expr
685
686 #endif /* OSD_THANDLE_STATS */
687
688 /*
689  * Concurrency: doesn't access mutable data.
690  */
691 static int osd_param_is_sane(const struct osd_device *dev,
692                              const struct thandle *th)
693 {
694         struct osd_thandle *oh;
695         oh = container_of0(th, struct osd_thandle, ot_super);
696         return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers;
697 }
698
699 /*
700  * Concurrency: shouldn't matter.
701  */
702 #ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
703 static void osd_trans_commit_cb(struct super_block *sb,
704                                 struct journal_callback *jcb, int error)
705 #else
706 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
707 #endif
708 {
709         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
710         struct thandle     *th  = &oh->ot_super;
711         struct lu_device   *lud = &th->th_dev->dd_lu_dev;
712         struct dt_txn_commit_cb *dcb, *tmp;
713
714         LASSERT(oh->ot_handle == NULL);
715
716         if (error)
717                 CERROR("transaction @0x%p commit error: %d\n", th, error);
718
719         dt_txn_hook_commit(th);
720
721         /* call per-transaction callbacks if any */
722         cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
723                 dcb->dcb_func(NULL, th, dcb, error);
724
725         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
726         lu_device_put(lud);
727         th->th_dev = NULL;
728
729         lu_context_exit(&th->th_ctx);
730         lu_context_fini(&th->th_ctx);
731         OBD_FREE_PTR(oh);
732 }
733
734 static struct thandle *osd_trans_create(const struct lu_env *env,
735                                         struct dt_device *d)
736 {
737         struct osd_thread_info *oti = osd_oti_get(env);
738         struct osd_thandle     *oh;
739         struct thandle         *th;
740         ENTRY;
741
742         th = ERR_PTR(-ENOMEM);
743         OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
744         if (oh != NULL) {
745                 th = &oh->ot_super;
746                 th->th_dev = d;
747                 th->th_result = 0;
748                 th->th_tags = LCT_TX_HANDLE;
749                 oh->ot_credits = 0;
750                 oti->oti_dev = osd_dt_dev(d);
751                 CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
752                 osd_th_alloced(oh);
753         }
754         RETURN(th);
755 }
756
757 /*
758  * Concurrency: shouldn't matter.
759  */
760 int osd_trans_start(const struct lu_env *env, struct dt_device *d,
761                     struct thandle *th)
762 {
763         struct osd_thread_info *oti = osd_oti_get(env);
764         struct osd_device  *dev = osd_dt_dev(d);
765         handle_t           *jh;
766         struct osd_thandle *oh;
767         int rc;
768
769         ENTRY;
770
771         LASSERT(current->journal_info == NULL);
772
773         oh = container_of0(th, struct osd_thandle, ot_super);
774         LASSERT(oh != NULL);
775         LASSERT(oh->ot_handle == NULL);
776
777         rc = dt_txn_hook_start(env, d, th);
778         if (rc != 0)
779                 GOTO(out, rc);
780
781         oh->ot_credits += LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev));
782
783         if (!osd_param_is_sane(dev, th)) {
784                 CWARN("%s: too many transaction credits (%d > %d)\n",
785                       d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits,
786                       osd_journal(dev)->j_max_transaction_buffers);
787 #ifdef OSD_TRACK_DECLARES
788                 CERROR("  attr_set: %d, punch: %d, xattr_set: %d,\n",
789                        oh->ot_declare_attr_set, oh->ot_declare_punch,
790                        oh->ot_declare_xattr_set);
791                 CERROR("  create: %d, ref_add: %d, ref_del: %d, write: %d\n",
792                        oh->ot_declare_create, oh->ot_declare_ref_add,
793                        oh->ot_declare_ref_del, oh->ot_declare_write);
794                 CERROR("  insert: %d, delete: %d\n",
795                        oh->ot_declare_insert, oh->ot_declare_delete);
796 #endif
797         }
798
799         /*
800          * XXX temporary stuff. Some abstraction layer should
801          * be used.
802          */
803         jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
804         osd_th_started(oh);
805         if (!IS_ERR(jh)) {
806                 oh->ot_handle = jh;
807                 LASSERT(oti->oti_txns == 0);
808                 lu_context_init(&th->th_ctx, th->th_tags);
809                 lu_context_enter(&th->th_ctx);
810
811                 lu_device_get(&d->dd_lu_dev);
812                 oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
813                                              "osd-tx", th);
814
815                 /*
816                  * XXX: current rule is that we first start tx,
817                  *      then lock object(s), but we can't use
818                  *      this rule for data (due to locking specifics
819                  *      in ldiskfs). also in long-term we'd like to
820                  *      use usually-used (locks;tx) ordering. so,
821                  *      UGLY thing is that we'll use one ordering for
822                  *      data (ofd) and reverse ordering for metadata
823                  *      (mdd). then at some point we'll fix the latter
824                  */
825                 if (lu_device_is_md(&d->dd_lu_dev)) {
826                         LASSERT(oti->oti_r_locks == 0);
827                         LASSERT(oti->oti_w_locks == 0);
828                 }
829
830                 oti->oti_txns++;
831                 rc = 0;
832         } else {
833                 rc = PTR_ERR(jh);
834         }
835 out:
836         RETURN(rc);
837 }
838
839 /*
840  * Concurrency: shouldn't matter.
841  */
842 static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
843 {
844         int                     rc = 0;
845         struct osd_thandle     *oh;
846         struct osd_thread_info *oti = osd_oti_get(env);
847
848         ENTRY;
849
850         oh = container_of0(th, struct osd_thandle, ot_super);
851
852         if (oh->ot_handle != NULL) {
853                 handle_t *hdl = oh->ot_handle;
854
855                 hdl->h_sync = th->th_sync;
856
857                 /*
858                  * add commit callback
859                  * notice we don't do this in osd_trans_start()
860                  * as underlying transaction can change during truncate
861                  */
862                 osd_journal_callback_set(hdl, osd_trans_commit_cb,
863                                          &oh->ot_jcb);
864
865                 LASSERT(oti->oti_txns == 1);
866                 oti->oti_txns--;
867                 /*
868                  * XXX: current rule is that we first start tx,
869                  *      then lock object(s), but we can't use
870                  *      this rule for data (due to locking specifics
871                  *      in ldiskfs). also in long-term we'd like to
872                  *      use usually-used (locks;tx) ordering. so,
873                  *      UGLY thing is that we'll use one ordering for
874                  *      data (ofd) and reverse ordering for metadata
875                  *      (mdd). then at some point we'll fix the latter
876                  */
877                 if (lu_device_is_md(&th->th_dev->dd_lu_dev)) {
878                         LASSERT(oti->oti_r_locks == 0);
879                         LASSERT(oti->oti_w_locks == 0);
880                 }
881                 rc = dt_txn_hook_stop(env, th);
882                 if (rc != 0)
883                         CERROR("Failure in transaction hook: %d\n", rc);
884                 oh->ot_handle = NULL;
885                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
886                                   rc = ldiskfs_journal_stop(hdl));
887                 if (rc != 0)
888                         CERROR("Failure to stop transaction: %d\n", rc);
889         } else {
890                 OBD_FREE_PTR(oh);
891         }
892
893         RETURN(rc);
894 }
895
896 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
897 {
898         struct osd_thandle *oh = container_of0(th, struct osd_thandle,
899                                                ot_super);
900
901         cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
902
903         return 0;
904 }
905
906 /*
907  * Called just before object is freed. Releases all resources except for
908  * object itself (that is released by osd_object_free()).
909  *
910  * Concurrency: no concurrent access is possible that late in object
911  * life-cycle.
912  */
913 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
914 {
915         struct osd_object *obj   = osd_obj(l);
916         struct inode      *inode = obj->oo_inode;
917
918         LINVRNT(osd_invariant(obj));
919
920         /*
921          * If object is unlinked remove fid->ino mapping from object index.
922          */
923
924         osd_index_fini(obj);
925         if (inode != NULL) {
926                 iput(inode);
927                 obj->oo_inode = NULL;
928         }
929 }
930
931 /*
932  * Concurrency: ->loo_object_release() is called under site spin-lock.
933  */
934 static void osd_object_release(const struct lu_env *env,
935                                struct lu_object *l)
936 {
937 }
938
939 /*
940  * Concurrency: shouldn't matter.
941  */
942 static int osd_object_print(const struct lu_env *env, void *cookie,
943                             lu_printer_t p, const struct lu_object *l)
944 {
945         struct osd_object *o = osd_obj(l);
946         struct iam_descr  *d;
947
948         if (o->oo_dir != NULL)
949                 d = o->oo_dir->od_container.ic_descr;
950         else
951                 d = NULL;
952         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
953                     o, o->oo_inode,
954                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
955                     o->oo_inode ? o->oo_inode->i_generation : 0,
956                     d ? d->id_ops->id_name : "plain");
957 }
958
959 /*
960  * Concurrency: shouldn't matter.
961  */
962 int osd_statfs(const struct lu_env *env, struct dt_device *d,
963                cfs_kstatfs_t *sfs)
964 {
965         struct osd_device *osd = osd_dt_dev(d);
966         struct super_block *sb = osd_sb(osd);
967         int result = 0;
968
969         cfs_spin_lock(&osd->od_osfs_lock);
970         /* cache 1 second */
971         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
972                 result = ll_do_statfs(sb, &osd->od_kstatfs);
973                 if (likely(result == 0)) /* N.B. statfs can't really fail */
974                         osd->od_osfs_age = cfs_time_current_64();
975         }
976
977         if (likely(result == 0))
978                 *sfs = osd->od_kstatfs;
979         cfs_spin_unlock(&osd->od_osfs_lock);
980
981         return result;
982 }
983
984 /*
985  * Concurrency: doesn't access mutable data.
986  */
987 static void osd_conf_get(const struct lu_env *env,
988                          const struct dt_device *dev,
989                          struct dt_device_param *param)
990 {
991         struct super_block *sb = osd_sb(osd_dt_dev(dev));
992
993         /*
994          * XXX should be taken from not-yet-existing fs abstraction layer.
995          */
996         param->ddp_max_name_len = LDISKFS_NAME_LEN;
997         param->ddp_max_nlink    = LDISKFS_LINK_MAX;
998         param->ddp_block_shift  = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
999         param->ddp_mntopts      = 0;
1000         if (test_opt(sb, XATTR_USER))
1001                 param->ddp_mntopts |= MNTOPT_USERXATTR;
1002         if (test_opt(sb, POSIX_ACL))
1003                 param->ddp_mntopts |= MNTOPT_ACL;
1004
1005 #if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
1006         if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
1007                 param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
1008         else
1009 #endif
1010                 param->ddp_max_ea_size = sb->s_blocksize;
1011
1012 }
1013
1014 /**
1015  * Helper function to get and fill the buffer with input values.
1016  */
1017 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
1018 {
1019         struct lu_buf *buf;
1020
1021         buf = &osd_oti_get(env)->oti_buf;
1022         buf->lb_buf = area;
1023         buf->lb_len = len;
1024         return buf;
1025 }
1026
1027 /*
1028  * Concurrency: shouldn't matter.
1029  */
1030 static int osd_sync(const struct lu_env *env, struct dt_device *d)
1031 {
1032         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
1033         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
1034 }
1035
1036 /**
1037  * Start commit for OSD device.
1038  *
1039  * An implementation of dt_commit_async method for OSD device.
1040  * Asychronously starts underlayng fs sync and thereby a transaction
1041  * commit.
1042  *
1043  * \param env environment
1044  * \param d dt device
1045  *
1046  * \see dt_device_operations
1047  */
1048 static int osd_commit_async(const struct lu_env *env,
1049                             struct dt_device *d)
1050 {
1051         struct super_block *s = osd_sb(osd_dt_dev(d));
1052         ENTRY;
1053
1054         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
1055         RETURN(s->s_op->sync_fs(s, 0));
1056 }
1057
1058 /*
1059  * Concurrency: shouldn't matter.
1060  */
1061
1062 static void osd_ro(const struct lu_env *env, struct dt_device *d)
1063 {
1064         struct super_block *sb = osd_sb(osd_dt_dev(d));
1065         ENTRY;
1066
1067         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
1068
1069         __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
1070         EXIT;
1071 }
1072
1073 /*
1074  * Concurrency: serialization provided by callers.
1075  */
1076 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
1077                               int mode, unsigned long timeout, __u32 alg,
1078                               struct lustre_capa_key *keys)
1079 {
1080         struct osd_device *dev = osd_dt_dev(d);
1081         ENTRY;
1082
1083         dev->od_fl_capa = mode;
1084         dev->od_capa_timeout = timeout;
1085         dev->od_capa_alg = alg;
1086         dev->od_capa_keys = keys;
1087         RETURN(0);
1088 }
1089
1090 /**
1091  * Concurrency: serialization provided by callers.
1092  */
1093 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
1094                                struct dt_quota_ctxt *ctxt, void *data)
1095 {
1096         struct obd_device *obd = (void *)ctxt;
1097         struct vfsmount *mnt = (struct vfsmount *)data;
1098         ENTRY;
1099
1100         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
1101         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1102         obd->obd_lvfs_ctxt.pwdmnt = mnt;
1103         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
1104         obd->obd_lvfs_ctxt.fs = get_ds();
1105
1106         EXIT;
1107 }
1108
1109 /**
1110  * Note: we do not count into QUOTA here.
1111  * If we mount with --data_journal we may need more.
1112  */
1113 static const int osd_dto_credits_noquota[DTO_NR] = {
1114         /**
1115          * Insert/Delete.
1116          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1117          * SINGLEDATA_TRANS_BLOCKS(8)
1118          * XXX Note: maybe iam need more, since iam have more level than
1119          *           EXT3 htree.
1120          */
1121         [DTO_INDEX_INSERT]  = 16,
1122         [DTO_INDEX_DELETE]  = 16,
1123         /**
1124          * Unused now
1125          */
1126         [DTO_INDEX_UPDATE]  = 16,
1127         /**
1128          * Create a object. The same as create object in EXT3.
1129          * DATA_TRANS_BLOCKS(14) +
1130          * INDEX_EXTRA_BLOCKS(8) +
1131          * 3(inode bits, groups, GDT)
1132          */
1133         [DTO_OBJECT_CREATE] = 25,
1134         /**
1135          * XXX: real credits to be fixed
1136          */
1137         [DTO_OBJECT_DELETE] = 25,
1138         /**
1139          * Attr set credits (inode)
1140          */
1141         [DTO_ATTR_SET_BASE] = 1,
1142         /**
1143          * Xattr set. The same as xattr of EXT3.
1144          * DATA_TRANS_BLOCKS(14)
1145          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1146          * are also counted in. Do not know why?
1147          */
1148         [DTO_XATTR_SET]     = 14,
1149         [DTO_LOG_REC]       = 14,
1150         /**
1151          * credits for inode change during write.
1152          */
1153         [DTO_WRITE_BASE]    = 3,
1154         /**
1155          * credits for single block write.
1156          */
1157         [DTO_WRITE_BLOCK]   = 14,
1158         /**
1159          * Attr set credits for chown.
1160          * This is extra credits for setattr, and it is null without quota
1161          */
1162         [DTO_ATTR_SET_CHOWN]= 0
1163 };
1164
1165 /**
1166  * Note: we count into QUOTA here.
1167  * If we mount with --data_journal we may need more.
1168  */
1169 static const int osd_dto_credits_quota[DTO_NR] = {
1170         /**
1171          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1172          * SINGLEDATA_TRANS_BLOCKS(8) +
1173          * 2 * QUOTA_TRANS_BLOCKS(2)
1174          */
1175         [DTO_INDEX_INSERT]  = 20,
1176         /**
1177          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1178          * SINGLEDATA_TRANS_BLOCKS(8) +
1179          * 2 * QUOTA_TRANS_BLOCKS(2)
1180          */
1181         [DTO_INDEX_DELETE]  = 20,
1182         /**
1183          * Unused now.
1184          */
1185         [DTO_INDEX_UPDATE]  = 16,
1186         /*
1187          * Create a object. Same as create object in EXT3 filesystem.
1188          * DATA_TRANS_BLOCKS(16) +
1189          * INDEX_EXTRA_BLOCKS(8) +
1190          * 3(inode bits, groups, GDT) +
1191          * 2 * QUOTA_INIT_BLOCKS(25)
1192          */
1193         [DTO_OBJECT_CREATE] = 77,
1194         /*
1195          * Unused now.
1196          * DATA_TRANS_BLOCKS(16) +
1197          * INDEX_EXTRA_BLOCKS(8) +
1198          * 3(inode bits, groups, GDT) +
1199          * QUOTA(?)
1200          */
1201         [DTO_OBJECT_DELETE] = 27,
1202         /**
1203          * Attr set credits.
1204          * 3 (inode bit, group, GDT) +
1205          */
1206         [DTO_ATTR_SET_BASE] = 3,
1207         /**
1208          * Xattr set. The same as xattr of EXT3.
1209          * DATA_TRANS_BLOCKS(16)
1210          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
1211          *           also counted in. Do not know why?
1212          */
1213         [DTO_XATTR_SET]     = 16,
1214         [DTO_LOG_REC]       = 16,
1215         /**
1216          * creadits for inode change during write.
1217          */
1218         [DTO_WRITE_BASE]    = 3,
1219         /**
1220          * credits for single block write.
1221          */
1222         [DTO_WRITE_BLOCK]   = 16,
1223         /**
1224          * Attr set credits for chown.
1225          * It is added to already set setattr credits
1226          * 2 * QUOTA_INIT_BLOCKS(25) +
1227          * 2 * QUOTA_DEL_BLOCKS(9)
1228          */
1229         [DTO_ATTR_SET_CHOWN]= 68,
1230 };
1231
1232 static const struct dt_device_operations osd_dt_ops = {
1233         .dt_root_get       = osd_root_get,
1234         .dt_statfs         = osd_statfs,
1235         .dt_trans_create   = osd_trans_create,
1236         .dt_trans_start    = osd_trans_start,
1237         .dt_trans_stop     = osd_trans_stop,
1238         .dt_trans_cb_add   = osd_trans_cb_add,
1239         .dt_conf_get       = osd_conf_get,
1240         .dt_sync           = osd_sync,
1241         .dt_ro             = osd_ro,
1242         .dt_commit_async   = osd_commit_async,
1243         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1244         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1245 };
1246
1247 static void osd_object_read_lock(const struct lu_env *env,
1248                                  struct dt_object *dt, unsigned role)
1249 {
1250         struct osd_object *obj = osd_dt_obj(dt);
1251         struct osd_thread_info *oti = osd_oti_get(env);
1252
1253         LINVRNT(osd_invariant(obj));
1254
1255         LASSERT(obj->oo_owner != env);
1256         cfs_down_read_nested(&obj->oo_sem, role);
1257
1258         LASSERT(obj->oo_owner == NULL);
1259         oti->oti_r_locks++;
1260 }
1261
1262 static void osd_object_write_lock(const struct lu_env *env,
1263                                   struct dt_object *dt, unsigned role)
1264 {
1265         struct osd_object *obj = osd_dt_obj(dt);
1266         struct osd_thread_info *oti = osd_oti_get(env);
1267
1268         LINVRNT(osd_invariant(obj));
1269
1270         LASSERT(obj->oo_owner != env);
1271         cfs_down_write_nested(&obj->oo_sem, role);
1272
1273         LASSERT(obj->oo_owner == NULL);
1274         obj->oo_owner = env;
1275         oti->oti_w_locks++;
1276 }
1277
1278 static void osd_object_read_unlock(const struct lu_env *env,
1279                                    struct dt_object *dt)
1280 {
1281         struct osd_object *obj = osd_dt_obj(dt);
1282         struct osd_thread_info *oti = osd_oti_get(env);
1283
1284         LINVRNT(osd_invariant(obj));
1285
1286         LASSERT(oti->oti_r_locks > 0);
1287         oti->oti_r_locks--;
1288         cfs_up_read(&obj->oo_sem);
1289 }
1290
1291 static void osd_object_write_unlock(const struct lu_env *env,
1292                                     struct dt_object *dt)
1293 {
1294         struct osd_object *obj = osd_dt_obj(dt);
1295         struct osd_thread_info *oti = osd_oti_get(env);
1296
1297         LINVRNT(osd_invariant(obj));
1298
1299         LASSERT(obj->oo_owner == env);
1300         LASSERT(oti->oti_w_locks > 0);
1301         oti->oti_w_locks--;
1302         obj->oo_owner = NULL;
1303         cfs_up_write(&obj->oo_sem);
1304 }
1305
1306 static int osd_object_write_locked(const struct lu_env *env,
1307                                    struct dt_object *dt)
1308 {
1309         struct osd_object *obj = osd_dt_obj(dt);
1310
1311         LINVRNT(osd_invariant(obj));
1312
1313         return obj->oo_owner == env;
1314 }
1315
1316 static int capa_is_sane(const struct lu_env *env,
1317                         struct osd_device *dev,
1318                         struct lustre_capa *capa,
1319                         struct lustre_capa_key *keys)
1320 {
1321         struct osd_thread_info *oti = osd_oti_get(env);
1322         struct lustre_capa *tcapa = &oti->oti_capa;
1323         struct obd_capa *oc;
1324         int i, rc = 0;
1325         ENTRY;
1326
1327         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1328         if (oc) {
1329                 if (capa_is_expired(oc)) {
1330                         DEBUG_CAPA(D_ERROR, capa, "expired");
1331                         rc = -ESTALE;
1332                 }
1333                 capa_put(oc);
1334                 RETURN(rc);
1335         }
1336
1337         if (capa_is_expired_sec(capa)) {
1338                 DEBUG_CAPA(D_ERROR, capa, "expired");
1339                 RETURN(-ESTALE);
1340         }
1341
1342         cfs_spin_lock(&capa_lock);
1343         for (i = 0; i < 2; i++) {
1344                 if (keys[i].lk_keyid == capa->lc_keyid) {
1345                         oti->oti_capa_key = keys[i];
1346                         break;
1347                 }
1348         }
1349         cfs_spin_unlock(&capa_lock);
1350
1351         if (i == 2) {
1352                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1353                 RETURN(-ESTALE);
1354         }
1355
1356         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1357         if (rc)
1358                 RETURN(rc);
1359
1360         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1361                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1362                 RETURN(-EACCES);
1363         }
1364
1365         oc = capa_add(dev->od_capa_hash, capa);
1366         capa_put(oc);
1367
1368         RETURN(0);
1369 }
1370
1371 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1372                            struct lustre_capa *capa, __u64 opc)
1373 {
1374         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1375         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1376         struct md_capainfo *ci;
1377         int rc;
1378
1379         if (!dev->od_fl_capa)
1380                 return 0;
1381
1382         if (capa == BYPASS_CAPA)
1383                 return 0;
1384
1385         ci = md_capainfo(env);
1386         if (unlikely(!ci))
1387                 return 0;
1388
1389         if (ci->mc_auth == LC_ID_NONE)
1390                 return 0;
1391
1392         if (!capa) {
1393                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1394                 return -EACCES;
1395         }
1396
1397         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1398                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1399                            PFID(fid));
1400                 return -EACCES;
1401         }
1402
1403         if (!capa_opc_supported(capa, opc)) {
1404                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1405                 return -EACCES;
1406         }
1407
1408         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1409                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1410                 return -EACCES;
1411         }
1412
1413         return 0;
1414 }
1415
1416 static struct timespec *osd_inode_time(const struct lu_env *env,
1417                                        struct inode *inode, __u64 seconds)
1418 {
1419         struct osd_thread_info *oti = osd_oti_get(env);
1420         struct timespec        *t   = &oti->oti_time;
1421
1422         t->tv_sec  = seconds;
1423         t->tv_nsec = 0;
1424         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1425         return t;
1426 }
1427
1428
1429 static void osd_inode_getattr(const struct lu_env *env,
1430                               struct inode *inode, struct lu_attr *attr)
1431 {
1432         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1433                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1434                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1435
1436         attr->la_atime      = LTIME_S(inode->i_atime);
1437         attr->la_mtime      = LTIME_S(inode->i_mtime);
1438         attr->la_ctime      = LTIME_S(inode->i_ctime);
1439         attr->la_mode       = inode->i_mode;
1440         attr->la_size       = i_size_read(inode);
1441         attr->la_blocks     = inode->i_blocks;
1442         attr->la_uid        = inode->i_uid;
1443         attr->la_gid        = inode->i_gid;
1444         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1445         attr->la_nlink      = inode->i_nlink;
1446         attr->la_rdev       = inode->i_rdev;
1447         attr->la_blksize    = ll_inode_blksize(inode);
1448         attr->la_blkbits    = inode->i_blkbits;
1449 }
1450
1451 static int osd_attr_get(const struct lu_env *env,
1452                         struct dt_object *dt,
1453                         struct lu_attr *attr,
1454                         struct lustre_capa *capa)
1455 {
1456         struct osd_object *obj = osd_dt_obj(dt);
1457
1458         LASSERT(dt_object_exists(dt));
1459         LINVRNT(osd_invariant(obj));
1460
1461         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1462                 return -EACCES;
1463
1464         cfs_spin_lock(&obj->oo_guard);
1465         osd_inode_getattr(env, obj->oo_inode, attr);
1466         cfs_spin_unlock(&obj->oo_guard);
1467         return 0;
1468 }
1469
1470 static int osd_declare_attr_set(const struct lu_env *env,
1471                                 struct dt_object *dt,
1472                                 const struct lu_attr *attr,
1473                                 struct thandle *handle)
1474 {
1475         struct osd_thandle *oh;
1476
1477         LASSERT(handle != NULL);
1478         LASSERT(osd_invariant(obj));
1479
1480         oh = container_of0(handle, struct osd_thandle, ot_super);
1481         LASSERT(oh->ot_handle == NULL);
1482
1483         OSD_DECLARE_OP(oh, attr_set);
1484         oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
1485
1486         return 0;
1487 }
1488
1489 static int osd_inode_setattr(const struct lu_env *env,
1490                              struct inode *inode, const struct lu_attr *attr)
1491 {
1492         __u64 bits;
1493
1494         bits = attr->la_valid;
1495
1496         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1497
1498 #ifdef HAVE_QUOTA_SUPPORT
1499         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1500             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1501                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1502                 struct iattr iattr;
1503                 int rc;
1504
1505                 iattr.ia_valid = 0;
1506                 if (bits & LA_UID)
1507                         iattr.ia_valid |= ATTR_UID;
1508                 if (bits & LA_GID)
1509                         iattr.ia_valid |= ATTR_GID;
1510                 iattr.ia_uid = attr->la_uid;
1511                 iattr.ia_gid = attr->la_gid;
1512                 osd_push_ctxt(env, save);
1513                 rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
1514                 osd_pop_ctxt(save);
1515                 if (rc != 0)
1516                         return rc;
1517         }
1518 #endif
1519
1520         if (bits & LA_ATIME)
1521                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1522         if (bits & LA_CTIME)
1523                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1524         if (bits & LA_MTIME)
1525                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1526         if (bits & LA_SIZE) {
1527                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1528                 i_size_write(inode, attr->la_size);
1529         }
1530
1531 #if 0
1532         /* OSD should not change "i_blocks" which is used by quota.
1533          * "i_blocks" should be changed by ldiskfs only. */
1534         if (bits & LA_BLOCKS)
1535                 inode->i_blocks = attr->la_blocks;
1536 #endif
1537         if (bits & LA_MODE)
1538                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1539                         (attr->la_mode & ~S_IFMT);
1540         if (bits & LA_UID)
1541                 inode->i_uid    = attr->la_uid;
1542         if (bits & LA_GID)
1543                 inode->i_gid    = attr->la_gid;
1544         if (bits & LA_NLINK)
1545                 inode->i_nlink  = attr->la_nlink;
1546         if (bits & LA_RDEV)
1547                 inode->i_rdev   = attr->la_rdev;
1548
1549         if (bits & LA_FLAGS) {
1550                 /* always keep S_NOCMTIME */
1551                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
1552                                  S_NOCMTIME;
1553         }
1554         return 0;
1555 }
1556
1557 static int osd_attr_set(const struct lu_env *env,
1558                         struct dt_object *dt,
1559                         const struct lu_attr *attr,
1560                         struct thandle *handle,
1561                         struct lustre_capa *capa)
1562 {
1563         struct osd_object *obj = osd_dt_obj(dt);
1564         int rc;
1565
1566         LASSERT(handle != NULL);
1567         LASSERT(dt_object_exists(dt));
1568         LASSERT(osd_invariant(obj));
1569
1570         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1571                 return -EACCES;
1572
1573         OSD_EXEC_OP(handle, attr_set);
1574
1575         cfs_spin_lock(&obj->oo_guard);
1576         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1577         cfs_spin_unlock(&obj->oo_guard);
1578
1579         if (!rc)
1580                 obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
1581         return rc;
1582 }
1583
1584 /*
1585  * Object creation.
1586  *
1587  * XXX temporary solution.
1588  */
1589 static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
1590                           struct lu_attr *attr, struct thandle *th)
1591 {
1592         return 0;
1593 }
1594
1595 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1596                            struct lu_attr *attr, struct thandle *th)
1597 {
1598         osd_object_init0(obj);
1599         if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1600                 unlock_new_inode(obj->oo_inode);
1601         return 0;
1602 }
1603
1604 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1605                                             struct osd_object *obj,
1606                                             const char *name,
1607                                             const int namelen)
1608 {
1609         struct osd_thread_info *info   = osd_oti_get(env);
1610         struct dentry *child_dentry = &info->oti_child_dentry;
1611         struct dentry *obj_dentry = &info->oti_obj_dentry;
1612
1613         obj_dentry->d_inode = obj->oo_inode;
1614         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1615         obj_dentry->d_name.hash = 0;
1616
1617         child_dentry->d_name.hash = 0;
1618         child_dentry->d_parent = obj_dentry;
1619         child_dentry->d_name.name = name;
1620         child_dentry->d_name.len = namelen;
1621         return child_dentry;
1622 }
1623
1624
1625 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1626                       cfs_umode_t mode,
1627                       struct dt_allocation_hint *hint,
1628                       struct thandle *th)
1629 {
1630         int result;
1631         struct osd_device  *osd = osd_obj2dev(obj);
1632         struct osd_thandle *oth;
1633         struct dt_object   *parent;
1634         struct inode       *inode;
1635 #ifdef HAVE_QUOTA_SUPPORT
1636         struct osd_ctxt    *save = &info->oti_ctxt;
1637 #endif
1638
1639         LINVRNT(osd_invariant(obj));
1640         LASSERT(obj->oo_inode == NULL);
1641         LASSERT(obj->oo_hl_head == NULL);
1642
1643         if (S_ISDIR(mode) && ldiskfs_pdo) {
1644                 obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
1645                 if (obj->oo_hl_head == NULL)
1646                         return -ENOMEM;
1647         }
1648
1649         oth = container_of(th, struct osd_thandle, ot_super);
1650         LASSERT(oth->ot_handle->h_transaction != NULL);
1651
1652         if (hint && hint->dah_parent)
1653                 parent = hint->dah_parent;
1654         else
1655                 parent = osd->od_obj_area;
1656
1657         LASSERT(parent != NULL);
1658         LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1659
1660 #ifdef HAVE_QUOTA_SUPPORT
1661         osd_push_ctxt(info->oti_env, save);
1662 #endif
1663         inode = ldiskfs_create_inode(oth->ot_handle,
1664                                      osd_dt_obj(parent)->oo_inode, mode);
1665 #ifdef HAVE_QUOTA_SUPPORT
1666         osd_pop_ctxt(save);
1667 #endif
1668         if (!IS_ERR(inode)) {
1669                 /* Do not update file c/mtime in ldiskfs.
1670                  * NB: don't need any lock because no contention at this
1671                  * early stage */
1672                 inode->i_flags |= S_NOCMTIME;
1673                 obj->oo_inode = inode;
1674                 result = 0;
1675         } else {
1676                 if (obj->oo_hl_head != NULL) {
1677                         ldiskfs_htree_lock_head_free(obj->oo_hl_head);
1678                         obj->oo_hl_head = NULL;
1679                 }
1680                 result = PTR_ERR(inode);
1681         }
1682         LINVRNT(osd_invariant(obj));
1683         return result;
1684 }
1685
1686 enum {
1687         OSD_NAME_LEN = 255
1688 };
1689
1690 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1691                      struct lu_attr *attr,
1692                      struct dt_allocation_hint *hint,
1693                      struct dt_object_format *dof,
1694                      struct thandle *th)
1695 {
1696         int result;
1697         struct osd_thandle *oth;
1698         struct osd_device *osd = osd_obj2dev(obj);
1699         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1700
1701         LASSERT(S_ISDIR(attr->la_mode));
1702
1703         oth = container_of(th, struct osd_thandle, ot_super);
1704         LASSERT(oth->ot_handle->h_transaction != NULL);
1705         result = osd_mkfile(info, obj, mode, hint, th);
1706         if (result == 0 && osd->od_iop_mode == 0) {
1707                 LASSERT(obj->oo_inode != NULL);
1708                 /*
1709                  * XXX uh-oh... call low-level iam function directly.
1710                  */
1711
1712                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1713                                          sizeof (struct osd_fid_pack),
1714                                          oth->ot_handle);
1715         }
1716         return result;
1717 }
1718
1719 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1720                         struct lu_attr *attr,
1721                         struct dt_allocation_hint *hint,
1722                         struct dt_object_format *dof,
1723                         struct thandle *th)
1724 {
1725         int result;
1726         struct osd_thandle *oth;
1727         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1728
1729         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1730
1731         LASSERT(S_ISREG(attr->la_mode));
1732
1733         oth = container_of(th, struct osd_thandle, ot_super);
1734         LASSERT(oth->ot_handle->h_transaction != NULL);
1735
1736         result = osd_mkfile(info, obj, mode, hint, th);
1737         if (result == 0) {
1738                 LASSERT(obj->oo_inode != NULL);
1739                 if (feat->dif_flags & DT_IND_VARKEY)
1740                         result = iam_lvar_create(obj->oo_inode,
1741                                                  feat->dif_keysize_max,
1742                                                  feat->dif_ptrsize,
1743                                                  feat->dif_recsize_max,
1744                                                  oth->ot_handle);
1745                 else
1746                         result = iam_lfix_create(obj->oo_inode,
1747                                                  feat->dif_keysize_max,
1748                                                  feat->dif_ptrsize,
1749                                                  feat->dif_recsize_max,
1750                                                  oth->ot_handle);
1751
1752         }
1753         return result;
1754 }
1755
1756 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1757                      struct lu_attr *attr,
1758                      struct dt_allocation_hint *hint,
1759                      struct dt_object_format *dof,
1760                      struct thandle *th)
1761 {
1762         LASSERT(S_ISREG(attr->la_mode));
1763         return osd_mkfile(info, obj, (attr->la_mode &
1764                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1765 }
1766
1767 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1768                      struct lu_attr *attr,
1769                      struct dt_allocation_hint *hint,
1770                      struct dt_object_format *dof,
1771                      struct thandle *th)
1772 {
1773         LASSERT(S_ISLNK(attr->la_mode));
1774         return osd_mkfile(info, obj, (attr->la_mode &
1775                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1776 }
1777
1778 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1779                      struct lu_attr *attr,
1780                      struct dt_allocation_hint *hint,
1781                      struct dt_object_format *dof,
1782                      struct thandle *th)
1783 {
1784         cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1785         int result;
1786
1787         LINVRNT(osd_invariant(obj));
1788         LASSERT(obj->oo_inode == NULL);
1789         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1790                 S_ISFIFO(mode) || S_ISSOCK(mode));
1791
1792         result = osd_mkfile(info, obj, mode, hint, th);
1793         if (result == 0) {
1794                 LASSERT(obj->oo_inode != NULL);
1795                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1796         }
1797         LINVRNT(osd_invariant(obj));
1798         return result;
1799 }
1800
1801 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1802                               struct lu_attr *,
1803                               struct dt_allocation_hint *hint,
1804                               struct dt_object_format *dof,
1805                               struct thandle *);
1806
1807 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1808 {
1809         osd_obj_type_f result;
1810
1811         switch (type) {
1812         case DFT_DIR:
1813                 result = osd_mkdir;
1814                 break;
1815         case DFT_REGULAR:
1816                 result = osd_mkreg;
1817                 break;
1818         case DFT_SYM:
1819                 result = osd_mksym;
1820                 break;
1821         case DFT_NODE:
1822                 result = osd_mknod;
1823                 break;
1824         case DFT_INDEX:
1825                 result = osd_mk_index;
1826                 break;
1827
1828         default:
1829                 LBUG();
1830                 break;
1831         }
1832         return result;
1833 }
1834
1835
1836 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1837                         struct dt_object *parent, cfs_umode_t child_mode)
1838 {
1839         LASSERT(ah);
1840
1841         memset(ah, 0, sizeof(*ah));
1842         ah->dah_parent = parent;
1843         ah->dah_mode = child_mode;
1844 }
1845
1846 /**
1847  * Helper function for osd_object_create()
1848  *
1849  * \retval 0, on success
1850  */
1851 static int __osd_object_create(struct osd_thread_info *info,
1852                                struct osd_object *obj, struct lu_attr *attr,
1853                                struct dt_allocation_hint *hint,
1854                                struct dt_object_format *dof,
1855                                struct thandle *th)
1856 {
1857
1858         int result;
1859
1860         result = osd_create_pre(info, obj, attr, th);
1861         if (result == 0) {
1862                 result = osd_create_type_f(dof->dof_type)(info, obj,
1863                                            attr, hint, dof, th);
1864                 if (result == 0)
1865                         result = osd_create_post(info, obj, attr, th);
1866         }
1867         return result;
1868 }
1869
1870 /**
1871  * Helper function for osd_object_create()
1872  *
1873  * \retval 0, on success
1874  */
1875 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1876                            const struct lu_fid *fid, struct thandle *th)
1877 {
1878         struct osd_thread_info *info = osd_oti_get(env);
1879         struct osd_inode_id    *id   = &info->oti_id;
1880         struct osd_device      *osd  = osd_obj2dev(obj);
1881         struct md_ucred        *uc   = md_ucred(env);
1882
1883         LASSERT(obj->oo_inode != NULL);
1884         LASSERT(uc != NULL);
1885
1886         id->oii_ino = obj->oo_inode->i_ino;
1887         id->oii_gen = obj->oo_inode->i_generation;
1888
1889         return osd_oi_insert(info, osd_fid2oi(osd, fid), fid, id, th,
1890                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1891 }
1892
1893 static int osd_declare_object_create(const struct lu_env *env,
1894                                      struct dt_object *dt,
1895                                      struct lu_attr *attr,
1896                                      struct dt_allocation_hint *hint,
1897                                      struct dt_object_format *dof,
1898                                      struct thandle *handle)
1899 {
1900         struct osd_thandle *oh;
1901
1902         LASSERT(handle != NULL);
1903
1904         oh = container_of0(handle, struct osd_thandle, ot_super);
1905         LASSERT(oh->ot_handle == NULL);
1906
1907         OSD_DECLARE_OP(oh, insert);
1908         OSD_DECLARE_OP(oh, create);
1909         oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
1910         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
1911
1912         /* if this is directory, then we expect . and ..
1913          * to be inserted as well */
1914         OSD_DECLARE_OP(oh, insert);
1915         OSD_DECLARE_OP(oh, insert);
1916         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
1917         return 0;
1918 }
1919
1920 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1921                              struct lu_attr *attr,
1922                              struct dt_allocation_hint *hint,
1923                              struct dt_object_format *dof,
1924                              struct thandle *th)
1925 {
1926         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1927         struct osd_object      *obj    = osd_dt_obj(dt);
1928         struct osd_thread_info *info   = osd_oti_get(env);
1929         int result;
1930
1931         ENTRY;
1932
1933         LINVRNT(osd_invariant(obj));
1934         LASSERT(!dt_object_exists(dt));
1935         LASSERT(osd_write_locked(env, obj));
1936         LASSERT(th != NULL);
1937
1938         OSD_EXEC_OP(th, create);
1939
1940         result = __osd_object_create(info, obj, attr, hint, dof, th);
1941         if (result == 0)
1942                 result = __osd_oi_insert(env, obj, fid, th);
1943
1944         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1945         LASSERT(osd_invariant(obj));
1946         RETURN(result);
1947 }
1948
1949 /**
1950  * Called to destroy on-disk representation of the object
1951  *
1952  * Concurrency: must be locked
1953  */
1954 static int osd_declare_object_destroy(const struct lu_env *env,
1955                                       struct dt_object *dt,
1956                                       struct thandle *th)
1957 {
1958         struct osd_object      *obj = osd_dt_obj(dt);
1959         struct inode           *inode = obj->oo_inode;
1960         struct osd_thandle     *oh;
1961         ENTRY;
1962
1963         oh = container_of0(th, struct osd_thandle, ot_super);
1964         LASSERT(oh->ot_handle == NULL);
1965         LASSERT(inode);
1966         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
1967
1968         OSD_DECLARE_OP(oh, destroy);
1969         OSD_DECLARE_OP(oh, delete);
1970         oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE];
1971         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
1972
1973         RETURN(0);
1974 }
1975
1976 static int osd_object_destroy(const struct lu_env *env,
1977                               struct dt_object *dt,
1978                               struct thandle *th)
1979 {
1980         const struct lu_fid    *fid = lu_object_fid(&dt->do_lu);
1981         struct osd_object      *obj = osd_dt_obj(dt);
1982         struct inode           *inode = obj->oo_inode;
1983         struct osd_device      *osd = osd_obj2dev(obj);
1984         struct osd_thandle     *oh;
1985         int                     result;
1986         ENTRY;
1987
1988         oh = container_of0(th, struct osd_thandle, ot_super);
1989         LASSERT(oh->ot_handle);
1990         LASSERT(inode);
1991         LASSERT(osd_inode_unlinked(inode));
1992
1993         OSD_EXEC_OP(th, destroy);
1994
1995         result = osd_oi_delete(osd_oti_get(env),
1996                                osd_fid2oi(osd, fid), fid, th);
1997
1998         /* XXX: add to ext3 orphan list */
1999         /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
2000
2001         /* not needed in the cache anymore */
2002         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
2003
2004         RETURN(0);
2005 }
2006
2007 /**
2008  * Helper function for osd_xattr_set()
2009  */
2010 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2011                            const struct lu_buf *buf, const char *name, int fl)
2012 {
2013         struct osd_object      *obj      = osd_dt_obj(dt);
2014         struct inode           *inode    = obj->oo_inode;
2015         struct osd_thread_info *info     = osd_oti_get(env);
2016         struct dentry          *dentry   = &info->oti_child_dentry;
2017         int                     fs_flags = 0;
2018         int  rc;
2019
2020         LASSERT(dt_object_exists(dt));
2021         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
2022         LASSERT(osd_write_locked(env, obj));
2023
2024         if (fl & LU_XATTR_REPLACE)
2025                 fs_flags |= XATTR_REPLACE;
2026
2027         if (fl & LU_XATTR_CREATE)
2028                 fs_flags |= XATTR_CREATE;
2029
2030         dentry->d_inode = inode;
2031         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
2032                                    buf->lb_len, fs_flags);
2033         return rc;
2034 }
2035
2036 /**
2037  * Put the fid into lustre_mdt_attrs, and then place the structure
2038  * inode's ea. This fid should not be altered during the life time
2039  * of the inode.
2040  *
2041  * \retval +ve, on success
2042  * \retval -ve, on error
2043  *
2044  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
2045  */
2046 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
2047                           const struct lu_fid *fid)
2048 {
2049         struct osd_thread_info  *info      = osd_oti_get(env);
2050         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
2051
2052         lustre_lma_init(mdt_attrs, fid);
2053         lustre_lma_swab(mdt_attrs);
2054         return __osd_xattr_set(env, dt,
2055                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
2056                                XATTR_NAME_LMA, LU_XATTR_CREATE);
2057
2058 }
2059
2060 /**
2061  * Helper function to form igif
2062  */
2063 static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
2064                                 struct lu_fid *fid)
2065 {
2066         LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
2067 }
2068
2069 /**
2070  * Helper function to pack the fid, ldiskfs stores fid in packed format.
2071  */
2072 void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
2073                   struct lu_fid *befider)
2074 {
2075         fid_cpu_to_be(befider, (struct lu_fid *)fid);
2076         memcpy(pack->fp_area, befider, sizeof(*befider));
2077         pack->fp_len =  sizeof(*befider) + 1;
2078 }
2079
2080 /**
2081  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
2082  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
2083  * To have compatilibility with 1.8 ldiskfs driver we need to have
2084  * magic number at start of fid data.
2085  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
2086  * its inmemory API.
2087  */
2088 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
2089                                   const struct dt_rec *fid)
2090 {
2091         param->edp_magic = LDISKFS_LUFID_MAGIC;
2092         param->edp_len =  sizeof(struct lu_fid) + 1;
2093
2094         fid_cpu_to_be((struct lu_fid *)param->edp_data,
2095                       (struct lu_fid *)fid);
2096 }
2097
2098 int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
2099 {
2100         int result;
2101
2102         result = 0;
2103         switch (pack->fp_len) {
2104         case sizeof *fid + 1:
2105                 memcpy(fid, pack->fp_area, sizeof *fid);
2106                 fid_be_to_cpu(fid, fid);
2107                 break;
2108         default:
2109                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
2110                 result = -EIO;
2111         }
2112         return result;
2113 }
2114
2115 /**
2116  * Try to read the fid from inode ea into dt_rec, if return value
2117  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
2118  *
2119  * \param fid object fid.
2120  *
2121  * \retval 0 on success
2122  */
2123 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
2124                           __u32 ino, struct lu_fid *fid)
2125 {
2126         struct osd_thread_info  *info      = osd_oti_get(env);
2127         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
2128         struct lu_device        *ldev   = obj->oo_dt.do_lu.lo_dev;
2129         struct dentry           *dentry = &info->oti_child_dentry;
2130         struct osd_inode_id     *id     = &info->oti_id;
2131         struct osd_device       *dev;
2132         struct inode            *inode;
2133         int                      rc;
2134
2135         ENTRY;
2136         dev  = osd_dev(ldev);
2137
2138         id->oii_ino = ino;
2139         id->oii_gen = OSD_OII_NOGEN;
2140
2141         inode = osd_iget(info, dev, id);
2142         if (IS_ERR(inode)) {
2143                 rc = PTR_ERR(inode);
2144                 GOTO(out,rc);
2145         }
2146         dentry->d_inode = inode;
2147
2148         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2149         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
2150                                    sizeof *mdt_attrs);
2151
2152         /* Check LMA compatibility */
2153         if (rc > 0 &&
2154             (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
2155                 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
2156                       inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
2157                       ~LMA_INCOMPAT_SUPP);
2158                 return -ENOSYS;
2159         }
2160
2161         if (rc > 0) {
2162                 lustre_lma_swab(mdt_attrs);
2163                 memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
2164                 rc = 0;
2165         } else if (rc == -ENODATA) {
2166                 osd_igif_get(env, inode, fid);
2167                 rc = 0;
2168         }
2169         iput(inode);
2170 out:
2171         RETURN(rc);
2172 }
2173
2174 /**
2175  * OSD layer object create function for interoperability mode (b11826).
2176  * This is mostly similar to osd_object_create(). Only difference being, fid is
2177  * inserted into inode ea here.
2178  *
2179  * \retval   0, on success
2180  * \retval -ve, on error
2181  */
2182 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
2183                              struct lu_attr *attr,
2184                              struct dt_allocation_hint *hint,
2185                              struct dt_object_format *dof,
2186                              struct thandle *th)
2187 {
2188         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
2189         struct osd_object      *obj    = osd_dt_obj(dt);
2190         struct osd_thread_info *info   = osd_oti_get(env);
2191         int result;
2192
2193         ENTRY;
2194
2195         LASSERT(osd_invariant(obj));
2196         LASSERT(!dt_object_exists(dt));
2197         LASSERT(osd_write_locked(env, obj));
2198         LASSERT(th != NULL);
2199
2200         OSD_EXEC_OP(th, create);
2201
2202         result = __osd_object_create(info, obj, attr, hint, dof, th);
2203
2204         /* objects under osd root shld have igif fid, so dont add fid EA */
2205         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
2206                 result = osd_ea_fid_set(env, dt, fid);
2207
2208         if (result == 0)
2209                 result = __osd_oi_insert(env, obj, fid, th);
2210
2211         LASSERT(ergo(result == 0, dt_object_exists(dt)));
2212         LINVRNT(osd_invariant(obj));
2213         RETURN(result);
2214 }
2215
2216 static int osd_declare_object_ref_add(const struct lu_env *env,
2217                                struct dt_object *dt,
2218                                struct thandle *handle)
2219 {
2220         struct osd_thandle *oh;
2221
2222         /* it's possible that object doesn't exist yet */
2223         LASSERT(handle != NULL);
2224
2225         oh = container_of0(handle, struct osd_thandle, ot_super);
2226         LASSERT(oh->ot_handle == NULL);
2227
2228         OSD_DECLARE_OP(oh, ref_add);
2229         oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
2230
2231         return 0;
2232 }
2233
2234 /*
2235  * Concurrency: @dt is write locked.
2236  */
2237 static int osd_object_ref_add(const struct lu_env *env,
2238                               struct dt_object *dt,
2239                               struct thandle *th)
2240 {
2241         struct osd_object *obj = osd_dt_obj(dt);
2242         struct inode *inode = obj->oo_inode;
2243
2244         LINVRNT(osd_invariant(obj));
2245         LASSERT(dt_object_exists(dt));
2246         LASSERT(osd_write_locked(env, obj));
2247         LASSERT(th != NULL);
2248
2249         OSD_EXEC_OP(th, ref_add);
2250
2251         cfs_spin_lock(&obj->oo_guard);
2252         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
2253         inode->i_nlink++;
2254         cfs_spin_unlock(&obj->oo_guard);
2255         inode->i_sb->s_op->dirty_inode(inode);
2256         LINVRNT(osd_invariant(obj));
2257
2258         return 0;
2259 }
2260
2261 static int osd_declare_object_ref_del(const struct lu_env *env,
2262                                struct dt_object *dt,
2263                                struct thandle *handle)
2264 {
2265         struct osd_thandle *oh;
2266
2267         LASSERT(dt_object_exists(dt));
2268         LASSERT(handle != NULL);
2269
2270         oh = container_of0(handle, struct osd_thandle, ot_super);
2271         LASSERT(oh->ot_handle == NULL);
2272
2273         OSD_DECLARE_OP(oh, ref_del);
2274         oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
2275
2276         return 0;
2277 }
2278
2279 /*
2280  * Concurrency: @dt is write locked.
2281  */
2282 static int osd_object_ref_del(const struct lu_env *env,
2283                               struct dt_object *dt,
2284                               struct thandle *th)
2285 {
2286         struct osd_object *obj = osd_dt_obj(dt);
2287         struct inode *inode = obj->oo_inode;
2288
2289         LINVRNT(osd_invariant(obj));
2290         LASSERT(dt_object_exists(dt));
2291         LASSERT(osd_write_locked(env, obj));
2292         LASSERT(th != NULL);
2293
2294         OSD_EXEC_OP(th, ref_del);
2295
2296         cfs_spin_lock(&obj->oo_guard);
2297         LASSERT(inode->i_nlink > 0);
2298         inode->i_nlink--;
2299         cfs_spin_unlock(&obj->oo_guard);
2300         inode->i_sb->s_op->dirty_inode(inode);
2301         LINVRNT(osd_invariant(obj));
2302
2303         return 0;
2304 }
2305
2306 /*
2307  * Get the 64-bit version for an inode.
2308  */
2309 static int osd_object_version_get(const struct lu_env *env,
2310                                   struct dt_object *dt, dt_obj_version_t *ver)
2311 {
2312         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2313
2314         CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
2315                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2316         *ver = LDISKFS_I(inode)->i_fs_version;
2317         return 0;
2318 }
2319
2320 /*
2321  * Concurrency: @dt is read locked.
2322  */
2323 static int osd_xattr_get(const struct lu_env *env,
2324                          struct dt_object *dt,
2325                          struct lu_buf *buf,
2326                          const char *name,
2327                          struct lustre_capa *capa)
2328 {
2329         struct osd_object      *obj    = osd_dt_obj(dt);
2330         struct inode           *inode  = obj->oo_inode;
2331         struct osd_thread_info *info   = osd_oti_get(env);
2332         struct dentry          *dentry = &info->oti_obj_dentry;
2333
2334         /* version get is not real XATTR but uses xattr API */
2335         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2336                 /* for version we are just using xattr API but change inode
2337                  * field instead */
2338                 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2339                 osd_object_version_get(env, dt, buf->lb_buf);
2340                 return sizeof(dt_obj_version_t);
2341         }
2342
2343         LASSERT(dt_object_exists(dt));
2344         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2345         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2346
2347         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2348                 return -EACCES;
2349
2350         dentry->d_inode = inode;
2351         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
2352 }
2353
2354
2355 static int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
2356                                  const struct lu_buf *buf, const char *name,
2357                                  int fl, struct thandle *handle)
2358 {
2359         struct osd_thandle *oh;
2360
2361         LASSERT(handle != NULL);
2362
2363         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2364                 /* no credits for version */
2365                 return 0;
2366         }
2367
2368         oh = container_of0(handle, struct osd_thandle, ot_super);
2369         LASSERT(oh->ot_handle == NULL);
2370
2371         OSD_DECLARE_OP(oh, xattr_set);
2372         oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
2373
2374         return 0;
2375 }
2376
2377 /*
2378  * Set the 64-bit version for object
2379  */
2380 static void osd_object_version_set(const struct lu_env *env,
2381                                    struct dt_object *dt,
2382                                    dt_obj_version_t *new_version)
2383 {
2384         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2385
2386         CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2387                *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2388
2389         LDISKFS_I(inode)->i_fs_version = *new_version;
2390         /** Version is set after all inode operations are finished,
2391          *  so we should mark it dirty here */
2392         inode->i_sb->s_op->dirty_inode(inode);
2393 }
2394
2395 /*
2396  * Concurrency: @dt is write locked.
2397  */
2398 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2399                          const struct lu_buf *buf, const char *name, int fl,
2400                          struct thandle *handle, struct lustre_capa *capa)
2401 {
2402         LASSERT(handle != NULL);
2403
2404         /* version set is not real XATTR */
2405         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2406                 /* for version we are just using xattr API but change inode
2407                  * field instead */
2408                 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2409                 osd_object_version_set(env, dt, buf->lb_buf);
2410                 return sizeof(dt_obj_version_t);
2411         }
2412
2413         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2414                 return -EACCES;
2415
2416         OSD_EXEC_OP(handle, xattr_set);
2417         return __osd_xattr_set(env, dt, buf, name, fl);
2418 }
2419
2420 /*
2421  * Concurrency: @dt is read locked.
2422  */
2423 static int osd_xattr_list(const struct lu_env *env,
2424                           struct dt_object *dt,
2425                           struct lu_buf *buf,
2426                           struct lustre_capa *capa)
2427 {
2428         struct osd_object      *obj    = osd_dt_obj(dt);
2429         struct inode           *inode  = obj->oo_inode;
2430         struct osd_thread_info *info   = osd_oti_get(env);
2431         struct dentry          *dentry = &info->oti_obj_dentry;
2432
2433         LASSERT(dt_object_exists(dt));
2434         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2435         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2436
2437         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2438                 return -EACCES;
2439
2440         dentry->d_inode = inode;
2441         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2442 }
2443
2444 static int osd_declare_xattr_del(const struct lu_env *env,
2445                                  struct dt_object *dt,
2446                                 const char *name,
2447                                 struct thandle *handle)
2448 {
2449         struct osd_thandle *oh;
2450
2451         LASSERT(dt_object_exists(dt));
2452         LASSERT(handle != NULL);
2453
2454         oh = container_of0(handle, struct osd_thandle, ot_super);
2455         LASSERT(oh->ot_handle == NULL);
2456
2457         OSD_DECLARE_OP(oh, xattr_set);
2458         oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
2459
2460         return 0;
2461 }
2462
2463 /*
2464  * Concurrency: @dt is write locked.
2465  */
2466 static int osd_xattr_del(const struct lu_env *env,
2467                          struct dt_object *dt,
2468                          const char *name,
2469                          struct thandle *handle,
2470                          struct lustre_capa *capa)
2471 {
2472         struct osd_object      *obj    = osd_dt_obj(dt);
2473         struct inode           *inode  = obj->oo_inode;
2474         struct osd_thread_info *info   = osd_oti_get(env);
2475         struct dentry          *dentry = &info->oti_obj_dentry;
2476         int                     rc;
2477
2478         LASSERT(dt_object_exists(dt));
2479         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2480         LASSERT(osd_write_locked(env, obj));
2481         LASSERT(handle != NULL);
2482
2483         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2484                 return -EACCES;
2485
2486         OSD_EXEC_OP(handle, xattr_set);
2487
2488         dentry->d_inode = inode;
2489         rc = inode->i_op->removexattr(dentry, name);
2490         return rc;
2491 }
2492
2493 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2494                                      struct dt_object *dt,
2495                                      struct lustre_capa *old,
2496                                      __u64 opc)
2497 {
2498         struct osd_thread_info *info = osd_oti_get(env);
2499         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2500         struct osd_object *obj = osd_dt_obj(dt);
2501         struct osd_device *dev = osd_obj2dev(obj);
2502         struct lustre_capa_key *key = &info->oti_capa_key;
2503         struct lustre_capa *capa = &info->oti_capa;
2504         struct obd_capa *oc;
2505         struct md_capainfo *ci;
2506         int rc;
2507         ENTRY;
2508
2509         if (!dev->od_fl_capa)
2510                 RETURN(ERR_PTR(-ENOENT));
2511
2512         LASSERT(dt_object_exists(dt));
2513         LINVRNT(osd_invariant(obj));
2514
2515         /* renewal sanity check */
2516         if (old && osd_object_auth(env, dt, old, opc))
2517                 RETURN(ERR_PTR(-EACCES));
2518
2519         ci = md_capainfo(env);
2520         if (unlikely(!ci))
2521                 RETURN(ERR_PTR(-ENOENT));
2522
2523         switch (ci->mc_auth) {
2524         case LC_ID_NONE:
2525                 RETURN(NULL);
2526         case LC_ID_PLAIN:
2527                 capa->lc_uid = obj->oo_inode->i_uid;
2528                 capa->lc_gid = obj->oo_inode->i_gid;
2529                 capa->lc_flags = LC_ID_PLAIN;
2530                 break;
2531         case LC_ID_CONVERT: {
2532                 __u32 d[4], s[4];
2533
2534                 s[0] = obj->oo_inode->i_uid;
2535                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2536                 s[2] = obj->oo_inode->i_gid;
2537                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2538                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2539                 if (unlikely(rc))
2540                         RETURN(ERR_PTR(rc));
2541
2542                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2543                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2544                 capa->lc_flags = LC_ID_CONVERT;
2545                 break;
2546         }
2547         default:
2548                 RETURN(ERR_PTR(-EINVAL));
2549         }
2550
2551         capa->lc_fid = *fid;
2552         capa->lc_opc = opc;
2553         capa->lc_flags |= dev->od_capa_alg << 24;
2554         capa->lc_timeout = dev->od_capa_timeout;
2555         capa->lc_expiry = 0;
2556
2557         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2558         if (oc) {
2559                 LASSERT(!capa_is_expired(oc));
2560                 RETURN(oc);
2561         }
2562
2563         cfs_spin_lock(&capa_lock);
2564         *key = dev->od_capa_keys[1];
2565         cfs_spin_unlock(&capa_lock);
2566
2567         capa->lc_keyid = key->lk_keyid;
2568         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2569
2570         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2571         if (rc) {
2572                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2573                 RETURN(ERR_PTR(rc));
2574         }
2575
2576         oc = capa_add(dev->od_capa_hash, capa);
2577         RETURN(oc);
2578 }
2579
2580 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2581 {
2582         int rc;
2583         struct osd_object      *obj    = osd_dt_obj(dt);
2584         struct inode           *inode  = obj->oo_inode;
2585         struct osd_thread_info *info   = osd_oti_get(env);
2586         struct dentry          *dentry = &info->oti_obj_dentry;
2587         struct file            *file   = &info->oti_file;
2588         ENTRY;
2589
2590         dentry->d_inode = inode;
2591         file->f_dentry = dentry;
2592         file->f_mapping = inode->i_mapping;
2593         file->f_op = inode->i_fop;
2594         LOCK_INODE_MUTEX(inode);
2595         rc = file->f_op->fsync(file, dentry, 0);
2596         UNLOCK_INODE_MUTEX(inode);
2597         RETURN(rc);
2598 }
2599
2600 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2601                         void **data)
2602 {
2603         struct osd_object *obj = osd_dt_obj(dt);
2604         ENTRY;
2605
2606         *data = (void *)obj->oo_inode;
2607         RETURN(0);
2608 }
2609
2610 /*
2611  * Index operations.
2612  */
2613
2614 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2615                            const struct dt_index_features *feat)
2616 {
2617         struct iam_descr *descr;
2618
2619         if (osd_object_is_root(o))
2620                 return feat == &dt_directory_features;
2621
2622         LASSERT(o->oo_dir != NULL);
2623
2624         descr = o->oo_dir->od_container.ic_descr;
2625         if (feat == &dt_directory_features) {
2626                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2627                         return 1;
2628                 else
2629                         return 0;
2630         } else {
2631                 return
2632                         feat->dif_keysize_min <= descr->id_key_size &&
2633                         descr->id_key_size <= feat->dif_keysize_max &&
2634                         feat->dif_recsize_min <= descr->id_rec_size &&
2635                         descr->id_rec_size <= feat->dif_recsize_max &&
2636                         !(feat->dif_flags & (DT_IND_VARKEY |
2637                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2638                         ergo(feat->dif_flags & DT_IND_UPDATE,
2639                              1 /* XXX check that object (and file system) is
2640                                 * writable */);
2641         }
2642 }
2643
2644 static int osd_iam_container_init(const struct lu_env *env,
2645                                   struct osd_object *obj,
2646                                   struct osd_directory *dir)
2647 {
2648         struct iam_container *bag = &dir->od_container;
2649         int result;
2650
2651         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2652         if (result != 0)
2653                 return result;
2654
2655         result = iam_container_setup(bag);
2656         if (result != 0)
2657                 goto out;
2658
2659         if (osd_obj2dev(obj)->od_iop_mode) {
2660                 u32 ptr = bag->ic_descr->id_ops->id_root_ptr(bag);
2661
2662                 bag->ic_root_bh = ldiskfs_bread(NULL, obj->oo_inode,
2663                                                 ptr, 0, &result);
2664         }
2665
2666  out:
2667         if (result == 0)
2668                 obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2669         else
2670                 iam_container_fini(bag);
2671
2672         return result;
2673 }
2674
2675
2676 /*
2677  * Concurrency: no external locking is necessary.
2678  */
2679 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2680                          const struct dt_index_features *feat)
2681 {
2682         int result;
2683         int ea_dir = 0;
2684         struct osd_object *obj = osd_dt_obj(dt);
2685         struct osd_device *osd = osd_obj2dev(obj);
2686
2687         LINVRNT(osd_invariant(obj));
2688         LASSERT(dt_object_exists(dt));
2689
2690         if (osd_object_is_root(obj)) {
2691                 dt->do_index_ops = &osd_index_ea_ops;
2692                 result = 0;
2693         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2694                 dt->do_index_ops = &osd_index_ea_ops;
2695                 if (S_ISDIR(obj->oo_inode->i_mode))
2696                         result = 0;
2697                 else
2698                         result = -ENOTDIR;
2699                 ea_dir = 1;
2700         } else if (!osd_has_index(obj)) {
2701                 struct osd_directory *dir;
2702
2703                 OBD_ALLOC_PTR(dir);
2704                 if (dir != NULL) {
2705
2706                         cfs_spin_lock(&obj->oo_guard);
2707                         if (obj->oo_dir == NULL)
2708                                 obj->oo_dir = dir;
2709                         else
2710                                 /*
2711                                  * Concurrent thread allocated container data.
2712                                  */
2713                                 OBD_FREE_PTR(dir);
2714                         cfs_spin_unlock(&obj->oo_guard);
2715                         /*
2716                          * Now, that we have container data, serialize its
2717                          * initialization.
2718                          */
2719                         cfs_down_write(&obj->oo_ext_idx_sem);
2720                         /*
2721                          * recheck under lock.
2722                          */
2723                         if (!osd_has_index(obj))
2724                                 result = osd_iam_container_init(env, obj, dir);
2725                         else
2726                                 result = 0;
2727                         cfs_up_write(&obj->oo_ext_idx_sem);
2728                 } else {
2729                         result = -ENOMEM;
2730                 }
2731         } else {
2732                 result = 0;
2733         }
2734
2735         if (result == 0 && ea_dir == 0) {
2736                 if (!osd_iam_index_probe(env, obj, feat))
2737                         result = -ENOTDIR;
2738         }
2739         LINVRNT(osd_invariant(obj));
2740
2741         return result;
2742 }
2743
2744 static const struct dt_object_operations osd_obj_ops = {
2745         .do_read_lock         = osd_object_read_lock,
2746         .do_write_lock        = osd_object_write_lock,
2747         .do_read_unlock       = osd_object_read_unlock,
2748         .do_write_unlock      = osd_object_write_unlock,
2749         .do_write_locked      = osd_object_write_locked,
2750         .do_attr_get          = osd_attr_get,
2751         .do_declare_attr_set  = osd_declare_attr_set,
2752         .do_attr_set          = osd_attr_set,
2753         .do_ah_init           = osd_ah_init,
2754         .do_declare_create    = osd_declare_object_create,
2755         .do_create            = osd_object_create,
2756         .do_declare_destroy   = osd_declare_object_destroy,
2757         .do_destroy           = osd_object_destroy,
2758         .do_index_try         = osd_index_try,
2759         .do_declare_ref_add   = osd_declare_object_ref_add,
2760         .do_ref_add           = osd_object_ref_add,
2761         .do_declare_ref_del   = osd_declare_object_ref_del,
2762         .do_ref_del           = osd_object_ref_del,
2763         .do_xattr_get         = osd_xattr_get,
2764         .do_declare_xattr_set = osd_declare_xattr_set,
2765         .do_xattr_set         = osd_xattr_set,
2766         .do_declare_xattr_del = osd_declare_xattr_del,
2767         .do_xattr_del         = osd_xattr_del,
2768         .do_xattr_list        = osd_xattr_list,
2769         .do_capa_get          = osd_capa_get,
2770         .do_object_sync       = osd_object_sync,
2771         .do_data_get          = osd_data_get,
2772 };
2773
2774 /**
2775  * dt_object_operations for interoperability mode
2776  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2777  */
2778 static const struct dt_object_operations osd_obj_ea_ops = {
2779         .do_read_lock         = osd_object_read_lock,
2780         .do_write_lock        = osd_object_write_lock,
2781         .do_read_unlock       = osd_object_read_unlock,
2782         .do_write_unlock      = osd_object_write_unlock,
2783         .do_write_locked      = osd_object_write_locked,
2784         .do_attr_get          = osd_attr_get,
2785         .do_declare_attr_set  = osd_declare_attr_set,
2786         .do_attr_set          = osd_attr_set,
2787         .do_ah_init           = osd_ah_init,
2788         .do_declare_create    = osd_declare_object_create,
2789         .do_create            = osd_object_ea_create,
2790         .do_declare_destroy   = osd_declare_object_destroy,
2791         .do_destroy           = osd_object_destroy,
2792         .do_index_try         = osd_index_try,
2793         .do_declare_ref_add   = osd_declare_object_ref_add,
2794         .do_ref_add           = osd_object_ref_add,
2795         .do_declare_ref_del   = osd_declare_object_ref_del,
2796         .do_ref_del           = osd_object_ref_del,
2797         .do_xattr_get         = osd_xattr_get,
2798         .do_declare_xattr_set = osd_declare_xattr_set,
2799         .do_xattr_set         = osd_xattr_set,
2800         .do_declare_xattr_del = osd_declare_xattr_del,
2801         .do_xattr_del         = osd_xattr_del,
2802         .do_xattr_list        = osd_xattr_list,
2803         .do_capa_get          = osd_capa_get,
2804         .do_object_sync       = osd_object_sync,
2805         .do_data_get          = osd_data_get,
2806 };
2807
2808 /*
2809  * Body operations.
2810  */
2811
2812 /*
2813  * XXX: Another layering violation for now.
2814  *
2815  * We don't want to use ->f_op->read methods, because generic file write
2816  *
2817  *         - serializes on ->i_sem, and
2818  *
2819  *         - does a lot of extra work like balance_dirty_pages(),
2820  *
2821  * which doesn't work for globally shared files like /last-received.
2822  */
2823 static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
2824 {
2825         struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
2826
2827         memcpy(buffer, (char*)ei->i_data, buflen);
2828
2829         return  buflen;
2830 }
2831
2832 static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
2833                             loff_t *offs)
2834 {
2835         struct buffer_head *bh;
2836         unsigned long block;
2837         int osize = size;
2838         int blocksize;
2839         int csize;
2840         int boffs;
2841         int err;
2842
2843         /* prevent reading after eof */
2844         spin_lock(&inode->i_lock);
2845         if (i_size_read(inode) < *offs + size) {
2846                 size = i_size_read(inode) - *offs;
2847                 spin_unlock(&inode->i_lock);
2848                 if (size < 0) {
2849                         CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
2850                                i_size_read(inode), *offs);
2851                         return -EBADR;
2852                 } else if (size == 0) {
2853                         return 0;
2854                 }
2855         } else {
2856                 spin_unlock(&inode->i_lock);
2857         }
2858
2859         blocksize = 1 << inode->i_blkbits;
2860
2861         while (size > 0) {
2862                 block = *offs >> inode->i_blkbits;
2863                 boffs = *offs & (blocksize - 1);
2864                 csize = min(blocksize - boffs, size);
2865                 bh = ldiskfs_bread(NULL, inode, block, 0, &err);
2866                 if (!bh) {
2867                         CERROR("can't read block: %d\n", err);
2868                         return err;
2869                 }
2870
2871                 memcpy(buf, bh->b_data + boffs, csize);
2872                 brelse(bh);
2873
2874                 *offs += csize;
2875                 buf += csize;
2876                 size -= csize;
2877         }
2878         return osize;
2879 }
2880
2881 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
2882                         struct lu_buf *buf, loff_t *pos,
2883                         struct lustre_capa *capa)
2884 {
2885         struct osd_object      *obj    = osd_dt_obj(dt);
2886         struct inode           *inode  = obj->oo_inode;
2887         int rc;
2888
2889         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2890                 RETURN(-EACCES);
2891
2892         /* Read small symlink from inode body as we need to maintain correct
2893          * on-disk symlinks for ldiskfs.
2894          */
2895         if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2896             (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
2897                 rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
2898         else
2899                 rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
2900
2901         return rc;
2902 }
2903
2904 static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
2905 {
2906
2907         memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
2908                buflen);
2909         LDISKFS_I(inode)->i_disksize = buflen;
2910         i_size_write(inode, buflen);
2911         inode->i_sb->s_op->dirty_inode(inode);
2912
2913         return 0;
2914 }
2915
2916 static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
2917                                     loff_t *offs, handle_t *handle)
2918 {
2919         struct buffer_head *bh = NULL;
2920         loff_t offset = *offs;
2921         loff_t new_size = i_size_read(inode);
2922         unsigned long block;
2923         int blocksize = 1 << inode->i_blkbits;
2924         int err = 0;
2925         int size;
2926         int boffs;
2927         int dirty_inode = 0;
2928
2929         while (bufsize > 0) {
2930                 if (bh != NULL)
2931                         brelse(bh);
2932
2933                 block = offset >> inode->i_blkbits;
2934                 boffs = offset & (blocksize - 1);
2935                 size = min(blocksize - boffs, bufsize);
2936                 bh = ldiskfs_bread(handle, inode, block, 1, &err);
2937                 if (!bh) {
2938                         CERROR("can't read/create block: %d\n", err);
2939                         break;
2940                 }
2941
2942                 err = ldiskfs_journal_get_write_access(handle, bh);
2943                 if (err) {
2944                         CERROR("journal_get_write_access() returned error %d\n",
2945                                err);
2946                         break;
2947                 }
2948                 LASSERTF(boffs + size <= bh->b_size,
2949                          "boffs %d size %d bh->b_size %lu",
2950                          boffs, size, (unsigned long)bh->b_size);
2951                 memcpy(bh->b_data + boffs, buf, size);
2952                 err = ldiskfs_journal_dirty_metadata(handle, bh);
2953                 if (err)
2954                         break;
2955
2956                 if (offset + size > new_size)
2957                         new_size = offset + size;
2958                 offset += size;
2959                 bufsize -= size;
2960                 buf += size;
2961         }
2962         if (bh)
2963                 brelse(bh);
2964
2965         /* correct in-core and on-disk sizes */
2966         if (new_size > i_size_read(inode)) {
2967                 spin_lock(&inode->i_lock);
2968                 if (new_size > i_size_read(inode))
2969                         i_size_write(inode, new_size);
2970                 if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
2971                         LDISKFS_I(inode)->i_disksize = i_size_read(inode);
2972                         dirty_inode = 1;
2973                 }
2974                 spin_unlock(&inode->i_lock);
2975                 if (dirty_inode)
2976                         inode->i_sb->s_op->dirty_inode(inode);
2977         }
2978
2979         if (err == 0)
2980                 *offs = offset;
2981         return err;
2982 }
2983
2984 static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
2985                                  const loff_t size, loff_t pos,
2986                                  struct thandle *handle)
2987 {
2988         struct osd_thandle *oh;
2989
2990         LASSERT(handle != NULL);
2991
2992         oh = container_of0(handle, struct osd_thandle, ot_super);
2993         LASSERT(oh->ot_handle == NULL);
2994
2995         OSD_DECLARE_OP(oh, write);
2996         oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BLOCK];
2997
2998         return 0;
2999 }
3000
3001 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
3002                          const struct lu_buf *buf, loff_t *pos,
3003                          struct thandle *handle, struct lustre_capa *capa,
3004                          int ignore_quota)
3005 {
3006         struct osd_object  *obj   = osd_dt_obj(dt);
3007         struct inode       *inode = obj->oo_inode;
3008         struct osd_thandle *oh;
3009         ssize_t            result = 0;
3010 #ifdef HAVE_QUOTA_SUPPORT
3011         cfs_cap_t           save = cfs_curproc_cap_pack();
3012 #endif
3013
3014         LASSERT(handle != NULL);
3015
3016         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
3017                 RETURN(-EACCES);
3018
3019         oh = container_of(handle, struct osd_thandle, ot_super);
3020         LASSERT(oh->ot_handle->h_transaction != NULL);
3021 #ifdef HAVE_QUOTA_SUPPORT
3022         if (ignore_quota)
3023                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
3024         else
3025                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
3026 #endif
3027         /* Write small symlink to inode body as we need to maintain correct
3028          * on-disk symlinks for ldiskfs.
3029          */
3030         if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
3031            (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
3032                 result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
3033         else
3034                 result = osd_ldiskfs_write_record(inode, buf->lb_buf,
3035                                                   buf->lb_len, pos,
3036                                                   oh->ot_handle);
3037 #ifdef HAVE_QUOTA_SUPPORT
3038         cfs_curproc_cap_unpack(save);
3039 #endif
3040         if (result == 0)
3041                 result = buf->lb_len;
3042         return result;
3043 }
3044
3045 /*
3046  * in some cases we may need declare methods for objects being created
3047  * e.g., when we create symlink
3048  */
3049 static const struct dt_body_operations osd_body_ops_new = {
3050         .dbo_declare_write = osd_declare_write,
3051 };
3052
3053 static const struct dt_body_operations osd_body_ops = {
3054         .dbo_read          = osd_read,
3055         .dbo_declare_write = osd_declare_write,
3056         .dbo_write         = osd_write
3057 };
3058
3059 static int osd_index_declare_iam_delete(const struct lu_env *env,
3060                                         struct dt_object *dt,
3061                                         const struct dt_key *key,
3062                                         struct thandle *handle)
3063 {
3064         struct osd_thandle    *oh;
3065
3066         oh = container_of0(handle, struct osd_thandle, ot_super);
3067         LASSERT(oh->ot_handle == NULL);
3068
3069         OSD_DECLARE_OP(oh, delete);
3070         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
3071
3072         return 0;
3073 }
3074
3075 /**
3076  *      delete a (key, value) pair from index \a dt specified by \a key
3077  *
3078  *      \param  dt      osd index object
3079  *      \param  key     key for index
3080  *      \param  rec     record reference
3081  *      \param  handle  transaction handler
3082  *
3083  *      \retval  0  success
3084  *      \retval -ve   failure
3085  */
3086
3087 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
3088                                 const struct dt_key *key, struct thandle *handle,
3089                                 struct lustre_capa *capa)
3090 {
3091         struct osd_object     *obj = osd_dt_obj(dt);
3092         struct osd_thandle    *oh;
3093         struct iam_path_descr *ipd;
3094         struct iam_container  *bag = &obj->oo_dir->od_container;
3095         int rc;
3096
3097         ENTRY;
3098
3099         LINVRNT(osd_invariant(obj));
3100         LASSERT(dt_object_exists(dt));
3101         LASSERT(bag->ic_object == obj->oo_inode);
3102         LASSERT(handle != NULL);
3103
3104         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3105                 RETURN(-EACCES);
3106
3107         OSD_EXEC_OP(handle, delete);
3108
3109         ipd = osd_idx_ipd_get(env, bag);
3110         if (unlikely(ipd == NULL))
3111                 RETURN(-ENOMEM);
3112
3113         oh = container_of0(handle, struct osd_thandle, ot_super);
3114         LASSERT(oh->ot_handle != NULL);
3115         LASSERT(oh->ot_handle->h_transaction != NULL);
3116
3117         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
3118         osd_ipd_put(env, bag, ipd);
3119         LINVRNT(osd_invariant(obj));
3120         RETURN(rc);
3121 }
3122
3123 static int osd_index_declare_ea_delete(const struct lu_env *env,
3124                                        struct dt_object *dt,
3125                                        const struct dt_key *key,
3126                                        struct thandle *handle)
3127 {
3128         struct osd_thandle *oh;
3129
3130         LASSERT(dt_object_exists(dt));
3131         LASSERT(handle != NULL);
3132
3133         oh = container_of0(handle, struct osd_thandle, ot_super);
3134         LASSERT(oh->ot_handle == NULL);
3135
3136         OSD_DECLARE_OP(oh, delete);
3137         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
3138
3139         return 0;
3140 }
3141
3142 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
3143                                           struct dt_rec *fid)
3144 {
3145         struct osd_fid_pack *rec;
3146         int rc = -ENODATA;
3147
3148         if (de->file_type & LDISKFS_DIRENT_LUFID) {
3149                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
3150                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
3151         }
3152         RETURN(rc);
3153 }
3154
3155 /**
3156  * Index delete function for interoperability mode (b11826).
3157  * It will remove the directory entry added by osd_index_ea_insert().
3158  * This entry is needed to maintain name->fid mapping.
3159  *
3160  * \param key,  key i.e. file entry to be deleted
3161  *
3162  * \retval   0, on success
3163  * \retval -ve, on error
3164  */
3165 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
3166                                const struct dt_key *key, struct thandle *handle,
3167                                struct lustre_capa *capa)
3168 {
3169         struct osd_object          *obj    = osd_dt_obj(dt);
3170         struct inode               *dir    = obj->oo_inode;
3171         struct dentry              *dentry;
3172         struct osd_thandle         *oh;
3173         struct ldiskfs_dir_entry_2 *de;
3174         struct buffer_head         *bh;
3175         struct htree_lock          *hlock = NULL;
3176
3177         int rc;
3178
3179         ENTRY;
3180
3181         LINVRNT(osd_invariant(obj));
3182         LASSERT(dt_object_exists(dt));
3183         LASSERT(handle != NULL);
3184
3185         OSD_EXEC_OP(handle, delete);
3186
3187         oh = container_of(handle, struct osd_thandle, ot_super);
3188         LASSERT(oh->ot_handle != NULL);
3189         LASSERT(oh->ot_handle->h_transaction != NULL);
3190
3191         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3192                 RETURN(-EACCES);
3193
3194         dentry = osd_child_dentry_get(env, obj,
3195                                       (char *)key, strlen((char *)key));
3196
3197         if (obj->oo_hl_head != NULL) {
3198                 hlock = osd_oti_get(env)->oti_hlock;
3199                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3200                                    dir, LDISKFS_HLOCK_DEL);
3201         } else {
3202                 cfs_down_write(&obj->oo_ext_idx_sem);
3203         }
3204
3205         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3206         if (bh) {
3207                 rc = ldiskfs_delete_entry(oh->ot_handle,
3208                                           dir, de, bh);
3209                 brelse(bh);
3210         } else {
3211                 rc = -ENOENT;
3212         }
3213         if (hlock != NULL)
3214                 ldiskfs_htree_unlock(hlock);
3215         else
3216                 cfs_up_write(&obj->oo_ext_idx_sem);
3217
3218         LASSERT(osd_invariant(obj));
3219         RETURN(rc);
3220 }
3221
3222 /**
3223  *      Lookup index for \a key and copy record to \a rec.
3224  *
3225  *      \param  dt      osd index object
3226  *      \param  key     key for index
3227  *      \param  rec     record reference
3228  *
3229  *      \retval  +ve  success : exact mach
3230  *      \retval  0    return record with key not greater than \a key
3231  *      \retval -ve   failure
3232  */
3233 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
3234                                 struct dt_rec *rec, const struct dt_key *key,
3235                                 struct lustre_capa *capa)
3236 {
3237         struct osd_object     *obj = osd_dt_obj(dt);
3238         struct iam_path_descr *ipd;
3239         struct iam_container  *bag = &obj->oo_dir->od_container;
3240         struct osd_thread_info *oti = osd_oti_get(env);
3241         struct iam_iterator    *it = &oti->oti_idx_it;
3242         struct iam_rec *iam_rec;
3243         int rc;
3244         ENTRY;
3245
3246         LASSERT(osd_invariant(obj));
3247         LASSERT(dt_object_exists(dt));
3248         LASSERT(bag->ic_object == obj->oo_inode);
3249
3250         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3251                 RETURN(-EACCES);
3252
3253         ipd = osd_idx_ipd_get(env, bag);
3254         if (IS_ERR(ipd))
3255                 RETURN(-ENOMEM);
3256
3257         /* got ipd now we can start iterator. */
3258         iam_it_init(it, bag, 0, ipd);
3259
3260         rc = iam_it_get(it, (struct iam_key *)key);
3261         if (rc >= 0) {
3262                 if (S_ISDIR(obj->oo_inode->i_mode))
3263                         iam_rec = (struct iam_rec *)oti->oti_ldp;
3264                 else
3265                         iam_rec = (struct iam_rec *) rec;
3266
3267                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
3268                 if (S_ISDIR(obj->oo_inode->i_mode))
3269                         osd_fid_unpack((struct lu_fid *) rec,
3270                                        (struct osd_fid_pack *)iam_rec);
3271         }
3272         iam_it_put(it);
3273         iam_it_fini(it);
3274         osd_ipd_put(env, bag, ipd);
3275
3276         LINVRNT(osd_invariant(obj));
3277
3278         RETURN(rc);
3279 }
3280
3281 static int osd_index_declare_iam_insert(const struct lu_env *env,
3282                                         struct dt_object *dt,
3283                                         const struct dt_rec *rec,
3284                                         const struct dt_key *key,
3285                                         struct thandle *handle)
3286 {
3287         struct osd_thandle *oh;
3288
3289         LASSERT(dt_object_exists(dt));
3290         LASSERT(handle != NULL);
3291
3292         oh = container_of0(handle, struct osd_thandle, ot_super);
3293         LASSERT(oh->ot_handle == NULL);
3294
3295         OSD_DECLARE_OP(oh, insert);
3296         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
3297
3298         return 0;
3299 }
3300
3301 /**
3302  *      Inserts (key, value) pair in \a dt index object.
3303  *
3304  *      \param  dt      osd index object
3305  *      \param  key     key for index
3306  *      \param  rec     record reference
3307  *      \param  th      transaction handler
3308  *
3309  *      \retval  0  success
3310  *      \retval -ve failure
3311  */
3312 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
3313                                 const struct dt_rec *rec, const struct dt_key *key,
3314                                 struct thandle *th, struct lustre_capa *capa,
3315                                 int ignore_quota)
3316 {
3317         struct osd_object     *obj = osd_dt_obj(dt);
3318         struct iam_path_descr *ipd;
3319         struct osd_thandle    *oh;
3320         struct iam_container  *bag = &obj->oo_dir->od_container;
3321 #ifdef HAVE_QUOTA_SUPPORT
3322         cfs_cap_t              save = cfs_curproc_cap_pack();
3323 #endif
3324         struct osd_thread_info *oti = osd_oti_get(env);
3325         struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
3326         int rc;
3327
3328         ENTRY;
3329
3330         LINVRNT(osd_invariant(obj));
3331         LASSERT(dt_object_exists(dt));
3332         LASSERT(bag->ic_object == obj->oo_inode);
3333         LASSERT(th != NULL);
3334
3335         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3336                 return -EACCES;
3337
3338         OSD_EXEC_OP(th, insert);
3339
3340         ipd = osd_idx_ipd_get(env, bag);
3341         if (unlikely(ipd == NULL))
3342                 RETURN(-ENOMEM);
3343
3344         oh = container_of0(th, struct osd_thandle, ot_super);
3345         LASSERT(oh->ot_handle != NULL);
3346         LASSERT(oh->ot_handle->h_transaction != NULL);
3347 #ifdef HAVE_QUOTA_SUPPORT
3348         if (ignore_quota)
3349                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
3350         else
3351                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
3352 #endif
3353         if (S_ISDIR(obj->oo_inode->i_mode))
3354                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
3355         else
3356                 iam_rec = (struct iam_rec *) rec;
3357         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
3358                         iam_rec, ipd);
3359 #ifdef HAVE_QUOTA_SUPPORT
3360         cfs_curproc_cap_unpack(save);
3361 #endif
3362         osd_ipd_put(env, bag, ipd);
3363         LINVRNT(osd_invariant(obj));
3364         RETURN(rc);
3365 }
3366
3367 /**
3368  * Calls ldiskfs_add_entry() to add directory entry
3369  * into the directory. This is required for
3370  * interoperability mode (b11826)
3371  *
3372  * \retval   0, on success
3373  * \retval -ve, on error
3374  */
3375 static int __osd_ea_add_rec(struct osd_thread_info *info,
3376                             struct osd_object *pobj,
3377                             struct inode  *cinode,
3378                             const char *name,
3379                             const struct dt_rec *fid,
3380                             struct htree_lock *hlock,
3381                             struct thandle *th)
3382 {
3383         struct ldiskfs_dentry_param *ldp;
3384         struct dentry      *child;
3385         struct osd_thandle *oth;
3386         int rc;
3387
3388         oth = container_of(th, struct osd_thandle, ot_super);
3389         LASSERT(oth->ot_handle != NULL);
3390         LASSERT(oth->ot_handle->h_transaction != NULL);
3391
3392         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
3393
3394         if (fid_is_igif((struct lu_fid *)fid) ||
3395             fid_is_norm((struct lu_fid *)fid)) {
3396                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
3397                 osd_get_ldiskfs_dirent_param(ldp, fid);
3398                 child->d_fsdata = (void*) ldp;
3399         } else
3400                 child->d_fsdata = NULL;
3401         rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
3402
3403         RETURN(rc);
3404 }
3405
3406 /**
3407  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
3408  * into the directory.Also sets flags into osd object to
3409  * indicate dot and dotdot are created. This is required for
3410  * interoperability mode (b11826)
3411  *
3412  * \param dir   directory for dot and dotdot fixup.
3413  * \param obj   child object for linking
3414  *
3415  * \retval   0, on success
3416  * \retval -ve, on error
3417  */
3418 static int osd_add_dot_dotdot(struct osd_thread_info *info,
3419                               struct osd_object *dir,
3420                               struct inode  *parent_dir, const char *name,
3421                               const struct dt_rec *dot_fid,
3422                               const struct dt_rec *dot_dot_fid,
3423                               struct thandle *th)
3424 {
3425         struct inode            *inode  = dir->oo_inode;
3426         struct ldiskfs_dentry_param *dot_ldp;
3427         struct ldiskfs_dentry_param *dot_dot_ldp;
3428         struct osd_thandle      *oth;
3429         int result = 0;
3430
3431         oth = container_of(th, struct osd_thandle, ot_super);
3432         LASSERT(oth->ot_handle->h_transaction != NULL);
3433         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
3434
3435         if (strcmp(name, dot) == 0) {
3436                 if (dir->oo_compat_dot_created) {
3437                         result = -EEXIST;
3438                 } else {
3439                         LASSERT(inode == parent_dir);
3440                         dir->oo_compat_dot_created = 1;
3441                         result = 0;
3442                 }
3443         } else if(strcmp(name, dotdot) == 0) {
3444                 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
3445                 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
3446
3447                 if (!dir->oo_compat_dot_created)
3448                         return -EINVAL;
3449                 if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
3450                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
3451                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
3452                 } else {
3453                         dot_ldp = NULL;
3454                         dot_dot_ldp = NULL;
3455                 }
3456                 /* in case of rename, dotdot is already created */
3457                 if (dir->oo_compat_dotdot_created) {
3458                         return __osd_ea_add_rec(info, dir, parent_dir, name,
3459                                                 dot_dot_fid, NULL, th);
3460                 }
3461
3462                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
3463                                                 inode, dot_ldp, dot_dot_ldp);
3464                 if (result == 0)
3465                        dir->oo_compat_dotdot_created = 1;
3466         }
3467
3468         return result;
3469 }
3470
3471
3472 /**
3473  * It will call the appropriate osd_add* function and return the
3474  * value, return by respective functions.
3475  */
3476 static int osd_ea_add_rec(const struct lu_env *env,
3477                           struct osd_object *pobj,
3478                           struct inode *cinode,
3479                           const char *name,
3480                           const struct dt_rec *fid,
3481                           struct thandle *th)
3482 {
3483         struct osd_thread_info    *info   = osd_oti_get(env);
3484         struct htree_lock         *hlock;
3485         int rc;
3486
3487         hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
3488
3489         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3490                                                    name[2] =='\0'))) {
3491                 if (hlock != NULL) {
3492                         ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3493                                            pobj->oo_inode, 0);
3494                 } else {
3495                         cfs_down_write(&pobj->oo_ext_idx_sem);
3496                 }
3497                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3498                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3499                                         fid, th);
3500         } else {
3501                 if (hlock != NULL) {
3502                         ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3503                                            pobj->oo_inode, LDISKFS_HLOCK_ADD);
3504                 } else {
3505                         cfs_down_write(&pobj->oo_ext_idx_sem);
3506                 }
3507
3508                 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
3509                                       hlock, th);
3510         }
3511         if (hlock != NULL)
3512                 ldiskfs_htree_unlock(hlock);
3513         else
3514                 cfs_up_write(&pobj->oo_ext_idx_sem);
3515
3516         return rc;
3517 }
3518
3519 /**
3520  * Calls ->lookup() to find dentry. From dentry get inode and
3521  * read inode's ea to get fid. This is required for  interoperability
3522  * mode (b11826)
3523  *
3524  * \retval   0, on success
3525  * \retval -ve, on error
3526  */
3527 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3528                              struct dt_rec *rec, const struct dt_key *key)
3529 {
3530         struct inode               *dir    = obj->oo_inode;
3531         struct dentry              *dentry;
3532         struct ldiskfs_dir_entry_2 *de;
3533         struct buffer_head         *bh;
3534         struct lu_fid              *fid = (struct lu_fid *) rec;
3535         struct htree_lock          *hlock = NULL;
3536         int ino;
3537         int rc;
3538
3539         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3540
3541         dentry = osd_child_dentry_get(env, obj,
3542                                       (char *)key, strlen((char *)key));
3543
3544         if (obj->oo_hl_head != NULL) {
3545                 hlock = osd_oti_get(env)->oti_hlock;
3546                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3547                                    dir, LDISKFS_HLOCK_LOOKUP);
3548         } else {
3549                 cfs_down_read(&obj->oo_ext_idx_sem);
3550         }
3551
3552         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3553         if (bh) {
3554                 ino = le32_to_cpu(de->inode);
3555                 rc = osd_get_fid_from_dentry(de, rec);
3556
3557                 /* done with de, release bh */
3558                 brelse(bh);
3559                 if (rc != 0)
3560                         rc = osd_ea_fid_get(env, obj, ino, fid);
3561         } else {
3562                 rc = -ENOENT;
3563         }
3564
3565         if (hlock != NULL)
3566                 ldiskfs_htree_unlock(hlock);
3567         else
3568                 cfs_up_read(&obj->oo_ext_idx_sem);
3569         RETURN (rc);
3570 }
3571
3572 /**
3573  * Find the osd object for given fid.
3574  *
3575  * \param fid need to find the osd object having this fid
3576  *
3577  * \retval osd_object on success
3578  * \retval        -ve on error
3579  */
3580 struct osd_object *osd_object_find(const struct lu_env *env,
3581                                    struct dt_object *dt,
3582                                    const struct lu_fid *fid)
3583 {
3584         struct lu_device         *ludev = dt->do_lu.lo_dev;
3585         struct osd_object        *child = NULL;
3586         struct lu_object         *luch;
3587         struct lu_object         *lo;
3588
3589         luch = lu_object_find(env, ludev, fid, NULL);
3590         if (!IS_ERR(luch)) {
3591                 if (lu_object_exists(luch)) {
3592                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3593                         if (lo != NULL)
3594                                 child = osd_obj(lo);
3595                         else
3596                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3597                                                 "lu_object can't be located"
3598                                                 ""DFID"\n", PFID(fid));
3599
3600                         if (child == NULL) {
3601                                 lu_object_put(env, luch);
3602                                 CERROR("Unable to get osd_object\n");
3603                                 child = ERR_PTR(-ENOENT);
3604                         }
3605                 } else {
3606                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
3607                                         "lu_object does not exists "DFID"\n",
3608                                         PFID(fid));
3609                         child = ERR_PTR(-ENOENT);
3610                 }
3611         } else
3612                 child = (void *)luch;
3613
3614         return child;
3615 }
3616
3617 /**
3618  * Put the osd object once done with it.
3619  *
3620  * \param obj osd object that needs to be put
3621  */
3622 static inline void osd_object_put(const struct lu_env *env,
3623                                   struct osd_object *obj)
3624 {
3625         lu_object_put(env, &obj->oo_dt.do_lu);
3626 }
3627
3628 static int osd_index_declare_ea_insert(const struct lu_env *env,
3629                                        struct dt_object *dt,
3630                                        const struct dt_rec *rec,
3631                                        const struct dt_key *key,
3632                                        struct thandle *handle)
3633 {
3634         struct osd_thandle *oh;
3635
3636         LASSERT(dt_object_exists(dt));
3637         LASSERT(handle != NULL);
3638
3639         oh = container_of0(handle, struct osd_thandle, ot_super);
3640         LASSERT(oh->ot_handle == NULL);
3641
3642         OSD_DECLARE_OP(oh, insert);
3643         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
3644
3645         return 0;
3646 }
3647
3648 /**
3649  * Index add function for interoperability mode (b11826).
3650  * It will add the directory entry.This entry is needed to
3651  * maintain name->fid mapping.
3652  *
3653  * \param key it is key i.e. file entry to be inserted
3654  * \param rec it is value of given key i.e. fid
3655  *
3656  * \retval   0, on success
3657  * \retval -ve, on error
3658  */
3659 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3660                                const struct dt_rec *rec,
3661                                const struct dt_key *key, struct thandle *th,
3662                                struct lustre_capa *capa, int ignore_quota)
3663 {
3664         struct osd_object        *obj   = osd_dt_obj(dt);
3665         struct lu_fid            *fid   = (struct lu_fid *) rec;
3666         const char               *name  = (const char *)key;
3667         struct osd_object        *child;
3668 #ifdef HAVE_QUOTA_SUPPORT
3669         cfs_cap_t                 save  = cfs_curproc_cap_pack();
3670 #endif
3671         int rc;
3672
3673         ENTRY;
3674
3675         LASSERT(osd_invariant(obj));
3676         LASSERT(dt_object_exists(dt));
3677         LASSERT(th != NULL);
3678
3679         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3680                 RETURN(-EACCES);
3681
3682         child = osd_object_find(env, dt, fid);
3683         if (!IS_ERR(child)) {
3684 #ifdef HAVE_QUOTA_SUPPORT
3685                 if (ignore_quota)
3686                         cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
3687                 else
3688                         cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
3689 #endif
3690                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
3691 #ifdef HAVE_QUOTA_SUPPORT
3692                 cfs_curproc_cap_unpack(save);
3693 #endif
3694                 osd_object_put(env, child);
3695         } else {
3696                 rc = PTR_ERR(child);
3697         }
3698
3699         LASSERT(osd_invariant(obj));
3700         RETURN(rc);
3701 }
3702
3703 /**
3704  *  Initialize osd Iterator for given osd index object.
3705  *
3706  *  \param  dt      osd index object
3707  */
3708
3709 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3710                                      struct dt_object *dt,
3711                                      __u32 unused,
3712                                      struct lustre_capa *capa)
3713 {
3714         struct osd_it_iam         *it;
3715         struct osd_thread_info *oti = osd_oti_get(env);
3716         struct osd_object     *obj = osd_dt_obj(dt);
3717         struct lu_object      *lo  = &dt->do_lu;
3718         struct iam_path_descr *ipd;
3719         struct iam_container  *bag = &obj->oo_dir->od_container;
3720
3721         LASSERT(lu_object_exists(lo));
3722
3723         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3724                 return ERR_PTR(-EACCES);
3725
3726         it = &oti->oti_it;
3727         ipd = osd_it_ipd_get(env, bag);
3728         if (likely(ipd != NULL)) {
3729                 it->oi_obj = obj;
3730                 it->oi_ipd = ipd;
3731                 lu_object_get(lo);
3732                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3733                 return (struct dt_it *)it;
3734         }
3735         return ERR_PTR(-ENOMEM);
3736 }
3737
3738 /**
3739  * free given Iterator.
3740  */
3741
3742 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3743 {
3744         struct osd_it_iam     *it = (struct osd_it_iam *)di;
3745         struct osd_object *obj = it->oi_obj;
3746
3747         iam_it_fini(&it->oi_it);
3748         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3749         lu_object_put(env, &obj->oo_dt.do_lu);
3750 }
3751
3752 /**
3753  *  Move Iterator to record specified by \a key
3754  *
3755  *  \param  di      osd iterator
3756  *  \param  key     key for index
3757  *
3758  *  \retval +ve  di points to record with least key not larger than key
3759  *  \retval  0   di points to exact matched key
3760  *  \retval -ve  failure
3761  */
3762
3763 static int osd_it_iam_get(const struct lu_env *env,
3764                       struct dt_it *di, const struct dt_key *key)
3765 {
3766         struct osd_it_iam *it = (struct osd_it_iam *)di;
3767
3768         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
3769 }
3770
3771 /**
3772  *  Release Iterator
3773  *
3774  *  \param  di      osd iterator
3775  */
3776
3777 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
3778 {
3779         struct osd_it_iam *it = (struct osd_it_iam *)di;
3780
3781         iam_it_put(&it->oi_it);
3782 }
3783
3784 /**
3785  *  Move iterator by one record
3786  *
3787  *  \param  di      osd iterator
3788  *
3789  *  \retval +1   end of container reached
3790  *  \retval  0   success
3791  *  \retval -ve  failure
3792  */
3793
3794 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
3795 {
3796         struct osd_it_iam *it = (struct osd_it_iam *)di;
3797
3798         return iam_it_next(&it->oi_it);
3799 }
3800
3801 /**
3802  * Return pointer to the key under iterator.
3803  */
3804
3805 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
3806                                  const struct dt_it *di)
3807 {
3808         struct osd_it_iam *it = (struct osd_it_iam *)di;
3809
3810         return (struct dt_key *)iam_it_key_get(&it->oi_it);
3811 }
3812
3813 /**
3814  * Return size of key under iterator (in bytes)
3815  */
3816
3817 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
3818 {
3819         struct osd_it_iam *it = (struct osd_it_iam *)di;
3820
3821         return iam_it_key_size(&it->oi_it);
3822 }
3823
3824 static inline void osd_it_append_attrs(struct lu_dirent*ent,
3825                                        __u32 attr,
3826                                        int len,
3827                                        __u16 type)
3828 {
3829         struct luda_type        *lt;
3830         const unsigned           align = sizeof(struct luda_type) - 1;
3831
3832         /* check if file type is required */
3833         if (attr & LUDA_TYPE) {
3834                         len = (len + align) & ~align;
3835
3836                         lt = (void *) ent->lde_name + len;
3837                         lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
3838                         ent->lde_attrs |= LUDA_TYPE;
3839         }
3840
3841         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
3842 }
3843
3844 /**
3845  * build lu direct from backend fs dirent.
3846  */
3847
3848 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
3849                                       struct lu_fid *fid,
3850                                       __u64 offset,
3851                                       char *name,
3852                                       __u16 namelen,
3853                                       __u16 type,
3854                                       __u32 attr)
3855 {
3856         fid_cpu_to_le(&ent->lde_fid, fid);
3857         ent->lde_attrs = LUDA_FID;
3858
3859         ent->lde_hash = cpu_to_le64(offset);
3860         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
3861
3862         strncpy(ent->lde_name, name, namelen);
3863         ent->lde_namelen = cpu_to_le16(namelen);
3864
3865         /* append lustre attributes */
3866         osd_it_append_attrs(ent, attr, namelen, type);
3867 }
3868
3869 /**
3870  * Return pointer to the record under iterator.
3871  */
3872 static int osd_it_iam_rec(const struct lu_env *env,
3873                           const struct dt_it *di,
3874                           struct dt_rec *dtrec,
3875                           __u32 attr)
3876 {
3877         struct osd_it_iam *it        = (struct osd_it_iam *)di;
3878         struct osd_thread_info *info = osd_oti_get(env);
3879         struct lu_fid     *fid       = &info->oti_fid;
3880         const struct osd_fid_pack *rec;
3881         struct lu_dirent *lde = (struct lu_dirent *)dtrec;
3882         char *name;
3883         int namelen;
3884         __u64 hash;
3885         int rc;
3886
3887         name = (char *)iam_it_key_get(&it->oi_it);
3888         if (IS_ERR(name))
3889                 RETURN(PTR_ERR(name));
3890
3891         namelen = iam_it_key_size(&it->oi_it);
3892
3893         rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it);
3894         if (IS_ERR(rec))
3895                 RETURN(PTR_ERR(rec));
3896
3897         rc = osd_fid_unpack(fid, rec);
3898         if (rc)
3899                 RETURN(rc);
3900
3901         hash = iam_it_store(&it->oi_it);
3902
3903         /* IAM does not store object type in IAM index (dir) */
3904         osd_it_pack_dirent(lde, fid, hash, name, namelen,
3905                            0, LUDA_FID);
3906
3907         return 0;
3908 }
3909
3910 /**
3911  * Returns cookie for current Iterator position.
3912  */
3913 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
3914 {
3915         struct osd_it_iam *it = (struct osd_it_iam *)di;
3916
3917         return iam_it_store(&it->oi_it);
3918 }
3919
3920 /**
3921  * Restore iterator from cookie.
3922  *
3923  * \param  di      osd iterator
3924  * \param  hash    Iterator location cookie
3925  *
3926  * \retval +ve  di points to record with least key not larger than key.
3927  * \retval  0   di points to exact matched key
3928  * \retval -ve  failure
3929  */
3930
3931 static int osd_it_iam_load(const struct lu_env *env,
3932                        const struct dt_it *di, __u64 hash)
3933 {
3934         struct osd_it_iam *it = (struct osd_it_iam *)di;
3935
3936         return iam_it_load(&it->oi_it, hash);
3937 }
3938
3939 static const struct dt_index_operations osd_index_iam_ops = {
3940         .dio_lookup         = osd_index_iam_lookup,
3941         .dio_declare_insert = osd_index_declare_iam_insert,
3942         .dio_insert         = osd_index_iam_insert,
3943         .dio_declare_delete = osd_index_declare_iam_delete,
3944         .dio_delete         = osd_index_iam_delete,
3945         .dio_it     = {
3946                 .init     = osd_it_iam_init,
3947                 .fini     = osd_it_iam_fini,
3948                 .get      = osd_it_iam_get,
3949                 .put      = osd_it_iam_put,
3950                 .next     = osd_it_iam_next,
3951                 .key      = osd_it_iam_key,
3952                 .key_size = osd_it_iam_key_size,
3953                 .rec      = osd_it_iam_rec,
3954                 .store    = osd_it_iam_store,
3955                 .load     = osd_it_iam_load
3956         }
3957 };
3958
3959 /**
3960  * Creates or initializes iterator context.
3961  *
3962  * \retval struct osd_it_ea, iterator structure on success
3963  *
3964  */
3965 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
3966                                     struct dt_object *dt,
3967                                     __u32 attr,
3968                                     struct lustre_capa *capa)
3969 {
3970         struct osd_object       *obj  = osd_dt_obj(dt);
3971         struct osd_thread_info  *info = osd_oti_get(env);
3972         struct osd_it_ea        *it   = &info->oti_it_ea;
3973         struct lu_object        *lo   = &dt->do_lu;
3974         struct dentry           *obj_dentry = &info->oti_it_dentry;
3975         ENTRY;
3976         LASSERT(lu_object_exists(lo));
3977
3978         obj_dentry->d_inode = obj->oo_inode;
3979         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3980         obj_dentry->d_name.hash = 0;
3981
3982         it->oie_rd_dirent       = 0;
3983         it->oie_it_dirent       = 0;
3984         it->oie_dirent          = NULL;
3985         it->oie_buf             = info->oti_it_ea_buf;
3986         it->oie_obj             = obj;
3987         it->oie_file.f_pos      = 0;
3988         it->oie_file.f_dentry   = obj_dentry;
3989         if (attr & LUDA_64BITHASH)
3990                 it->oie_file.f_flags = O_64BITHASH;
3991         else
3992                 it->oie_file.f_flags = O_32BITHASH;
3993         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
3994         it->oie_file.f_op         = obj->oo_inode->i_fop;
3995         it->oie_file.private_data = NULL;
3996         lu_object_get(lo);
3997         RETURN((struct dt_it *) it);
3998 }
3999
4000 /**
4001  * Destroy or finishes iterator context.
4002  *
4003  * \param di iterator structure to be destroyed
4004  */
4005 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
4006 {
4007         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
4008         struct osd_object    *obj  = it->oie_obj;
4009         struct inode       *inode  = obj->oo_inode;
4010
4011         ENTRY;
4012         it->oie_file.f_op->release(inode, &it->oie_file);
4013         lu_object_put(env, &obj->oo_dt.do_lu);
4014         EXIT;
4015 }
4016
4017 /**
4018  * It position the iterator at given key, so that next lookup continues from
4019  * that key Or it is similar to dio_it->load() but based on a key,
4020  * rather than file position.
4021  *
4022  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
4023  * to the beginning.
4024  *
4025  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
4026  */
4027 static int osd_it_ea_get(const struct lu_env *env,
4028                          struct dt_it *di, const struct dt_key *key)
4029 {
4030         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
4031
4032         ENTRY;
4033         LASSERT(((const char *)key)[0] == '\0');
4034         it->oie_file.f_pos      = 0;
4035         it->oie_rd_dirent       = 0;
4036         it->oie_it_dirent       = 0;
4037         it->oie_dirent          = NULL;
4038
4039         RETURN(+1);
4040 }
4041
4042 /**
4043  * Does nothing
4044  */
4045 static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
4046 {
4047 }
4048
4049 /**
4050  * It is called internally by ->readdir(). It fills the
4051  * iterator's in-memory data structure with required
4052  * information i.e. name, namelen, rec_size etc.
4053  *
4054  * \param buf in which information to be filled in.
4055  * \param name name of the file in given dir
4056  *
4057  * \retval 0 on success
4058  * \retval 1 on buffer full
4059  */
4060 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
4061                                loff_t offset, __u64 ino,
4062                                unsigned d_type)
4063 {
4064         struct osd_it_ea        *it   = (struct osd_it_ea *)buf;
4065         struct osd_it_ea_dirent *ent  = it->oie_dirent;
4066         struct lu_fid           *fid  = &ent->oied_fid;
4067         struct osd_fid_pack     *rec;
4068         ENTRY;
4069
4070         /* this should never happen */
4071         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
4072                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
4073                 RETURN(-EIO);
4074         }
4075
4076         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
4077             OSD_IT_EA_BUFSIZE)
4078                 RETURN(1);
4079
4080         if (d_type & LDISKFS_DIRENT_LUFID) {
4081                 rec = (struct osd_fid_pack*) (name + namelen + 1);
4082
4083                 if (osd_fid_unpack(fid, rec) != 0)
4084                         fid_zero(fid);
4085
4086                 d_type &= ~LDISKFS_DIRENT_LUFID;
4087         } else {
4088                 fid_zero(fid);
4089         }
4090
4091         ent->oied_ino     = ino;
4092         ent->oied_off     = offset;
4093         ent->oied_namelen = namelen;
4094         ent->oied_type    = d_type;
4095
4096         memcpy(ent->oied_name, name, namelen);
4097
4098         it->oie_rd_dirent++;
4099         it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
4100         RETURN(0);
4101 }
4102
4103 /**
4104  * Calls ->readdir() to load a directory entry at a time
4105  * and stored it in iterator's in-memory data structure.
4106  *
4107  * \param di iterator's in memory structure
4108  *
4109  * \retval   0 on success
4110  * \retval -ve on error
4111  */
4112 static int osd_ldiskfs_it_fill(const struct lu_env *env,
4113                                const struct dt_it *di)
4114 {
4115         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
4116         struct osd_object  *obj   = it->oie_obj;
4117         struct inode       *inode = obj->oo_inode;
4118         struct htree_lock  *hlock = NULL;
4119         int                 result = 0;
4120
4121         ENTRY;
4122         it->oie_dirent = it->oie_buf;
4123         it->oie_rd_dirent = 0;
4124
4125         if (obj->oo_hl_head != NULL) {
4126                 hlock = osd_oti_get(env)->oti_hlock;
4127                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
4128                                    inode, LDISKFS_HLOCK_READDIR);
4129         } else {
4130                 cfs_down_read(&obj->oo_ext_idx_sem);
4131         }
4132
4133         result = inode->i_fop->readdir(&it->oie_file, it,
4134                                        (filldir_t) osd_ldiskfs_filldir);
4135
4136         if (hlock != NULL)
4137                 ldiskfs_htree_unlock(hlock);
4138         else
4139                 cfs_up_read(&obj->oo_ext_idx_sem);
4140
4141         if (it->oie_rd_dirent == 0) {
4142                 result = -EIO;
4143         } else {
4144                 it->oie_dirent = it->oie_buf;
4145                 it->oie_it_dirent = 1;
4146         }
4147
4148         RETURN(result);
4149 }
4150
4151 /**
4152  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4153  * to load a directory entry at a time and stored it in
4154  * iterator's in-memory data structure.
4155  *
4156  * \param di iterator's in memory structure
4157  *
4158  * \retval +ve iterator reached to end
4159  * \retval   0 iterator not reached to end
4160  * \retval -ve on error
4161  */
4162 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
4163 {
4164         struct osd_it_ea *it = (struct osd_it_ea *)di;
4165         int rc;
4166
4167         ENTRY;
4168
4169         if (it->oie_it_dirent < it->oie_rd_dirent) {
4170                 it->oie_dirent =
4171                         (void *) it->oie_dirent +
4172                         cfs_size_round(sizeof(struct osd_it_ea_dirent) +
4173                                        it->oie_dirent->oied_namelen);
4174                 it->oie_it_dirent++;
4175                 RETURN(0);
4176         } else {
4177                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
4178                         rc = +1;
4179                 else
4180                         rc = osd_ldiskfs_it_fill(env, di);
4181         }
4182
4183         RETURN(rc);
4184 }
4185
4186 /**
4187  * Returns the key at current position from iterator's in memory structure.
4188  *
4189  * \param di iterator's in memory structure
4190  *
4191  * \retval key i.e. struct dt_key on success
4192  */
4193 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
4194                                     const struct dt_it *di)
4195 {
4196         struct osd_it_ea *it = (struct osd_it_ea *)di;
4197         ENTRY;
4198         RETURN((struct dt_key *)it->oie_dirent->oied_name);
4199 }
4200
4201 /**
4202  * Returns the key's size at current position from iterator's in memory structure.
4203  *
4204  * \param di iterator's in memory structure
4205  *
4206  * \retval key_size i.e. struct dt_key on success
4207  */
4208 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
4209 {
4210         struct osd_it_ea *it = (struct osd_it_ea *)di;
4211         ENTRY;
4212         RETURN(it->oie_dirent->oied_namelen);
4213 }
4214
4215
4216 /**
4217  * Returns the value (i.e. fid/igif) at current position from iterator's
4218  * in memory structure.
4219  *
4220  * \param di struct osd_it_ea, iterator's in memory structure
4221  * \param attr attr requested for dirent.
4222  * \param lde lustre dirent
4223  *
4224  * \retval   0 no error and \param lde has correct lustre dirent.
4225  * \retval -ve on error
4226  */
4227 static inline int osd_it_ea_rec(const struct lu_env *env,
4228                                 const struct dt_it *di,
4229                                 struct dt_rec *dtrec,
4230                                 __u32 attr)
4231 {
4232         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
4233         struct osd_object       *obj    = it->oie_obj;
4234         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
4235         struct lu_dirent        *lde    = (struct lu_dirent *)dtrec;
4236         int    rc = 0;
4237
4238         ENTRY;
4239
4240         if (!fid_is_sane(fid))
4241                 rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
4242
4243         if (rc == 0)
4244                 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
4245                                    it->oie_dirent->oied_name,
4246                                    it->oie_dirent->oied_namelen,
4247                                    it->oie_dirent->oied_type,
4248                                    attr);
4249         RETURN(rc);
4250 }
4251
4252 /**
4253  * Returns a cookie for current position of the iterator head, so that
4254  * user can use this cookie to load/start the iterator next time.
4255  *
4256  * \param di iterator's in memory structure
4257  *
4258  * \retval cookie for current position, on success
4259  */
4260 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
4261 {
4262         struct osd_it_ea *it = (struct osd_it_ea *)di;
4263         ENTRY;
4264         RETURN(it->oie_dirent->oied_off);
4265 }
4266
4267 /**
4268  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4269  * to load a directory entry at a time and stored it i inn,
4270  * in iterator's in-memory data structure.
4271  *
4272  * \param di struct osd_it_ea, iterator's in memory structure
4273  *
4274  * \retval +ve on success
4275  * \retval -ve on error
4276  */
4277 static int osd_it_ea_load(const struct lu_env *env,
4278                           const struct dt_it *di, __u64 hash)
4279 {
4280         struct osd_it_ea *it = (struct osd_it_ea *)di;
4281         int rc;
4282
4283         ENTRY;
4284         it->oie_file.f_pos = hash;
4285
4286         rc =  osd_ldiskfs_it_fill(env, di);
4287         if (rc == 0)
4288                 rc = +1;
4289
4290         RETURN(rc);
4291 }
4292
4293 /**
4294  * Index lookup function for interoperability mode (b11826).
4295  *
4296  * \param key,  key i.e. file name to be searched
4297  *
4298  * \retval +ve, on success
4299  * \retval -ve, on error
4300  */
4301 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
4302                                struct dt_rec *rec, const struct dt_key *key,
4303                                struct lustre_capa *capa)
4304 {
4305         struct osd_object *obj = osd_dt_obj(dt);
4306         int rc = 0;
4307
4308         ENTRY;
4309
4310         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
4311         LINVRNT(osd_invariant(obj));
4312
4313         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
4314                 return -EACCES;
4315
4316         rc = osd_ea_lookup_rec(env, obj, rec, key);
4317
4318         if (rc == 0)
4319                 rc = +1;
4320         RETURN(rc);
4321 }
4322
4323 /**
4324  * Index and Iterator operations for interoperability
4325  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
4326  */
4327 static const struct dt_index_operations osd_index_ea_ops = {
4328         .dio_lookup         = osd_index_ea_lookup,
4329         .dio_declare_insert = osd_index_declare_ea_insert,
4330         .dio_insert         = osd_index_ea_insert,
4331         .dio_declare_delete = osd_index_declare_ea_delete,
4332         .dio_delete         = osd_index_ea_delete,
4333         .dio_it     = {
4334                 .init     = osd_it_ea_init,
4335                 .fini     = osd_it_ea_fini,
4336                 .get      = osd_it_ea_get,
4337                 .put      = osd_it_ea_put,
4338                 .next     = osd_it_ea_next,
4339                 .key      = osd_it_ea_key,
4340                 .key_size = osd_it_ea_key_size,
4341                 .rec      = osd_it_ea_rec,
4342                 .store    = osd_it_ea_store,
4343                 .load     = osd_it_ea_load
4344         }
4345 };
4346
4347 static void *osd_key_init(const struct lu_context *ctx,
4348                           struct lu_context_key *key)
4349 {
4350         struct osd_thread_info *info;
4351
4352         OBD_ALLOC_PTR(info);
4353         if (info == NULL)
4354                 return ERR_PTR(-ENOMEM);
4355
4356         OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4357         if (info->oti_it_ea_buf == NULL)
4358                 goto out_free_info;
4359
4360         info->oti_env = container_of(ctx, struct lu_env, le_ctx);
4361
4362         info->oti_hlock = ldiskfs_htree_lock_alloc();
4363         if (info->oti_hlock == NULL)
4364                 goto out_free_ea;
4365
4366         return info;
4367
4368  out_free_ea:
4369         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4370  out_free_info:
4371         OBD_FREE_PTR(info);
4372         return ERR_PTR(-ENOMEM);
4373 }
4374
4375 static void osd_key_fini(const struct lu_context *ctx,
4376                          struct lu_context_key *key, void* data)
4377 {
4378         struct osd_thread_info *info = data;
4379
4380         if (info->oti_hlock != NULL)
4381                 ldiskfs_htree_lock_free(info->oti_hlock);
4382         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4383         OBD_FREE_PTR(info);
4384 }
4385
4386 static void osd_key_exit(const struct lu_context *ctx,
4387                          struct lu_context_key *key, void *data)
4388 {
4389         struct osd_thread_info *info = data;
4390
4391         LASSERT(info->oti_r_locks == 0);
4392         LASSERT(info->oti_w_locks == 0);
4393         LASSERT(info->oti_txns    == 0);
4394 }
4395
4396 /* type constructor/destructor: osd_type_init, osd_type_fini */
4397 LU_TYPE_INIT_FINI(osd, &osd_key);
4398
4399 static struct lu_context_key osd_key = {
4400         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
4401         .lct_init = osd_key_init,
4402         .lct_fini = osd_key_fini,
4403         .lct_exit = osd_key_exit
4404 };
4405
4406
4407 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
4408                            const char *name, struct lu_device *next)
4409 {
4410         return osd_procfs_init(osd_dev(d), name);
4411 }
4412
4413 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
4414 {
4415         struct osd_thread_info *info = osd_oti_get(env);
4416         ENTRY;
4417         if (o->od_obj_area != NULL) {
4418                 lu_object_put(env, &o->od_obj_area->do_lu);
4419                 o->od_obj_area = NULL;
4420         }
4421         if (o->od_oi_table != NULL)
4422                 osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
4423
4424         RETURN(0);
4425 }
4426
4427 static int osd_mount(const struct lu_env *env,
4428                      struct osd_device *o, struct lustre_cfg *cfg)
4429 {
4430         struct lustre_mount_info *lmi;
4431         const char               *dev  = lustre_cfg_string(cfg, 0);
4432         struct lustre_disk_data  *ldd;
4433         struct lustre_sb_info    *lsi;
4434
4435         ENTRY;
4436         if (o->od_mount != NULL) {
4437                 CERROR("Already mounted (%s)\n", dev);
4438                 RETURN(-EEXIST);
4439         }
4440
4441         /* get mount */
4442         lmi = server_get_mount(dev);
4443         if (lmi == NULL) {
4444                 CERROR("Cannot get mount info for %s!\n", dev);
4445                 RETURN(-EFAULT);
4446         }
4447
4448         LASSERT(lmi != NULL);
4449         /* save lustre_mount_info in dt_device */
4450         o->od_mount = lmi;
4451
4452         lsi = s2lsi(lmi->lmi_sb);
4453         ldd = lsi->lsi_ldd;
4454
4455         if (ldd->ldd_flags & LDD_F_IAM_DIR) {
4456                 o->od_iop_mode = 0;
4457                 LCONSOLE_WARN("OSD: IAM mode enabled\n");
4458         } else
4459                 o->od_iop_mode = 1;
4460
4461         o->od_obj_area = NULL;
4462         RETURN(0);
4463 }
4464
4465 static struct lu_device *osd_device_fini(const struct lu_env *env,
4466                                          struct lu_device *d)
4467 {
4468         int rc;
4469         ENTRY;
4470
4471         shrink_dcache_sb(osd_sb(osd_dev(d)));
4472         osd_sync(env, lu2dt_dev(d));
4473
4474         rc = osd_procfs_fini(osd_dev(d));
4475         if (rc) {
4476                 CERROR("proc fini error %d \n", rc);
4477                 RETURN (ERR_PTR(rc));
4478         }
4479
4480         if (osd_dev(d)->od_mount)
4481                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
4482                                  osd_dev(d)->od_mount->lmi_mnt);
4483         osd_dev(d)->od_mount = NULL;
4484
4485         RETURN(NULL);
4486 }
4487
4488 static struct lu_device *osd_device_alloc(const struct lu_env *env,
4489                                           struct lu_device_type *t,
4490                                           struct lustre_cfg *cfg)
4491 {
4492         struct lu_device  *l;
4493         struct osd_device *o;
4494
4495         OBD_ALLOC_PTR(o);
4496         if (o != NULL) {
4497                 int result;
4498
4499                 result = dt_device_init(&o->od_dt_dev, t);
4500                 if (result == 0) {
4501                         l = osd2lu_dev(o);
4502                         l->ld_ops = &osd_lu_ops;
4503                         o->od_dt_dev.dd_ops = &osd_dt_ops;
4504                         cfs_spin_lock_init(&o->od_osfs_lock);
4505                         o->od_osfs_age = cfs_time_shift_64(-1000);
4506                         o->od_capa_hash = init_capa_hash();
4507                         if (o->od_capa_hash == NULL) {
4508                                 dt_device_fini(&o->od_dt_dev);
4509                                 l = ERR_PTR(-ENOMEM);
4510                         }
4511                 } else
4512                         l = ERR_PTR(result);
4513
4514                 if (IS_ERR(l))
4515                         OBD_FREE_PTR(o);
4516         } else
4517                 l = ERR_PTR(-ENOMEM);
4518         return l;
4519 }
4520
4521 static struct lu_device *osd_device_free(const struct lu_env *env,
4522                                          struct lu_device *d)
4523 {
4524         struct osd_device *o = osd_dev(d);
4525         ENTRY;
4526
4527         cleanup_capa_hash(o->od_capa_hash);
4528         dt_device_fini(&o->od_dt_dev);
4529         OBD_FREE_PTR(o);
4530         RETURN(NULL);
4531 }
4532
4533 static int osd_process_config(const struct lu_env *env,
4534                               struct lu_device *d, struct lustre_cfg *cfg)
4535 {
4536         struct osd_device *o = osd_dev(d);
4537         int err;
4538         ENTRY;
4539
4540         switch(cfg->lcfg_command) {
4541         case LCFG_SETUP:
4542                 err = osd_mount(env, o, cfg);
4543                 break;
4544         case LCFG_CLEANUP:
4545                 err = osd_shutdown(env, o);
4546                 break;
4547         default:
4548                 err = -ENOSYS;
4549         }
4550
4551         RETURN(err);
4552 }
4553
4554 static int osd_recovery_complete(const struct lu_env *env,
4555                                  struct lu_device *d)
4556 {
4557         RETURN(0);
4558 }
4559
4560 static int osd_prepare(const struct lu_env *env,
4561                        struct lu_device *pdev,
4562                        struct lu_device *dev)
4563 {
4564         struct osd_device *osd = osd_dev(dev);
4565         struct lustre_sb_info *lsi;
4566         struct lustre_disk_data *ldd;
4567         struct lustre_mount_info  *lmi;
4568         struct osd_thread_info *oti = osd_oti_get(env);
4569         struct dt_object *d;
4570         int result;
4571
4572         ENTRY;
4573         /* 1. initialize oi before any file create or file open */
4574         result = osd_oi_init(oti, &osd->od_oi_table,
4575                              &osd->od_dt_dev, lu2md_dev(pdev));
4576         if (result < 0)
4577                 RETURN(result);
4578
4579         LASSERT(result > 0);
4580         osd->od_oi_count = result;
4581
4582         lmi = osd->od_mount;
4583         lsi = s2lsi(lmi->lmi_sb);
4584         ldd = lsi->lsi_ldd;
4585
4586         /* 2. setup local objects */
4587         result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
4588         if (result)
4589                 goto out;
4590
4591         /* 3. open remote object dir */
4592         d = dt_store_open(env, lu2dt_dev(dev), "",
4593                           remote_obj_dir, &oti->oti_fid);
4594         if (!IS_ERR(d)) {
4595                 osd->od_obj_area = d;
4596                 result = 0;
4597         } else {
4598                 result = PTR_ERR(d);
4599                 osd->od_obj_area = NULL;
4600         }
4601
4602 out:
4603         RETURN(result);
4604 }
4605
4606 static const struct lu_object_operations osd_lu_obj_ops = {
4607         .loo_object_init      = osd_object_init,
4608         .loo_object_delete    = osd_object_delete,
4609         .loo_object_release   = osd_object_release,
4610         .loo_object_free      = osd_object_free,
4611         .loo_object_print     = osd_object_print,
4612         .loo_object_invariant = osd_object_invariant
4613 };
4614
4615 static const struct lu_device_operations osd_lu_ops = {
4616         .ldo_object_alloc      = osd_object_alloc,
4617         .ldo_process_config    = osd_process_config,
4618         .ldo_recovery_complete = osd_recovery_complete,
4619         .ldo_prepare           = osd_prepare,
4620 };
4621
4622 static const struct lu_device_type_operations osd_device_type_ops = {
4623         .ldto_init = osd_type_init,
4624         .ldto_fini = osd_type_fini,
4625
4626         .ldto_start = osd_type_start,
4627         .ldto_stop  = osd_type_stop,
4628
4629         .ldto_device_alloc = osd_device_alloc,
4630         .ldto_device_free  = osd_device_free,
4631
4632         .ldto_device_init    = osd_device_init,
4633         .ldto_device_fini    = osd_device_fini
4634 };
4635
4636 static struct lu_device_type osd_device_type = {
4637         .ldt_tags     = LU_DEVICE_DT,
4638         .ldt_name     = LUSTRE_OSD_NAME,
4639         .ldt_ops      = &osd_device_type_ops,
4640         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
4641 };
4642
4643 /*
4644  * lprocfs legacy support.
4645  */
4646 static struct obd_ops osd_obd_device_ops = {
4647         .o_owner = THIS_MODULE
4648 };
4649
4650 static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
4651         .llod_name      = remote_obj_dir,
4652         .llod_oid       = OSD_REM_OBJ_DIR_OID,
4653         .llod_is_index  = 1,
4654         .llod_feat      = &dt_directory_features,
4655 };
4656
4657 static int __init osd_mod_init(void)
4658 {
4659         struct lprocfs_static_vars lvars;
4660
4661         osd_oi_mod_init();
4662         llo_local_obj_register(&llod_osd_rem_obj_dir);
4663         lprocfs_osd_init_vars(&lvars);
4664         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
4665                                    LUSTRE_OSD_NAME, &osd_device_type);
4666 }
4667
4668 static void __exit osd_mod_exit(void)
4669 {
4670         llo_local_obj_unregister(&llod_osd_rem_obj_dir);
4671         class_unregister_type(LUSTRE_OSD_NAME);
4672 }
4673
4674 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4675 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
4676 MODULE_LICENSE("GPL");
4677
4678 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);