Whamcloud - gitweb
ORNL-22 general ptlrpcd threads pool support
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * Copyright (c) 2011 Whamcloud, Inc.
37  */
38 /*
39  * This file is part of Lustre, http://www.lustre.org/
40  * Lustre is a trademark of Sun Microsystems, Inc.
41  *
42  * lustre/osd/osd_handler.c
43  *
44  * Top-level entry points into osd module
45  *
46  * Author: Nikita Danilov <nikita@clusterfs.com>
47  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
48  */
49
50 #ifndef EXPORT_SYMTAB
51 # define EXPORT_SYMTAB
52 #endif
53 #define DEBUG_SUBSYSTEM S_MDS
54
55 #include <linux/module.h>
56
57 /* LUSTRE_VERSION_CODE */
58 #include <lustre_ver.h>
59 /* prerequisite for linux/xattr.h */
60 #include <linux/types.h>
61 /* prerequisite for linux/xattr.h */
62 #include <linux/fs.h>
63 /* XATTR_{REPLACE,CREATE} */
64 #include <linux/xattr.h>
65 /* simple_mkdir() */
66 #include <lvfs.h>
67
68 /*
69  * struct OBD_{ALLOC,FREE}*()
70  * OBD_FAIL_CHECK
71  */
72 #include <obd_support.h>
73 /* struct ptlrpc_thread */
74 #include <lustre_net.h>
75
76 /* fid_is_local() */
77 #include <lustre_fid.h>
78
79 #include "osd_internal.h"
80 #include "osd_igif.h"
81
82 /* llo_* api support */
83 #include <md_object.h>
84
/* Directory entry names for "self" and "parent". */
static const char dot[]            = ".";
static const char dotdot[]         = "..";
/* NOTE(review): presumably the name of the directory that holds remote
 * objects; no user of it is visible in this part of the file. */
static const char remote_obj_dir[] = "REM_OBJ_DIR";
88
89 struct osd_directory {
90         struct iam_container od_container;
91         struct iam_descr     od_descr;
92 };
93
94 struct osd_object {
95         struct dt_object       oo_dt;
96         /**
97          * Inode for file system object represented by this osd_object. This
98          * inode is pinned for the whole duration of lu_object life.
99          *
100          * Not modified concurrently (either setup early during object
101          * creation, or assigned by osd_object_create() under write lock).
102          */
103         struct inode          *oo_inode;
104         /**
105          * to protect index ops.
106          */
107         cfs_rw_semaphore_t     oo_ext_idx_sem;
108         cfs_rw_semaphore_t     oo_sem;
109         struct osd_directory  *oo_dir;
110         /** protects inode attributes. */
111         cfs_spinlock_t         oo_guard;
112         /**
113          * Following two members are used to indicate the presence of dot and
114          * dotdot in the given directory. This is required for interop mode
115          * (b11826).
116          */
117         int                    oo_compat_dot_created;
118         int                    oo_compat_dotdot_created;
119
120         const struct lu_env   *oo_owner;
121 #ifdef CONFIG_LOCKDEP
122         struct lockdep_map     oo_dep_map;
123 #endif
124 };
125
126 static const struct lu_object_operations      osd_lu_obj_ops;
127 static const struct lu_device_operations      osd_lu_ops;
128 static       struct lu_context_key            osd_key;
129 static const struct dt_object_operations      osd_obj_ops;
130 static const struct dt_object_operations      osd_obj_ea_ops;
131 static const struct dt_body_operations        osd_body_ops;
132 static const struct dt_index_operations       osd_index_iam_ops;
133 static const struct dt_index_operations       osd_index_ea_ops;
134
135 struct osd_thandle {
136         struct thandle          ot_super;
137         handle_t               *ot_handle;
138         struct journal_callback ot_jcb;
139         /* Link to the device, for debugging. */
140         struct lu_ref_link     *ot_dev_link;
141
142 #if OSD_THANDLE_STATS
143         /** time when this handle was allocated */
144         cfs_time_t oth_alloced;
145
146         /** time when this thanle was started */
147         cfs_time_t oth_started;
148 #endif
149 };
150
151 /*
152  * Helpers.
153  */
154 static int lu_device_is_osd(const struct lu_device *d)
155 {
156         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
157 }
158
159 static struct osd_device *osd_dt_dev(const struct dt_device *d)
160 {
161         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
162         return container_of0(d, struct osd_device, od_dt_dev);
163 }
164
165 static struct osd_device *osd_dev(const struct lu_device *d)
166 {
167         LASSERT(lu_device_is_osd(d));
168         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
169 }
170
171 static struct osd_device *osd_obj2dev(const struct osd_object *o)
172 {
173         return osd_dev(o->oo_dt.do_lu.lo_dev);
174 }
175
176 static struct super_block *osd_sb(const struct osd_device *dev)
177 {
178         return dev->od_mount->lmi_mnt->mnt_sb;
179 }
180
181 static int osd_object_is_root(const struct osd_object *obj)
182 {
183         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
184 }
185
186 static struct osd_object *osd_obj(const struct lu_object *o)
187 {
188         LASSERT(lu_device_is_osd(o->lo_dev));
189         return container_of0(o, struct osd_object, oo_dt.do_lu);
190 }
191
192 static struct osd_object *osd_dt_obj(const struct dt_object *d)
193 {
194         return osd_obj(&d->do_lu);
195 }
196
197 static struct lu_device *osd2lu_dev(struct osd_device *osd)
198 {
199         return &osd->od_dt_dev.dd_lu_dev;
200 }
201
202 static journal_t *osd_journal(const struct osd_device *dev)
203 {
204         return LDISKFS_SB(osd_sb(dev))->s_journal;
205 }
206
207 static int osd_has_index(const struct osd_object *obj)
208 {
209         return obj->oo_dt.do_index_ops != NULL;
210 }
211
/* lu_object flavour of osd_invariant(): converts \a l and forwards. */
static int osd_object_invariant(const struct lu_object *l)
{
        const struct osd_object *obj = osd_obj(l);

        return osd_invariant(obj);
}
216
217 #ifdef HAVE_QUOTA_SUPPORT
218 static inline void
219 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
220 {
221         struct md_ucred    *uc = md_ucred(env);
222         struct cred        *tc;
223
224         LASSERT(uc != NULL);
225
226         save->oc_uid = current_fsuid();
227         save->oc_gid = current_fsgid();
228         save->oc_cap = current_cap();
229         if ((tc = prepare_creds())) {
230                 tc->fsuid         = uc->mu_fsuid;
231                 tc->fsgid         = uc->mu_fsgid;
232                 commit_creds(tc);
233         }
234         /* XXX not suboptimal */
235         cfs_curproc_cap_unpack(uc->mu_cap);
236 }
237
238 static inline void
239 osd_pop_ctxt(struct osd_ctxt *save)
240 {
241         struct cred *tc;
242
243         if ((tc = prepare_creds())) {
244                 tc->fsuid         = save->oc_uid;
245                 tc->fsgid         = save->oc_gid;
246                 tc->cap_effective = save->oc_cap;
247                 commit_creds(tc);
248         }
249 }
250 #endif
251
252 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
253 {
254         return lu_context_key_get(&env->le_ctx, &osd_key);
255 }
256
257 /*
258  * Concurrency: doesn't matter
259  */
260 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
261 {
262         return osd_oti_get(env)->oti_r_locks > 0;
263 }
264
265 /*
266  * Concurrency: doesn't matter
267  */
268 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
269 {
270         struct osd_thread_info *oti = osd_oti_get(env);
271         return oti->oti_w_locks > 0 && o->oo_owner == env;
272 }
273
274 /*
275  * Concurrency: doesn't access mutable data
276  */
277 static int osd_root_get(const struct lu_env *env,
278                         struct dt_device *dev, struct lu_fid *f)
279 {
280         struct inode *inode;
281
282         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
283         LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
284         return 0;
285 }
286
287 /*
288  * OSD object methods.
289  */
290
291 /*
292  * Concurrency: no concurrent access is possible that early in object
293  * life-cycle.
294  */
295 static struct lu_object *osd_object_alloc(const struct lu_env *env,
296                                           const struct lu_object_header *hdr,
297                                           struct lu_device *d)
298 {
299         struct osd_object *mo;
300
301         OBD_ALLOC_PTR(mo);
302         if (mo != NULL) {
303                 struct lu_object *l;
304
305                 l = &mo->oo_dt.do_lu;
306                 dt_object_init(&mo->oo_dt, NULL, d);
307                 if (osd_dev(d)->od_iop_mode)
308                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
309                 else
310                         mo->oo_dt.do_ops = &osd_obj_ops;
311
312                 l->lo_ops = &osd_lu_obj_ops;
313                 cfs_init_rwsem(&mo->oo_sem);
314                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
315                 cfs_spin_lock_init(&mo->oo_guard);
316                 return l;
317         } else
318                 return NULL;
319 }
320
321 /*
322  * retrieve object from backend ext fs.
323  **/
324 static struct inode *osd_iget(struct osd_thread_info *info,
325                               struct osd_device *dev,
326                               const struct osd_inode_id *id)
327 {
328         struct inode *inode = NULL;
329
330 #ifdef HAVE_EXT4_LDISKFS
331         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
332         if (IS_ERR(inode))
333         /* Newer kernels return an error instead of a NULL pointer */
334                 inode = NULL;
335 #else
336         inode = iget(osd_sb(dev), id->oii_ino);
337 #endif
338         if (inode == NULL) {
339                 CERROR("no inode\n");
340                 inode = ERR_PTR(-EACCES);
341         } else if (id->oii_gen != OSD_OII_NOGEN &&
342                    inode->i_generation != id->oii_gen) {
343                 iput(inode);
344                 inode = ERR_PTR(-ESTALE);
345         } else if (inode->i_nlink == 0) {
346                 /* due to parallel readdir and unlink,
347                 * we can have dead inode here. */
348                 CWARN("stale inode\n");
349                 make_bad_inode(inode);
350                 iput(inode);
351                 inode = ERR_PTR(-ESTALE);
352         } else if (is_bad_inode(inode)) {
353                 CERROR("bad inode %lx\n",inode->i_ino);
354                 iput(inode);
355                 inode = ERR_PTR(-ENOENT);
356         } else {
357                 /* Do not update file c/mtime in ldiskfs.
358                  * NB: we don't have any lock to protect this because we don't
359                  * have reference on osd_object now, but contention with
360                  * another lookup + attr_set can't happen in the tiny window
361                  * between if (...) and set S_NOCMTIME. */
362                 if (!(inode->i_flags & S_NOCMTIME))
363                         inode->i_flags |= S_NOCMTIME;
364         }
365         return inode;
366 }
367
368 static int osd_fid_lookup(const struct lu_env *env,
369                           struct osd_object *obj, const struct lu_fid *fid)
370 {
371         struct osd_thread_info *info;
372         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
373         struct osd_device      *dev;
374         struct osd_inode_id    *id;
375         struct osd_oi          *oi;
376         struct inode           *inode;
377         int                     result;
378
379         LINVRNT(osd_invariant(obj));
380         LASSERT(obj->oo_inode == NULL);
381         LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
382         /*
383          * This assertion checks that osd layer sees only local
384          * fids. Unfortunately it is somewhat expensive (does a
385          * cache-lookup). Disabling it for production/acceptance-testing.
386          */
387         LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
388
389         ENTRY;
390
391         info = osd_oti_get(env);
392         dev  = osd_dev(ldev);
393         id   = &info->oti_id;
394         oi   = &dev->od_oi;
395
396         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
397                 RETURN(-ENOENT);
398
399         result = osd_oi_lookup(info, oi, fid, id);
400         if (result == 0) {
401                 inode = osd_iget(info, dev, id);
402                 if (!IS_ERR(inode)) {
403                         obj->oo_inode = inode;
404                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
405                         if (dev->od_iop_mode) {
406                                 obj->oo_compat_dot_created = 1;
407                                 obj->oo_compat_dotdot_created = 1;
408                         }
409                         result = 0;
410                 } else
411                         /*
412                          * If fid wasn't found in oi, inode-less object is
413                          * created, for which lu_object_exists() returns
414                          * false. This is used in a (frequent) case when
415                          * objects are created as locking anchors or
416                          * place holders for objects yet to be created.
417                          */
418                         result = PTR_ERR(inode);
419         } else if (result == -ENOENT)
420                 result = 0;
421         LINVRNT(osd_invariant(obj));
422
423         RETURN(result);
424 }
425
426 /*
427  * Concurrency: shouldn't matter.
428  */
429 static void osd_object_init0(struct osd_object *obj)
430 {
431         LASSERT(obj->oo_inode != NULL);
432         obj->oo_dt.do_body_ops = &osd_body_ops;
433         obj->oo_dt.do_lu.lo_header->loh_attr |=
434                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
435 }
436
437 /*
438  * Concurrency: no concurrent access is possible that early in object
439  * life-cycle.
440  */
441 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
442                            const struct lu_object_conf *unused)
443 {
444         struct osd_object *obj = osd_obj(l);
445         int result;
446
447         LINVRNT(osd_invariant(obj));
448
449         result = osd_fid_lookup(env, obj, lu_object_fid(l));
450         if (result == 0) {
451                 if (obj->oo_inode != NULL)
452                         osd_object_init0(obj);
453         }
454         LINVRNT(osd_invariant(obj));
455         return result;
456 }
457
458 /*
459  * Concurrency: no concurrent access is possible that late in object
460  * life-cycle.
461  */
462 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
463 {
464         struct osd_object *obj = osd_obj(l);
465
466         LINVRNT(osd_invariant(obj));
467
468         dt_object_fini(&obj->oo_dt);
469         OBD_FREE_PTR(obj);
470 }
471
472 /**
473  * IAM Iterator
474  */
475 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
476                                              const struct iam_container *bag)
477 {
478         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
479                                            osd_oti_get(env)->oti_it_ipd);
480 }
481
482 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
483                                               const struct iam_container *bag)
484 {
485         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
486                                            osd_oti_get(env)->oti_idx_ipd);
487 }
488
489 static void osd_ipd_put(const struct lu_env *env,
490                         const struct iam_container *bag,
491                         struct iam_path_descr *ipd)
492 {
493         bag->ic_descr->id_ops->id_ipd_free(ipd);
494 }
495
496 /*
497  * Concurrency: no concurrent access is possible that late in object
498  * life-cycle.
499  */
500 static void osd_index_fini(struct osd_object *o)
501 {
502         struct iam_container *bag;
503
504         if (o->oo_dir != NULL) {
505                 bag = &o->oo_dir->od_container;
506                 if (o->oo_inode != NULL) {
507                         if (bag->ic_object == o->oo_inode)
508                                 iam_container_fini(bag);
509                 }
510                 OBD_FREE_PTR(o->oo_dir);
511                 o->oo_dir = NULL;
512         }
513 }
514
515 /*
516  * Concurrency: no concurrent access is possible that late in object
517  * life-cycle (for all existing callers, that is. New callers have to provide
518  * their own locking.)
519  */
520 static int osd_inode_unlinked(const struct inode *inode)
521 {
522         return inode->i_nlink == 0;
523 }
524
/*
 * Credit amounts; osd_inode_remove() passes their sum to txn_param_init()
 * for the transaction that removes an object-index entry.
 */
enum {
        /* credits accounted for the object-index deletion */
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        /* credits accounted for the inode deletion */
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
529
530 /*
531  * Journal
532  */
533
534 #if OSD_THANDLE_STATS
535 /**
536  * Set time when the handle is allocated
537  */
538 static void osd_th_alloced(struct osd_thandle *oth)
539 {
540         oth->oth_alloced = cfs_time_current();
541 }
542
543 /**
544  * Set time when the handle started
545  */
546 static void osd_th_started(struct osd_thandle *oth)
547 {
548         oth->oth_started = cfs_time_current();
549 }
550
551 /**
552  * Helper function to convert time interval to microseconds packed in
553  * long int (default time units for the counter in "stats" initialized
554  * by lu_time_init() )
555  */
556 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
557 {
558         struct timeval val;
559
560         cfs_duration_usec(cfs_time_sub(end, start), &val);
561         return val.tv_sec * 1000000 + val.tv_usec;
562 }
563
564 /**
565  * Check whether the we deal with this handle for too long.
566  */
567 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
568                                 cfs_time_t alloced, cfs_time_t started,
569                                 cfs_time_t closed)
570 {
571         cfs_time_t now = cfs_time_current();
572
573         LASSERT(dev != NULL);
574
575         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
576                             interval_to_usec(alloced, started));
577         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
578                             interval_to_usec(started, closed));
579         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
580                             interval_to_usec(closed, now));
581
582         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
583                 CWARN("transaction handle %p was open for too long: "
584                       "now "CFS_TIME_T" ,"
585                       "alloced "CFS_TIME_T" ,"
586                       "started "CFS_TIME_T" ,"
587                       "closed "CFS_TIME_T"\n",
588                       oth, now, alloced, started, closed);
589                 libcfs_debug_dumpstack(NULL);
590         }
591 }
592
/*
 * Run \a expr (the statement that closes the handle) and then report the
 * timing of handle \a oth to __osd_th_check_slow().
 *
 * The allocation and start times are copied out of \a oth before \a expr
 * runs; afterwards \a oth is only passed on as a value and is not
 * dereferenced by this macro.
 *
 * The body is wrapped in do { } while (0) so that the macro behaves as a
 * single statement (e.g. in an if/else without braces), and the macro
 * arguments are parenthesized.
 */
#define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
do {                                                                    \
        cfs_time_t __closed = cfs_time_current();                       \
        cfs_time_t __alloced = (oth)->oth_alloced;                      \
        cfs_time_t __started = (oth)->oth_started;                      \
                                                                        \
        expr;                                                           \
        __osd_th_check_slow((oth), (dev), __alloced, __started,         \
                            __closed);                                  \
} while (0)
602
603 #else /* OSD_THANDLE_STATS */
604
/* Statistics disabled: the time stamps are not recorded and
 * OSD_CHECK_SLOW_TH() reduces to the wrapped expression. */
#define osd_th_alloced(h)                  ((void)0)
#define osd_th_started(h)                  ((void)0)
#define OSD_CHECK_SLOW_TH(oth, dev, expr)  expr
608
609 #endif /* OSD_THANDLE_STATS */
610
611 /*
612  * Concurrency: doesn't access mutable data.
613  */
614 static int osd_param_is_sane(const struct osd_device *dev,
615                              const struct txn_param *param)
616 {
617         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
618 }
619
620 /*
621  * Concurrency: shouldn't matter.
622  */
623 #ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
624 static void osd_trans_commit_cb(struct super_block *sb,
625                                 struct journal_callback *jcb, int error)
626 #else
627 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
628 #endif
629 {
630         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
631         struct thandle     *th  = &oh->ot_super;
632         struct dt_device   *dev = th->th_dev;
633         struct lu_device   *lud = &dev->dd_lu_dev;
634
635         LASSERT(dev != NULL);
636         LASSERT(oh->ot_handle == NULL);
637
638         if (error) {
639                 CERROR("transaction @0x%p commit error: %d\n", th, error);
640         } else {
641                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
642                 /*
643                  * This od_env_for_commit is only for commit usage.  see
644                  * "struct dt_device"
645                  */
646                 lu_context_enter(&env->le_ctx);
647                 dt_txn_hook_commit(env, th);
648                 lu_context_exit(&env->le_ctx);
649         }
650
651         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
652         lu_device_put(lud);
653         th->th_dev = NULL;
654
655         lu_context_exit(&th->th_ctx);
656         lu_context_fini(&th->th_ctx);
657         OBD_FREE_PTR(oh);
658 }
659
660 /*
661  * Concurrency: shouldn't matter.
662  */
663 static struct thandle *osd_trans_start(const struct lu_env *env,
664                                        struct dt_device *d,
665                                        struct txn_param *p)
666 {
667         struct osd_device  *dev = osd_dt_dev(d);
668         handle_t           *jh;
669         struct osd_thandle *oh;
670         struct thandle     *th;
671         int hook_res;
672
673         ENTRY;
674
675         hook_res = dt_txn_hook_start(env, d, p);
676         if (hook_res != 0)
677                 RETURN(ERR_PTR(hook_res));
678
679         if (osd_param_is_sane(dev, p)) {
680                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
681                 if (oh != NULL) {
682                         struct osd_thread_info *oti = osd_oti_get(env);
683
684                         /*
685                          * XXX temporary stuff. Some abstraction layer should
686                          * be used.
687                          */
688                         oti->oti_dev = dev;
689                         osd_th_alloced(oh);
690                         jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
691                         osd_th_started(oh);
692                         if (!IS_ERR(jh)) {
693                                 oh->ot_handle = jh;
694                                 th = &oh->ot_super;
695                                 th->th_dev = d;
696                                 th->th_result = 0;
697                                 jh->h_sync = p->tp_sync;
698                                 lu_device_get(&d->dd_lu_dev);
699                                 oh->ot_dev_link = lu_ref_add
700                                         (&d->dd_lu_dev.ld_reference,
701                                          "osd-tx", th);
702                                 /* add commit callback */
703                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
704                                 lu_context_enter(&th->th_ctx);
705                                 osd_journal_callback_set(jh,osd_trans_commit_cb,
706                                                          &oh->ot_jcb);
707                                 LASSERT(oti->oti_txns == 0);
708                                 LASSERT(oti->oti_r_locks == 0);
709                                 LASSERT(oti->oti_w_locks == 0);
710                                 oti->oti_txns++;
711                         } else {
712                                 OBD_FREE_PTR(oh);
713                                 th = (void *)jh;
714                         }
715                 } else
716                         th = ERR_PTR(-ENOMEM);
717         } else {
718                 CERROR("Invalid transaction parameters\n");
719                 th = ERR_PTR(-EINVAL);
720         }
721
722         RETURN(th);
723 }
724
725 /*
726  * Concurrency: shouldn't matter.
727  */
728 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
729 {
730         int result;
731         struct osd_thandle *oh;
732         struct osd_thread_info *oti = osd_oti_get(env);
733
734         ENTRY;
735
736         oh = container_of0(th, struct osd_thandle, ot_super);
737         if (oh->ot_handle != NULL) {
738                 handle_t *hdl = oh->ot_handle;
739
740                 LASSERT(oti->oti_txns == 1);
741                 oti->oti_txns--;
742                 LASSERT(oti->oti_r_locks == 0);
743                 LASSERT(oti->oti_w_locks == 0);
744                 result = dt_txn_hook_stop(env, th);
745                 if (result != 0)
746                         CERROR("Failure in transaction hook: %d\n", result);
747                 oh->ot_handle = NULL;
748                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
749                                   result = ldiskfs_journal_stop(hdl));
750                 if (result != 0)
751                         CERROR("Failure to stop transaction: %d\n", result);
752         }
753         EXIT;
754 }
755
756 /*
757  * Concurrency: no concurrent access is possible that late in object
758  * life-cycle.
759  */
760 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
761 {
762         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
763         struct osd_device      *osd = osd_obj2dev(obj);
764         struct osd_thread_info *oti = osd_oti_get(env);
765         struct txn_param       *prm = &oti->oti_txn;
766         struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
767         struct thandle         *th;
768         int result;
769
770         lu_env_init(env_del_obj, LCT_DT_THREAD);
771         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
772                             OSD_TXN_INODE_DELETE_CREDITS);
773         th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
774         if (!IS_ERR(th)) {
775                 result = osd_oi_delete(osd_oti_get(env_del_obj),
776                                        &osd->od_oi, fid, th);
777                 osd_trans_stop(env_del_obj, th);
778         } else
779                 result = PTR_ERR(th);
780
781         lu_env_fini(env_del_obj);
782         return result;
783 }
784
785 /*
786  * Called just before object is freed. Releases all resources except for
787  * object itself (that is released by osd_object_free()).
788  *
789  * Concurrency: no concurrent access is possible that late in object
790  * life-cycle.
791  */
792 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
793 {
794         struct osd_object *obj   = osd_obj(l);
795         struct inode      *inode = obj->oo_inode;
796
797         LINVRNT(osd_invariant(obj));
798
799         /*
800          * If object is unlinked remove fid->ino mapping from object index.
801          */
802
803         osd_index_fini(obj);
804         if (inode != NULL) {
805                 int result;
806
807                 if (osd_inode_unlinked(inode)) {
808                         result = osd_inode_remove(env, obj);
809                         if (result != 0)
810                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
811                                                 "Failed to cleanup: %d\n",
812                                                 result);
813                 }
814
815                 iput(inode);
816                 obj->oo_inode = NULL;
817         }
818 }
819
820 /*
821  * Concurrency: ->loo_object_release() is called under site spin-lock.
822  */
823 static void osd_object_release(const struct lu_env *env,
824                                struct lu_object *l)
825 {
826         struct osd_object *o = osd_obj(l);
827
828         LASSERT(!lu_object_is_dying(l->lo_header));
829         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
830                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
831 }
832
833 /*
834  * Concurrency: shouldn't matter.
835  */
836 static int osd_object_print(const struct lu_env *env, void *cookie,
837                             lu_printer_t p, const struct lu_object *l)
838 {
839         struct osd_object *o = osd_obj(l);
840         struct iam_descr  *d;
841
842         if (o->oo_dir != NULL)
843                 d = o->oo_dir->od_container.ic_descr;
844         else
845                 d = NULL;
846         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
847                     o, o->oo_inode,
848                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
849                     o->oo_inode ? o->oo_inode->i_generation : 0,
850                     d ? d->id_ops->id_name : "plain");
851 }
852
853 /*
854  * Concurrency: shouldn't matter.
855  */
856 int osd_statfs(const struct lu_env *env, struct dt_device *d,
857                cfs_kstatfs_t *sfs)
858 {
859         struct osd_device *osd = osd_dt_dev(d);
860         struct super_block *sb = osd_sb(osd);
861         int result = 0;
862
863         cfs_spin_lock(&osd->od_osfs_lock);
864         /* cache 1 second */
865         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
866                 result = ll_do_statfs(sb, &osd->od_kstatfs);
867                 if (likely(result == 0)) /* N.B. statfs can't really fail */
868                         osd->od_osfs_age = cfs_time_current_64();
869         }
870
871         if (likely(result == 0))
872                 *sfs = osd->od_kstatfs;
873         cfs_spin_unlock(&osd->od_osfs_lock);
874
875         return result;
876 }
877
878 /*
879  * Concurrency: doesn't access mutable data.
880  */
881 static void osd_conf_get(const struct lu_env *env,
882                          const struct dt_device *dev,
883                          struct dt_device_param *param)
884 {
885         /*
886          * XXX should be taken from not-yet-existing fs abstraction layer.
887          */
888         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
889         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
890         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
891 }
892
893 /**
894  * Helper function to get and fill the buffer with input values.
895  */
896 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
897 {
898         struct lu_buf *buf;
899
900         buf = &osd_oti_get(env)->oti_buf;
901         buf->lb_buf = area;
902         buf->lb_len = len;
903         return buf;
904 }
905
906 /*
907  * Concurrency: shouldn't matter.
908  */
909 static int osd_sync(const struct lu_env *env, struct dt_device *d)
910 {
911         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
912         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
913 }
914
915 /**
916  * Start commit for OSD device.
917  *
918  * An implementation of dt_commit_async method for OSD device.
919  * Asychronously starts underlayng fs sync and thereby a transaction
920  * commit.
921  *
922  * \param env environment
923  * \param d dt device
924  *
925  * \see dt_device_operations
926  */
927 static int osd_commit_async(const struct lu_env *env,
928                             struct dt_device *d)
929 {
930         struct super_block *s = osd_sb(osd_dt_dev(d));
931         ENTRY;
932
933         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
934         RETURN(s->s_op->sync_fs(s, 0));
935 }
936
937 /*
938  * Concurrency: shouldn't matter.
939  */
940 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
941
942 static void osd_ro(const struct lu_env *env, struct dt_device *d)
943 {
944         ENTRY;
945
946         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
947
948         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
949                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
950         EXIT;
951 }
952
953
954 /*
955  * Concurrency: serialization provided by callers.
956  */
957 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
958                               int mode, unsigned long timeout, __u32 alg,
959                               struct lustre_capa_key *keys)
960 {
961         struct osd_device *dev = osd_dt_dev(d);
962         ENTRY;
963
964         dev->od_fl_capa = mode;
965         dev->od_capa_timeout = timeout;
966         dev->od_capa_alg = alg;
967         dev->od_capa_keys = keys;
968         RETURN(0);
969 }
970
971 /**
972  * Concurrency: serialization provided by callers.
973  */
974 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
975                                struct dt_quota_ctxt *ctxt, void *data)
976 {
977         struct obd_device *obd = (void *)ctxt;
978         struct vfsmount *mnt = (struct vfsmount *)data;
979         ENTRY;
980
981         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
982         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
983         obd->obd_lvfs_ctxt.pwdmnt = mnt;
984         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
985         obd->obd_lvfs_ctxt.fs = get_ds();
986
987         EXIT;
988 }
989
990 /**
991  * Note: we do not count into QUOTA here.
992  * If we mount with --data_journal we may need more.
993  */
994 static const int osd_dto_credits_noquota[DTO_NR] = {
995         /**
996          * Insert/Delete.
997          * INDEX_EXTRA_TRANS_BLOCKS(8) +
998          * SINGLEDATA_TRANS_BLOCKS(8)
999          * XXX Note: maybe iam need more, since iam have more level than
1000          *           EXT3 htree.
1001          */
1002         [DTO_INDEX_INSERT]  = 16,
1003         [DTO_INDEX_DELETE]  = 16,
1004         /**
1005          * Unused now
1006          */
1007         [DTO_IDNEX_UPDATE]  = 16,
1008         /**
1009          * Create a object. The same as create object in EXT3.
1010          * DATA_TRANS_BLOCKS(14) +
1011          * INDEX_EXTRA_BLOCKS(8) +
1012          * 3(inode bits, groups, GDT)
1013          */
1014         [DTO_OBJECT_CREATE] = 25,
1015         /**
1016          * Unused now
1017          */
1018         [DTO_OBJECT_DELETE] = 25,
1019         /**
1020          * Attr set credits.
1021          * 3(inode bits, group, GDT)
1022          */
1023         [DTO_ATTR_SET_BASE] = 3,
1024         /**
1025          * Xattr set. The same as xattr of EXT3.
1026          * DATA_TRANS_BLOCKS(14)
1027          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1028          * are also counted in. Do not know why?
1029          */
1030         [DTO_XATTR_SET]     = 14,
1031         [DTO_LOG_REC]       = 14,
1032         /**
1033          * creadits for inode change during write.
1034          */
1035         [DTO_WRITE_BASE]    = 3,
1036         /**
1037          * credits for single block write.
1038          */
1039         [DTO_WRITE_BLOCK]   = 14,
1040         /**
1041          * Attr set credits for chown.
1042          * This is extra credits for setattr, and it is null without quota
1043          */
1044         [DTO_ATTR_SET_CHOWN]= 0
1045 };
1046
1047 /**
1048  * Note: we count into QUOTA here.
1049  * If we mount with --data_journal we may need more.
1050  */
1051 static const int osd_dto_credits_quota[DTO_NR] = {
1052         /**
1053          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1054          * SINGLEDATA_TRANS_BLOCKS(8) +
1055          * 2 * QUOTA_TRANS_BLOCKS(2)
1056          */
1057         [DTO_INDEX_INSERT]  = 20,
1058         /**
1059          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1060          * SINGLEDATA_TRANS_BLOCKS(8) +
1061          * 2 * QUOTA_TRANS_BLOCKS(2)
1062          */
1063         [DTO_INDEX_DELETE]  = 20,
1064         /**
1065          * Unused now.
1066          */
1067         [DTO_IDNEX_UPDATE]  = 16,
1068         /*
1069          * Create a object. Same as create object in EXT3 filesystem.
1070          * DATA_TRANS_BLOCKS(16) +
1071          * INDEX_EXTRA_BLOCKS(8) +
1072          * 3(inode bits, groups, GDT) +
1073          * 2 * QUOTA_INIT_BLOCKS(25)
1074          */
1075         [DTO_OBJECT_CREATE] = 77,
1076         /*
1077          * Unused now.
1078          * DATA_TRANS_BLOCKS(16) +
1079          * INDEX_EXTRA_BLOCKS(8) +
1080          * 3(inode bits, groups, GDT) +
1081          * QUOTA(?)
1082          */
1083         [DTO_OBJECT_DELETE] = 27,
1084         /**
1085          * Attr set credits.
1086          * 3 (inode bit, group, GDT) +
1087          */
1088         [DTO_ATTR_SET_BASE] = 3,
1089         /**
1090          * Xattr set. The same as xattr of EXT3.
1091          * DATA_TRANS_BLOCKS(16)
1092          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
1093          *           also counted in. Do not know why?
1094          */
1095         [DTO_XATTR_SET]     = 16,
1096         [DTO_LOG_REC]       = 16,
1097         /**
1098          * creadits for inode change during write.
1099          */
1100         [DTO_WRITE_BASE]    = 3,
1101         /**
1102          * credits for single block write.
1103          */
1104         [DTO_WRITE_BLOCK]   = 16,
1105         /**
1106          * Attr set credits for chown.
1107          * It is added to already set setattr credits
1108          * 2 * QUOTA_INIT_BLOCKS(25) +
1109          * 2 * QUOTA_DEL_BLOCKS(9)
1110          */
1111         [DTO_ATTR_SET_CHOWN]= 68,
1112 };
1113
1114 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
1115                           enum dt_txn_op op)
1116 {
1117         LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
1118                 ARRAY_SIZE(osd_dto_credits_quota));
1119         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
1120 #ifdef HAVE_QUOTA_SUPPORT
1121         if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
1122                 return osd_dto_credits_quota[op];
1123         else
1124 #endif
1125                 return osd_dto_credits_noquota[op];
1126 }
1127
1128 static const struct dt_device_operations osd_dt_ops = {
1129         .dt_root_get       = osd_root_get,
1130         .dt_statfs         = osd_statfs,
1131         .dt_trans_start    = osd_trans_start,
1132         .dt_trans_stop     = osd_trans_stop,
1133         .dt_conf_get       = osd_conf_get,
1134         .dt_sync           = osd_sync,
1135         .dt_ro             = osd_ro,
1136         .dt_commit_async   = osd_commit_async,
1137         .dt_credit_get     = osd_credit_get,
1138         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1139         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1140 };
1141
1142 static void osd_object_read_lock(const struct lu_env *env,
1143                                  struct dt_object *dt, unsigned role)
1144 {
1145         struct osd_object *obj = osd_dt_obj(dt);
1146         struct osd_thread_info *oti = osd_oti_get(env);
1147
1148         LINVRNT(osd_invariant(obj));
1149
1150         LASSERT(obj->oo_owner != env);
1151         cfs_down_read_nested(&obj->oo_sem, role);
1152
1153         LASSERT(obj->oo_owner == NULL);
1154         oti->oti_r_locks++;
1155 }
1156
1157 static void osd_object_write_lock(const struct lu_env *env,
1158                                   struct dt_object *dt, unsigned role)
1159 {
1160         struct osd_object *obj = osd_dt_obj(dt);
1161         struct osd_thread_info *oti = osd_oti_get(env);
1162
1163         LINVRNT(osd_invariant(obj));
1164
1165         LASSERT(obj->oo_owner != env);
1166         cfs_down_write_nested(&obj->oo_sem, role);
1167
1168         LASSERT(obj->oo_owner == NULL);
1169         obj->oo_owner = env;
1170         oti->oti_w_locks++;
1171 }
1172
1173 static void osd_object_read_unlock(const struct lu_env *env,
1174                                    struct dt_object *dt)
1175 {
1176         struct osd_object *obj = osd_dt_obj(dt);
1177         struct osd_thread_info *oti = osd_oti_get(env);
1178
1179         LINVRNT(osd_invariant(obj));
1180
1181         LASSERT(oti->oti_r_locks > 0);
1182         oti->oti_r_locks--;
1183         cfs_up_read(&obj->oo_sem);
1184 }
1185
1186 static void osd_object_write_unlock(const struct lu_env *env,
1187                                     struct dt_object *dt)
1188 {
1189         struct osd_object *obj = osd_dt_obj(dt);
1190         struct osd_thread_info *oti = osd_oti_get(env);
1191
1192         LINVRNT(osd_invariant(obj));
1193
1194         LASSERT(obj->oo_owner == env);
1195         LASSERT(oti->oti_w_locks > 0);
1196         oti->oti_w_locks--;
1197         obj->oo_owner = NULL;
1198         cfs_up_write(&obj->oo_sem);
1199 }
1200
1201 static int osd_object_write_locked(const struct lu_env *env,
1202                                    struct dt_object *dt)
1203 {
1204         struct osd_object *obj = osd_dt_obj(dt);
1205
1206         LINVRNT(osd_invariant(obj));
1207
1208         return obj->oo_owner == env;
1209 }
1210
1211 static int capa_is_sane(const struct lu_env *env,
1212                         struct osd_device *dev,
1213                         struct lustre_capa *capa,
1214                         struct lustre_capa_key *keys)
1215 {
1216         struct osd_thread_info *oti = osd_oti_get(env);
1217         struct lustre_capa *tcapa = &oti->oti_capa;
1218         struct obd_capa *oc;
1219         int i, rc = 0;
1220         ENTRY;
1221
1222         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1223         if (oc) {
1224                 if (capa_is_expired(oc)) {
1225                         DEBUG_CAPA(D_ERROR, capa, "expired");
1226                         rc = -ESTALE;
1227                 }
1228                 capa_put(oc);
1229                 RETURN(rc);
1230         }
1231
1232         if (capa_is_expired_sec(capa)) {
1233                 DEBUG_CAPA(D_ERROR, capa, "expired");
1234                 RETURN(-ESTALE);
1235         }
1236
1237         cfs_spin_lock(&capa_lock);
1238         for (i = 0; i < 2; i++) {
1239                 if (keys[i].lk_keyid == capa->lc_keyid) {
1240                         oti->oti_capa_key = keys[i];
1241                         break;
1242                 }
1243         }
1244         cfs_spin_unlock(&capa_lock);
1245
1246         if (i == 2) {
1247                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1248                 RETURN(-ESTALE);
1249         }
1250
1251         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1252         if (rc)
1253                 RETURN(rc);
1254
1255         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1256                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1257                 RETURN(-EACCES);
1258         }
1259
1260         oc = capa_add(dev->od_capa_hash, capa);
1261         capa_put(oc);
1262
1263         RETURN(0);
1264 }
1265
1266 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1267                            struct lustre_capa *capa, __u64 opc)
1268 {
1269         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1270         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1271         struct md_capainfo *ci;
1272         int rc;
1273
1274         if (!dev->od_fl_capa)
1275                 return 0;
1276
1277         if (capa == BYPASS_CAPA)
1278                 return 0;
1279
1280         ci = md_capainfo(env);
1281         if (unlikely(!ci))
1282                 return 0;
1283
1284         if (ci->mc_auth == LC_ID_NONE)
1285                 return 0;
1286
1287         if (!capa) {
1288                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1289                 return -EACCES;
1290         }
1291
1292         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1293                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1294                            PFID(fid));
1295                 return -EACCES;
1296         }
1297
1298         if (!capa_opc_supported(capa, opc)) {
1299                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1300                 return -EACCES;
1301         }
1302
1303         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1304                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1305                 return -EACCES;
1306         }
1307
1308         return 0;
1309 }
1310
1311 static struct timespec *osd_inode_time(const struct lu_env *env,
1312                                        struct inode *inode, __u64 seconds)
1313 {
1314         struct osd_thread_info *oti = osd_oti_get(env);
1315         struct timespec        *t   = &oti->oti_time;
1316
1317         t->tv_sec  = seconds;
1318         t->tv_nsec = 0;
1319         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1320         return t;
1321 }
1322
1323
1324 static void osd_inode_getattr(const struct lu_env *env,
1325                               struct inode *inode, struct lu_attr *attr)
1326 {
1327         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1328                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1329                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1330
1331         attr->la_atime      = LTIME_S(inode->i_atime);
1332         attr->la_mtime      = LTIME_S(inode->i_mtime);
1333         attr->la_ctime      = LTIME_S(inode->i_ctime);
1334         attr->la_mode       = inode->i_mode;
1335         attr->la_size       = i_size_read(inode);
1336         attr->la_blocks     = inode->i_blocks;
1337         attr->la_uid        = inode->i_uid;
1338         attr->la_gid        = inode->i_gid;
1339         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1340         attr->la_nlink      = inode->i_nlink;
1341         attr->la_rdev       = inode->i_rdev;
1342         attr->la_blksize    = ll_inode_blksize(inode);
1343         attr->la_blkbits    = inode->i_blkbits;
1344 }
1345
1346 static int osd_attr_get(const struct lu_env *env,
1347                         struct dt_object *dt,
1348                         struct lu_attr *attr,
1349                         struct lustre_capa *capa)
1350 {
1351         struct osd_object *obj = osd_dt_obj(dt);
1352
1353         LASSERT(dt_object_exists(dt));
1354         LINVRNT(osd_invariant(obj));
1355
1356         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1357                 return -EACCES;
1358
1359         cfs_spin_lock(&obj->oo_guard);
1360         osd_inode_getattr(env, obj->oo_inode, attr);
1361         cfs_spin_unlock(&obj->oo_guard);
1362         return 0;
1363 }
1364
1365 static int osd_inode_setattr(const struct lu_env *env,
1366                              struct inode *inode, const struct lu_attr *attr)
1367 {
1368         __u64 bits;
1369
1370         bits = attr->la_valid;
1371
1372         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1373
1374 #ifdef HAVE_QUOTA_SUPPORT
1375         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1376             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1377                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1378                 struct iattr iattr;
1379                 int rc;
1380
1381                 iattr.ia_valid = 0;
1382                 if (bits & LA_UID)
1383                         iattr.ia_valid |= ATTR_UID;
1384                 if (bits & LA_GID)
1385                         iattr.ia_valid |= ATTR_GID;
1386                 iattr.ia_uid = attr->la_uid;
1387                 iattr.ia_gid = attr->la_gid;
1388                 osd_push_ctxt(env, save);
1389                 rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
1390                 osd_pop_ctxt(save);
1391                 if (rc != 0)
1392                         return rc;
1393         }
1394 #endif
1395
1396         if (bits & LA_ATIME)
1397                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1398         if (bits & LA_CTIME)
1399                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1400         if (bits & LA_MTIME)
1401                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1402         if (bits & LA_SIZE) {
1403                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1404                 i_size_write(inode, attr->la_size);
1405         }
1406
1407 #if 0
1408         /* OSD should not change "i_blocks" which is used by quota.
1409          * "i_blocks" should be changed by ldiskfs only. */
1410         if (bits & LA_BLOCKS)
1411                 inode->i_blocks = attr->la_blocks;
1412 #endif
1413         if (bits & LA_MODE)
1414                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1415                         (attr->la_mode & ~S_IFMT);
1416         if (bits & LA_UID)
1417                 inode->i_uid    = attr->la_uid;
1418         if (bits & LA_GID)
1419                 inode->i_gid    = attr->la_gid;
1420         if (bits & LA_NLINK)
1421                 inode->i_nlink  = attr->la_nlink;
1422         if (bits & LA_RDEV)
1423                 inode->i_rdev   = attr->la_rdev;
1424
1425         if (bits & LA_FLAGS) {
1426                 /* always keep S_NOCMTIME */
1427                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
1428                                  S_NOCMTIME;
1429         }
1430         return 0;
1431 }
1432
1433 static int osd_attr_set(const struct lu_env *env,
1434                         struct dt_object *dt,
1435                         const struct lu_attr *attr,
1436                         struct thandle *handle,
1437                         struct lustre_capa *capa)
1438 {
1439         struct osd_object *obj = osd_dt_obj(dt);
1440         int rc;
1441
1442         LASSERT(handle != NULL);
1443         LASSERT(dt_object_exists(dt));
1444         LASSERT(osd_invariant(obj));
1445
1446         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1447                 return -EACCES;
1448
1449         cfs_spin_lock(&obj->oo_guard);
1450         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1451         cfs_spin_unlock(&obj->oo_guard);
1452
1453         if (!rc)
1454                 obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
1455         return rc;
1456 }
1457
/*
 * Object creation.
 *
 * Hook run by __osd_object_create() before the type-specific creation
 * routine.  It currently does nothing and always succeeds.
 *
 * XXX temporary solution.
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        /* nothing to prepare */
        return 0;
}
1468
1469 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1470                            struct lu_attr *attr, struct thandle *th)
1471 {
1472         osd_object_init0(obj);
1473         if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1474                 unlock_new_inode(obj->oo_inode);
1475         return 0;
1476 }
1477
1478 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1479                                             struct osd_object *obj,
1480                                             const char *name,
1481                                             const int namelen)
1482 {
1483         struct osd_thread_info *info   = osd_oti_get(env);
1484         struct dentry *child_dentry = &info->oti_child_dentry;
1485         struct dentry *obj_dentry = &info->oti_obj_dentry;
1486
1487         obj_dentry->d_inode = obj->oo_inode;
1488         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1489         obj_dentry->d_name.hash = 0;
1490
1491         child_dentry->d_name.hash = 0;
1492         child_dentry->d_parent = obj_dentry;
1493         child_dentry->d_name.name = name;
1494         child_dentry->d_name.len = namelen;
1495         return child_dentry;
1496 }
1497
1498
1499 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1500                       cfs_umode_t mode,
1501                       struct dt_allocation_hint *hint,
1502                       struct thandle *th)
1503 {
1504         int result;
1505         struct osd_device  *osd = osd_obj2dev(obj);
1506         struct osd_thandle *oth;
1507         struct dt_object   *parent;
1508         struct inode       *inode;
1509 #ifdef HAVE_QUOTA_SUPPORT
1510         struct osd_ctxt    *save = &info->oti_ctxt;
1511 #endif
1512
1513         LINVRNT(osd_invariant(obj));
1514         LASSERT(obj->oo_inode == NULL);
1515
1516         oth = container_of(th, struct osd_thandle, ot_super);
1517         LASSERT(oth->ot_handle->h_transaction != NULL);
1518
1519         if (hint && hint->dah_parent)
1520                 parent = hint->dah_parent;
1521         else
1522                 parent = osd->od_obj_area;
1523
1524         LASSERT(parent != NULL);
1525         LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1526
1527 #ifdef HAVE_QUOTA_SUPPORT
1528         osd_push_ctxt(info->oti_env, save);
1529 #endif
1530         inode = ldiskfs_create_inode(oth->ot_handle,
1531                                      osd_dt_obj(parent)->oo_inode, mode);
1532 #ifdef HAVE_QUOTA_SUPPORT
1533         osd_pop_ctxt(save);
1534 #endif
1535         if (!IS_ERR(inode)) {
1536                 /* Do not update file c/mtime in ldiskfs.
1537                  * NB: don't need any lock because no contention at this
1538                  * early stage */
1539                 inode->i_flags |= S_NOCMTIME;
1540                 obj->oo_inode = inode;
1541                 result = 0;
1542         } else
1543                 result = PTR_ERR(inode);
1544         LINVRNT(osd_invariant(obj));
1545         return result;
1546 }
1547
/* Key size passed to iam_lvar_create() for directories, see osd_mkdir(). */
enum { OSD_NAME_LEN = 255 };
1551
1552 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1553                      struct lu_attr *attr,
1554                      struct dt_allocation_hint *hint,
1555                      struct dt_object_format *dof,
1556                      struct thandle *th)
1557 {
1558         int result;
1559         struct osd_thandle *oth;
1560         struct osd_device *osd = osd_obj2dev(obj);
1561         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1562
1563         LASSERT(S_ISDIR(attr->la_mode));
1564
1565         oth = container_of(th, struct osd_thandle, ot_super);
1566         LASSERT(oth->ot_handle->h_transaction != NULL);
1567         result = osd_mkfile(info, obj, mode, hint, th);
1568         if (result == 0 && osd->od_iop_mode == 0) {
1569                 LASSERT(obj->oo_inode != NULL);
1570                 /*
1571                  * XXX uh-oh... call low-level iam function directly.
1572                  */
1573
1574                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1575                                          sizeof (struct osd_fid_pack),
1576                                          oth->ot_handle);
1577         }
1578         return result;
1579 }
1580
1581 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1582                         struct lu_attr *attr,
1583                         struct dt_allocation_hint *hint,
1584                         struct dt_object_format *dof,
1585                         struct thandle *th)
1586 {
1587         int result;
1588         struct osd_thandle *oth;
1589         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1590
1591         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1592
1593         LASSERT(S_ISREG(attr->la_mode));
1594
1595         oth = container_of(th, struct osd_thandle, ot_super);
1596         LASSERT(oth->ot_handle->h_transaction != NULL);
1597
1598         result = osd_mkfile(info, obj, mode, hint, th);
1599         if (result == 0) {
1600                 LASSERT(obj->oo_inode != NULL);
1601                 if (feat->dif_flags & DT_IND_VARKEY)
1602                         result = iam_lvar_create(obj->oo_inode,
1603                                                  feat->dif_keysize_max,
1604                                                  feat->dif_ptrsize,
1605                                                  feat->dif_recsize_max,
1606                                                  oth->ot_handle);
1607                 else
1608                         result = iam_lfix_create(obj->oo_inode,
1609                                                  feat->dif_keysize_max,
1610                                                  feat->dif_ptrsize,
1611                                                  feat->dif_recsize_max,
1612                                                  oth->ot_handle);
1613
1614         }
1615         return result;
1616 }
1617
1618 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1619                      struct lu_attr *attr,
1620                      struct dt_allocation_hint *hint,
1621                      struct dt_object_format *dof,
1622                      struct thandle *th)
1623 {
1624         LASSERT(S_ISREG(attr->la_mode));
1625         return osd_mkfile(info, obj, (attr->la_mode &
1626                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1627 }
1628
1629 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1630                      struct lu_attr *attr,
1631                      struct dt_allocation_hint *hint,
1632                      struct dt_object_format *dof,
1633                      struct thandle *th)
1634 {
1635         LASSERT(S_ISLNK(attr->la_mode));
1636         return osd_mkfile(info, obj, (attr->la_mode &
1637                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1638 }
1639
1640 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1641                      struct lu_attr *attr,
1642                      struct dt_allocation_hint *hint,
1643                      struct dt_object_format *dof,
1644                      struct thandle *th)
1645 {
1646         cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1647         int result;
1648
1649         LINVRNT(osd_invariant(obj));
1650         LASSERT(obj->oo_inode == NULL);
1651         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1652                 S_ISFIFO(mode) || S_ISSOCK(mode));
1653
1654         result = osd_mkfile(info, obj, mode, hint, th);
1655         if (result == 0) {
1656                 LASSERT(obj->oo_inode != NULL);
1657                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1658         }
1659         LINVRNT(osd_invariant(obj));
1660         return result;
1661 }
1662
/*
 * Signature shared by the type-specific creation routines (osd_mkdir(),
 * osd_mkreg(), osd_mksym(), osd_mknod(), osd_mk_index()).
 */
typedef int (*osd_obj_type_f)(struct osd_thread_info *info,
                              struct osd_object *obj,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
                              struct dt_object_format *dof,
                              struct thandle *th);
1668
1669 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1670 {
1671         osd_obj_type_f result;
1672
1673         switch (type) {
1674         case DFT_DIR:
1675                 result = osd_mkdir;
1676                 break;
1677         case DFT_REGULAR:
1678                 result = osd_mkreg;
1679                 break;
1680         case DFT_SYM:
1681                 result = osd_mksym;
1682                 break;
1683         case DFT_NODE:
1684                 result = osd_mknod;
1685                 break;
1686         case DFT_INDEX:
1687                 result = osd_mk_index;
1688                 break;
1689
1690         default:
1691                 LBUG();
1692                 break;
1693         }
1694         return result;
1695 }
1696
1697
1698 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1699                         struct dt_object *parent, cfs_umode_t child_mode)
1700 {
1701         LASSERT(ah);
1702
1703         memset(ah, 0, sizeof(*ah));
1704         ah->dah_parent = parent;
1705         ah->dah_mode = child_mode;
1706 }
1707
1708 /**
1709  * Helper function for osd_object_create()
1710  *
1711  * \retval 0, on success
1712  */
1713 static int __osd_object_create(struct osd_thread_info *info,
1714                                struct osd_object *obj, struct lu_attr *attr,
1715                                struct dt_allocation_hint *hint,
1716                                struct dt_object_format *dof,
1717                                struct thandle *th)
1718 {
1719
1720         int result;
1721
1722         result = osd_create_pre(info, obj, attr, th);
1723         if (result == 0) {
1724                 result = osd_create_type_f(dof->dof_type)(info, obj,
1725                                            attr, hint, dof, th);
1726                 if (result == 0)
1727                         result = osd_create_post(info, obj, attr, th);
1728         }
1729         return result;
1730 }
1731
1732 /**
1733  * Helper function for osd_object_create()
1734  *
1735  * \retval 0, on success
1736  */
1737 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1738                            const struct lu_fid *fid, struct thandle *th)
1739 {
1740         struct osd_thread_info *info = osd_oti_get(env);
1741         struct osd_inode_id    *id   = &info->oti_id;
1742         struct osd_device      *osd  = osd_obj2dev(obj);
1743         struct md_ucred        *uc   = md_ucred(env);
1744
1745         LASSERT(obj->oo_inode != NULL);
1746         LASSERT(uc != NULL);
1747
1748         id->oii_ino = obj->oo_inode->i_ino;
1749         id->oii_gen = obj->oo_inode->i_generation;
1750
1751         return osd_oi_insert(info, &osd->od_oi, fid, id, th,
1752                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1753 }
1754
1755 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1756                              struct lu_attr *attr,
1757                              struct dt_allocation_hint *hint,
1758                              struct dt_object_format *dof,
1759                              struct thandle *th)
1760 {
1761         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1762         struct osd_object      *obj    = osd_dt_obj(dt);
1763         struct osd_thread_info *info   = osd_oti_get(env);
1764         int result;
1765
1766         ENTRY;
1767
1768         LINVRNT(osd_invariant(obj));
1769         LASSERT(!dt_object_exists(dt));
1770         LASSERT(osd_write_locked(env, obj));
1771         LASSERT(th != NULL);
1772
1773         result = __osd_object_create(info, obj, attr, hint, dof, th);
1774         if (result == 0)
1775                 result = __osd_oi_insert(env, obj, fid, th);
1776
1777         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1778         LASSERT(osd_invariant(obj));
1779         RETURN(result);
1780 }
1781
1782 /**
1783  * Helper function for osd_xattr_set()
1784  */
1785 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1786                            const struct lu_buf *buf, const char *name, int fl)
1787 {
1788         struct osd_object      *obj      = osd_dt_obj(dt);
1789         struct inode           *inode    = obj->oo_inode;
1790         struct osd_thread_info *info     = osd_oti_get(env);
1791         struct dentry          *dentry   = &info->oti_child_dentry;
1792         int                     fs_flags = 0;
1793         int  rc;
1794
1795         LASSERT(dt_object_exists(dt));
1796         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1797         LASSERT(osd_write_locked(env, obj));
1798
1799         if (fl & LU_XATTR_REPLACE)
1800                 fs_flags |= XATTR_REPLACE;
1801
1802         if (fl & LU_XATTR_CREATE)
1803                 fs_flags |= XATTR_CREATE;
1804
1805         dentry->d_inode = inode;
1806         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
1807                                    buf->lb_len, fs_flags);
1808         return rc;
1809 }
1810
1811 /**
1812  * Put the fid into lustre_mdt_attrs, and then place the structure
1813  * inode's ea. This fid should not be altered during the life time
1814  * of the inode.
1815  *
1816  * \retval +ve, on success
1817  * \retval -ve, on error
1818  *
1819  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
1820  */
1821 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
1822                           const struct lu_fid *fid)
1823 {
1824         struct osd_thread_info  *info      = osd_oti_get(env);
1825         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1826
1827         lustre_lma_init(mdt_attrs, fid);
1828         lustre_lma_swab(mdt_attrs);
1829         return __osd_xattr_set(env, dt,
1830                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
1831                                XATTR_NAME_LMA, LU_XATTR_CREATE);
1832
1833 }
1834
1835 /**
1836  * Helper function to form igif
1837  */
1838 static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
1839                                 struct lu_fid *fid)
1840 {
1841         LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
1842 }
1843
1844 /**
1845  * Helper function to pack the fid, ldiskfs stores fid in packed format.
1846  */
1847 void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
1848                   struct lu_fid *befider)
1849 {
1850         fid_cpu_to_be(befider, (struct lu_fid *)fid);
1851         memcpy(pack->fp_area, befider, sizeof(*befider));
1852         pack->fp_len =  sizeof(*befider) + 1;
1853 }
1854
1855 /**
1856  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
1857  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
1858  * To have compatilibility with 1.8 ldiskfs driver we need to have
1859  * magic number at start of fid data.
1860  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
1861  * its inmemory API.
1862  */
1863 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
1864                                   const struct dt_rec *fid)
1865 {
1866         param->edp_magic = LDISKFS_LUFID_MAGIC;
1867         param->edp_len =  sizeof(struct lu_fid) + 1;
1868
1869         fid_cpu_to_be((struct lu_fid *)param->edp_data,
1870                       (struct lu_fid *)fid);
1871 }
1872
1873 int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
1874 {
1875         int result;
1876
1877         result = 0;
1878         switch (pack->fp_len) {
1879         case sizeof *fid + 1:
1880                 memcpy(fid, pack->fp_area, sizeof *fid);
1881                 fid_be_to_cpu(fid, fid);
1882                 break;
1883         default:
1884                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1885                 result = -EIO;
1886         }
1887         return result;
1888 }
1889
1890 /**
1891  * Try to read the fid from inode ea into dt_rec, if return value
1892  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
1893  *
1894  * \param fid object fid.
1895  *
1896  * \retval 0 on success
1897  */
1898 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
1899                           __u32 ino, struct lu_fid *fid)
1900 {
1901         struct osd_thread_info  *info      = osd_oti_get(env);
1902         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1903         struct lu_device        *ldev   = obj->oo_dt.do_lu.lo_dev;
1904         struct dentry           *dentry = &info->oti_child_dentry;
1905         struct osd_inode_id     *id     = &info->oti_id;
1906         struct osd_device       *dev;
1907         struct inode            *inode;
1908         int                      rc;
1909
1910         ENTRY;
1911         dev  = osd_dev(ldev);
1912
1913         id->oii_ino = ino;
1914         id->oii_gen = OSD_OII_NOGEN;
1915
1916         inode = osd_iget(info, dev, id);
1917         if (IS_ERR(inode)) {
1918                 rc = PTR_ERR(inode);
1919                 GOTO(out,rc);
1920         }
1921         dentry->d_inode = inode;
1922
1923         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1924         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
1925                                    sizeof *mdt_attrs);
1926
1927         /* Check LMA compatibility */
1928         if (rc > 0 &&
1929             (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
1930                 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
1931                       inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
1932                       ~LMA_INCOMPAT_SUPP);
1933                 return -ENOSYS;
1934         }
1935
1936         if (rc > 0) {
1937                 lustre_lma_swab(mdt_attrs);
1938                 memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
1939                 rc = 0;
1940         } else if (rc == -ENODATA) {
1941                 osd_igif_get(env, inode, fid);
1942                 rc = 0;
1943         }
1944         iput(inode);
1945 out:
1946         RETURN(rc);
1947 }
1948
1949 /**
1950  * OSD layer object create function for interoperability mode (b11826).
1951  * This is mostly similar to osd_object_create(). Only difference being, fid is
1952  * inserted into inode ea here.
1953  *
1954  * \retval   0, on success
1955  * \retval -ve, on error
1956  */
1957 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
1958                              struct lu_attr *attr,
1959                              struct dt_allocation_hint *hint,
1960                              struct dt_object_format *dof,
1961                              struct thandle *th)
1962 {
1963         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1964         struct osd_object      *obj    = osd_dt_obj(dt);
1965         struct osd_thread_info *info   = osd_oti_get(env);
1966         int result;
1967
1968         ENTRY;
1969
1970         LASSERT(osd_invariant(obj));
1971         LASSERT(!dt_object_exists(dt));
1972         LASSERT(osd_write_locked(env, obj));
1973         LASSERT(th != NULL);
1974
1975         result = __osd_object_create(info, obj, attr, hint, dof, th);
1976
1977         /* objects under osd root shld have igif fid, so dont add fid EA */
1978         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
1979                 result = osd_ea_fid_set(env, dt, fid);
1980
1981         if (result == 0)
1982                 result = __osd_oi_insert(env, obj, fid, th);
1983
1984         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1985         LINVRNT(osd_invariant(obj));
1986         RETURN(result);
1987 }
1988
1989 /*
1990  * Concurrency: @dt is write locked.
1991  */
1992 static void osd_object_ref_add(const struct lu_env *env,
1993                                struct dt_object *dt,
1994                                struct thandle *th)
1995 {
1996         struct osd_object *obj = osd_dt_obj(dt);
1997         struct inode *inode = obj->oo_inode;
1998
1999         LINVRNT(osd_invariant(obj));
2000         LASSERT(dt_object_exists(dt));
2001         LASSERT(osd_write_locked(env, obj));
2002         LASSERT(th != NULL);
2003
2004         cfs_spin_lock(&obj->oo_guard);
2005         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
2006         inode->i_nlink++;
2007         cfs_spin_unlock(&obj->oo_guard);
2008         inode->i_sb->s_op->dirty_inode(inode);
2009         LINVRNT(osd_invariant(obj));
2010 }
2011
2012 /*
2013  * Concurrency: @dt is write locked.
2014  */
2015 static void osd_object_ref_del(const struct lu_env *env,
2016                                struct dt_object *dt,
2017                                struct thandle *th)
2018 {
2019         struct osd_object *obj = osd_dt_obj(dt);
2020         struct inode *inode = obj->oo_inode;
2021
2022         LINVRNT(osd_invariant(obj));
2023         LASSERT(dt_object_exists(dt));
2024         LASSERT(osd_write_locked(env, obj));
2025         LASSERT(th != NULL);
2026
2027         cfs_spin_lock(&obj->oo_guard);
2028         LASSERT(inode->i_nlink > 0);
2029         inode->i_nlink--;
2030         cfs_spin_unlock(&obj->oo_guard);
2031         inode->i_sb->s_op->dirty_inode(inode);
2032         LINVRNT(osd_invariant(obj));
2033 }
2034
2035 /*
2036  * Concurrency: @dt is read locked.
2037  */
2038 static int osd_xattr_get(const struct lu_env *env,
2039                          struct dt_object *dt,
2040                          struct lu_buf *buf,
2041                          const char *name,
2042                          struct lustre_capa *capa)
2043 {
2044         struct osd_object      *obj    = osd_dt_obj(dt);
2045         struct inode           *inode  = obj->oo_inode;
2046         struct osd_thread_info *info   = osd_oti_get(env);
2047         struct dentry          *dentry = &info->oti_obj_dentry;
2048
2049         LASSERT(dt_object_exists(dt));
2050         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2051         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2052
2053         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2054                 return -EACCES;
2055
2056         dentry->d_inode = inode;
2057         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
2058 }
2059
2060 /*
2061  * Concurrency: @dt is write locked.
2062  */
2063 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2064                          const struct lu_buf *buf, const char *name, int fl,
2065                          struct thandle *handle, struct lustre_capa *capa)
2066 {
2067         LASSERT(handle != NULL);
2068
2069         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2070                 return -EACCES;
2071
2072         return __osd_xattr_set(env, dt, buf, name, fl);
2073 }
2074
2075 /*
2076  * Concurrency: @dt is read locked.
2077  */
2078 static int osd_xattr_list(const struct lu_env *env,
2079                           struct dt_object *dt,
2080                           struct lu_buf *buf,
2081                           struct lustre_capa *capa)
2082 {
2083         struct osd_object      *obj    = osd_dt_obj(dt);
2084         struct inode           *inode  = obj->oo_inode;
2085         struct osd_thread_info *info   = osd_oti_get(env);
2086         struct dentry          *dentry = &info->oti_obj_dentry;
2087
2088         LASSERT(dt_object_exists(dt));
2089         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2090         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2091
2092         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2093                 return -EACCES;
2094
2095         dentry->d_inode = inode;
2096         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2097 }
2098
2099 /*
2100  * Concurrency: @dt is write locked.
2101  */
2102 static int osd_xattr_del(const struct lu_env *env,
2103                          struct dt_object *dt,
2104                          const char *name,
2105                          struct thandle *handle,
2106                          struct lustre_capa *capa)
2107 {
2108         struct osd_object      *obj    = osd_dt_obj(dt);
2109         struct inode           *inode  = obj->oo_inode;
2110         struct osd_thread_info *info   = osd_oti_get(env);
2111         struct dentry          *dentry = &info->oti_obj_dentry;
2112         int                     rc;
2113
2114         LASSERT(dt_object_exists(dt));
2115         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2116         LASSERT(osd_write_locked(env, obj));
2117         LASSERT(handle != NULL);
2118
2119         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2120                 return -EACCES;
2121
2122         dentry->d_inode = inode;
2123         rc = inode->i_op->removexattr(dentry, name);
2124         return rc;
2125 }
2126
2127 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2128                                      struct dt_object *dt,
2129                                      struct lustre_capa *old,
2130                                      __u64 opc)
2131 {
2132         struct osd_thread_info *info = osd_oti_get(env);
2133         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2134         struct osd_object *obj = osd_dt_obj(dt);
2135         struct osd_device *dev = osd_obj2dev(obj);
2136         struct lustre_capa_key *key = &info->oti_capa_key;
2137         struct lustre_capa *capa = &info->oti_capa;
2138         struct obd_capa *oc;
2139         struct md_capainfo *ci;
2140         int rc;
2141         ENTRY;
2142
2143         if (!dev->od_fl_capa)
2144                 RETURN(ERR_PTR(-ENOENT));
2145
2146         LASSERT(dt_object_exists(dt));
2147         LINVRNT(osd_invariant(obj));
2148
2149         /* renewal sanity check */
2150         if (old && osd_object_auth(env, dt, old, opc))
2151                 RETURN(ERR_PTR(-EACCES));
2152
2153         ci = md_capainfo(env);
2154         if (unlikely(!ci))
2155                 RETURN(ERR_PTR(-ENOENT));
2156
2157         switch (ci->mc_auth) {
2158         case LC_ID_NONE:
2159                 RETURN(NULL);
2160         case LC_ID_PLAIN:
2161                 capa->lc_uid = obj->oo_inode->i_uid;
2162                 capa->lc_gid = obj->oo_inode->i_gid;
2163                 capa->lc_flags = LC_ID_PLAIN;
2164                 break;
2165         case LC_ID_CONVERT: {
2166                 __u32 d[4], s[4];
2167
2168                 s[0] = obj->oo_inode->i_uid;
2169                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2170                 s[2] = obj->oo_inode->i_gid;
2171                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2172                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2173                 if (unlikely(rc))
2174                         RETURN(ERR_PTR(rc));
2175
2176                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2177                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2178                 capa->lc_flags = LC_ID_CONVERT;
2179                 break;
2180         }
2181         default:
2182                 RETURN(ERR_PTR(-EINVAL));
2183         }
2184
2185         capa->lc_fid = *fid;
2186         capa->lc_opc = opc;
2187         capa->lc_flags |= dev->od_capa_alg << 24;
2188         capa->lc_timeout = dev->od_capa_timeout;
2189         capa->lc_expiry = 0;
2190
2191         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2192         if (oc) {
2193                 LASSERT(!capa_is_expired(oc));
2194                 RETURN(oc);
2195         }
2196
2197         cfs_spin_lock(&capa_lock);
2198         *key = dev->od_capa_keys[1];
2199         cfs_spin_unlock(&capa_lock);
2200
2201         capa->lc_keyid = key->lk_keyid;
2202         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2203
2204         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2205         if (rc) {
2206                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2207                 RETURN(ERR_PTR(rc));
2208         }
2209
2210         oc = capa_add(dev->od_capa_hash, capa);
2211         RETURN(oc);
2212 }
2213
2214 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2215 {
2216         int rc;
2217         struct osd_object      *obj    = osd_dt_obj(dt);
2218         struct inode           *inode  = obj->oo_inode;
2219         struct osd_thread_info *info   = osd_oti_get(env);
2220         struct dentry          *dentry = &info->oti_obj_dentry;
2221         struct file            *file   = &info->oti_file;
2222         ENTRY;
2223
2224         dentry->d_inode = inode;
2225         file->f_dentry = dentry;
2226         file->f_mapping = inode->i_mapping;
2227         file->f_op = inode->i_fop;
2228         LOCK_INODE_MUTEX(inode);
2229         rc = file->f_op->fsync(file, dentry, 0);
2230         UNLOCK_INODE_MUTEX(inode);
2231         RETURN(rc);
2232 }
2233
2234 /*
2235  * Get the 64-bit version for an inode.
2236  */
2237 static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
2238                                                struct dt_object *dt)
2239 {
2240         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2241
2242         CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
2243                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2244         return LDISKFS_I(inode)->i_fs_version;
2245 }
2246
2247 /*
2248  * Set the 64-bit version and return the old version.
2249  */
2250 static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
2251                                    dt_obj_version_t new_version)
2252 {
2253         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2254
2255         CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2256                new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2257         LDISKFS_I(inode)->i_fs_version = new_version;
2258         /** Version is set after all inode operations are finished,
2259          *  so we should mark it dirty here */
2260         inode->i_sb->s_op->dirty_inode(inode);
2261 }
2262
2263 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2264                         void **data)
2265 {
2266         struct osd_object *obj = osd_dt_obj(dt);
2267         ENTRY;
2268
2269         *data = (void *)obj->oo_inode;
2270         RETURN(0);
2271 }
2272
2273 /*
2274  * Index operations.
2275  */
2276
2277 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2278                            const struct dt_index_features *feat)
2279 {
2280         struct iam_descr *descr;
2281
2282         if (osd_object_is_root(o))
2283                 return feat == &dt_directory_features;
2284
2285         LASSERT(o->oo_dir != NULL);
2286
2287         descr = o->oo_dir->od_container.ic_descr;
2288         if (feat == &dt_directory_features) {
2289                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2290                         return 1;
2291                 else
2292                         return 0;
2293         } else {
2294                 return
2295                         feat->dif_keysize_min <= descr->id_key_size &&
2296                         descr->id_key_size <= feat->dif_keysize_max &&
2297                         feat->dif_recsize_min <= descr->id_rec_size &&
2298                         descr->id_rec_size <= feat->dif_recsize_max &&
2299                         !(feat->dif_flags & (DT_IND_VARKEY |
2300                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2301                         ergo(feat->dif_flags & DT_IND_UPDATE,
2302                              1 /* XXX check that object (and file system) is
2303                                 * writable */);
2304         }
2305 }
2306
2307 static int osd_iam_container_init(const struct lu_env *env,
2308                                   struct osd_object *obj,
2309                                   struct osd_directory *dir)
2310 {
2311         int result;
2312         struct iam_container *bag;
2313
2314         bag    = &dir->od_container;
2315         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2316         if (result == 0) {
2317                 result = iam_container_setup(bag);
2318                 if (result == 0)
2319                         obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2320                 else
2321                         iam_container_fini(bag);
2322         }
2323         return result;
2324 }
2325
2326
2327 /*
2328  * Concurrency: no external locking is necessary.
2329  */
2330 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2331                          const struct dt_index_features *feat)
2332 {
2333         int result;
2334         int ea_dir = 0;
2335         struct osd_object *obj = osd_dt_obj(dt);
2336         struct osd_device *osd = osd_obj2dev(obj);
2337
2338         LINVRNT(osd_invariant(obj));
2339         LASSERT(dt_object_exists(dt));
2340
2341         if (osd_object_is_root(obj)) {
2342                 dt->do_index_ops = &osd_index_ea_ops;
2343                 result = 0;
2344         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2345                 dt->do_index_ops = &osd_index_ea_ops;
2346                 if (S_ISDIR(obj->oo_inode->i_mode))
2347                         result = 0;
2348                 else
2349                         result = -ENOTDIR;
2350                 ea_dir = 1;
2351         } else if (!osd_has_index(obj)) {
2352                 struct osd_directory *dir;
2353
2354                 OBD_ALLOC_PTR(dir);
2355                 if (dir != NULL) {
2356
2357                         cfs_spin_lock(&obj->oo_guard);
2358                         if (obj->oo_dir == NULL)
2359                                 obj->oo_dir = dir;
2360                         else
2361                                 /*
2362                                  * Concurrent thread allocated container data.
2363                                  */
2364                                 OBD_FREE_PTR(dir);
2365                         cfs_spin_unlock(&obj->oo_guard);
2366                         /*
2367                          * Now, that we have container data, serialize its
2368                          * initialization.
2369                          */
2370                         cfs_down_write(&obj->oo_ext_idx_sem);
2371                         /*
2372                          * recheck under lock.
2373                          */
2374                         if (!osd_has_index(obj))
2375                                 result = osd_iam_container_init(env, obj, dir);
2376                         else
2377                                 result = 0;
2378                         cfs_up_write(&obj->oo_ext_idx_sem);
2379                 } else
2380                         result = -ENOMEM;
2381         } else
2382                 result = 0;
2383
2384         if (result == 0 && ea_dir == 0) {
2385                 if (!osd_iam_index_probe(env, obj, feat))
2386                         result = -ENOTDIR;
2387         }
2388         LINVRNT(osd_invariant(obj));
2389
2390         return result;
2391 }
2392
2393 static const struct dt_object_operations osd_obj_ops = {
2394         .do_read_lock    = osd_object_read_lock,
2395         .do_write_lock   = osd_object_write_lock,
2396         .do_read_unlock  = osd_object_read_unlock,
2397         .do_write_unlock = osd_object_write_unlock,
2398         .do_write_locked = osd_object_write_locked,
2399         .do_attr_get     = osd_attr_get,
2400         .do_attr_set     = osd_attr_set,
2401         .do_ah_init      = osd_ah_init,
2402         .do_create       = osd_object_create,
2403         .do_index_try    = osd_index_try,
2404         .do_ref_add      = osd_object_ref_add,
2405         .do_ref_del      = osd_object_ref_del,
2406         .do_xattr_get    = osd_xattr_get,
2407         .do_xattr_set    = osd_xattr_set,
2408         .do_xattr_del    = osd_xattr_del,
2409         .do_xattr_list   = osd_xattr_list,
2410         .do_capa_get     = osd_capa_get,
2411         .do_object_sync  = osd_object_sync,
2412         .do_version_get  = osd_object_version_get,
2413         .do_version_set  = osd_object_version_set,
2414         .do_data_get     = osd_data_get,
2415 };
2416
2417 /**
2418  * dt_object_operations for interoperability mode
2419  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2420  */
2421 static const struct dt_object_operations osd_obj_ea_ops = {
2422         .do_read_lock    = osd_object_read_lock,
2423         .do_write_lock   = osd_object_write_lock,
2424         .do_read_unlock  = osd_object_read_unlock,
2425         .do_write_unlock = osd_object_write_unlock,
2426         .do_write_locked = osd_object_write_locked,
2427         .do_attr_get     = osd_attr_get,
2428         .do_attr_set     = osd_attr_set,
2429         .do_ah_init      = osd_ah_init,
2430         .do_create       = osd_object_ea_create,
2431         .do_index_try    = osd_index_try,
2432         .do_ref_add      = osd_object_ref_add,
2433         .do_ref_del      = osd_object_ref_del,
2434         .do_xattr_get    = osd_xattr_get,
2435         .do_xattr_set    = osd_xattr_set,
2436         .do_xattr_del    = osd_xattr_del,
2437         .do_xattr_list   = osd_xattr_list,
2438         .do_capa_get     = osd_capa_get,
2439         .do_object_sync  = osd_object_sync,
2440         .do_version_get  = osd_object_version_get,
2441         .do_version_set  = osd_object_version_set,
2442         .do_data_get     = osd_data_get,
2443 };
2444
2445 /*
2446  * Body operations.
2447  */
2448
2449 /*
2450  * XXX: Another layering violation for now.
2451  *
2452  * We don't want to use ->f_op->read methods, because generic file write
2453  *
2454  *         - serializes on ->i_sem, and
2455  *
2456  *         - does a lot of extra work like balance_dirty_pages(),
2457  *
2458  * which doesn't work for globally shared files like /last-received.
2459  */
2460 static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
2461 {
2462         struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
2463
2464         memcpy(buffer, (char*)ei->i_data, buflen);
2465
2466         return  buflen;
2467 }
2468
2469 static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
2470                             loff_t *offs)
2471 {
2472         struct buffer_head *bh;
2473         unsigned long block;
2474         int osize = size;
2475         int blocksize;
2476         int csize;
2477         int boffs;
2478         int err;
2479
2480         /* prevent reading after eof */
2481         spin_lock(&inode->i_lock);
2482         if (i_size_read(inode) < *offs + size) {
2483                 size = i_size_read(inode) - *offs;
2484                 spin_unlock(&inode->i_lock);
2485                 if (size < 0) {
2486                         CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
2487                                i_size_read(inode), *offs);
2488                         return -EBADR;
2489                 } else if (size == 0) {
2490                         return 0;
2491                 }
2492         } else {
2493                 spin_unlock(&inode->i_lock);
2494         }
2495
2496         blocksize = 1 << inode->i_blkbits;
2497
2498         while (size > 0) {
2499                 block = *offs >> inode->i_blkbits;
2500                 boffs = *offs & (blocksize - 1);
2501                 csize = min(blocksize - boffs, size);
2502                 bh = ldiskfs_bread(NULL, inode, block, 0, &err);
2503                 if (!bh) {
2504                         CERROR("can't read block: %d\n", err);
2505                         return err;
2506                 }
2507
2508                 memcpy(buf, bh->b_data + boffs, csize);
2509                 brelse(bh);
2510
2511                 *offs += csize;
2512                 buf += csize;
2513                 size -= csize;
2514         }
2515         return osize;
2516 }
2517
2518 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
2519                         struct lu_buf *buf, loff_t *pos,
2520                         struct lustre_capa *capa)
2521 {
2522         struct osd_object      *obj    = osd_dt_obj(dt);
2523         struct inode           *inode  = obj->oo_inode;
2524         int rc;
2525
2526         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2527                 RETURN(-EACCES);
2528
2529         /* Read small symlink from inode body as we need to maintain correct
2530          * on-disk symlinks for ldiskfs.
2531          */
2532         if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2533             (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
2534                 rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
2535         else
2536                 rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
2537
2538         return rc;
2539 }
2540
2541 static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
2542 {
2543
2544         memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
2545                buflen);
2546         LDISKFS_I(inode)->i_disksize = buflen;
2547         i_size_write(inode, buflen);
2548         inode->i_sb->s_op->dirty_inode(inode);
2549
2550         return 0;
2551 }
2552
2553 static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
2554                                     loff_t *offs, handle_t *handle)
2555 {
2556         struct buffer_head *bh = NULL;
2557         loff_t offset = *offs;
2558         loff_t new_size = i_size_read(inode);
2559         unsigned long block;
2560         int blocksize = 1 << inode->i_blkbits;
2561         int err = 0;
2562         int size;
2563         int boffs;
2564         int dirty_inode = 0;
2565
2566         while (bufsize > 0) {
2567                 if (bh != NULL)
2568                         brelse(bh);
2569
2570                 block = offset >> inode->i_blkbits;
2571                 boffs = offset & (blocksize - 1);
2572                 size = min(blocksize - boffs, bufsize);
2573                 bh = ldiskfs_bread(handle, inode, block, 1, &err);
2574                 if (!bh) {
2575                         CERROR("can't read/create block: %d\n", err);
2576                         break;
2577                 }
2578
2579                 err = ldiskfs_journal_get_write_access(handle, bh);
2580                 if (err) {
2581                         CERROR("journal_get_write_access() returned error %d\n",
2582                                err);
2583                         break;
2584                 }
2585                 LASSERTF(boffs + size <= bh->b_size,
2586                          "boffs %d size %d bh->b_size %lu",
2587                          boffs, size, (unsigned long)bh->b_size);
2588                 memcpy(bh->b_data + boffs, buf, size);
2589                 err = ldiskfs_journal_dirty_metadata(handle, bh);
2590                 if (err)
2591                         break;
2592
2593                 if (offset + size > new_size)
2594                         new_size = offset + size;
2595                 offset += size;
2596                 bufsize -= size;
2597                 buf += size;
2598         }
2599         if (bh)
2600                 brelse(bh);
2601
2602         /* correct in-core and on-disk sizes */
2603         if (new_size > i_size_read(inode)) {
2604                 spin_lock(&inode->i_lock);
2605                 if (new_size > i_size_read(inode))
2606                         i_size_write(inode, new_size);
2607                 if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
2608                         LDISKFS_I(inode)->i_disksize = i_size_read(inode);
2609                         dirty_inode = 1;
2610                 }
2611                 spin_unlock(&inode->i_lock);
2612                 if (dirty_inode)
2613                         inode->i_sb->s_op->dirty_inode(inode);
2614         }
2615
2616         if (err == 0)
2617                 *offs = offset;
2618         return err;
2619 }
2620
2621 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
2622                          const struct lu_buf *buf, loff_t *pos,
2623                          struct thandle *handle, struct lustre_capa *capa,
2624                          int ignore_quota)
2625 {
2626         struct osd_object  *obj   = osd_dt_obj(dt);
2627         struct inode       *inode = obj->oo_inode;
2628         struct osd_thandle *oh;
2629         ssize_t            result = 0;
2630 #ifdef HAVE_QUOTA_SUPPORT
2631         cfs_cap_t           save = cfs_curproc_cap_pack();
2632 #endif
2633
2634         LASSERT(handle != NULL);
2635
2636         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
2637                 RETURN(-EACCES);
2638
2639         oh = container_of(handle, struct osd_thandle, ot_super);
2640         LASSERT(oh->ot_handle->h_transaction != NULL);
2641 #ifdef HAVE_QUOTA_SUPPORT
2642         if (ignore_quota)
2643                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2644         else
2645                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2646 #endif
2647         /* Write small symlink to inode body as we need to maintain correct
2648          * on-disk symlinks for ldiskfs.
2649          */
2650         if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2651            (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
2652                 result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
2653         else
2654                 result = osd_ldiskfs_write_record(inode, buf->lb_buf,
2655                                                   buf->lb_len, pos,
2656                                                   oh->ot_handle);
2657 #ifdef HAVE_QUOTA_SUPPORT
2658         cfs_curproc_cap_unpack(save);
2659 #endif
2660         if (result == 0)
2661                 result = buf->lb_len;
2662         return result;
2663 }
2664
2665 static const struct dt_body_operations osd_body_ops = {
2666         .dbo_read  = osd_read,
2667         .dbo_write = osd_write
2668 };
2669
2670
2671 /**
2672  *      delete a (key, value) pair from index \a dt specified by \a key
2673  *
2674  *      \param  dt      osd index object
2675  *      \param  key     key for index
2676  *      \param  rec     record reference
2677  *      \param  handle  transaction handler
2678  *
2679  *      \retval  0  success
2680  *      \retval -ve   failure
2681  */
2682
2683 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2684                                 const struct dt_key *key, struct thandle *handle,
2685                                 struct lustre_capa *capa)
2686 {
2687         struct osd_object     *obj = osd_dt_obj(dt);
2688         struct osd_thandle    *oh;
2689         struct iam_path_descr *ipd;
2690         struct iam_container  *bag = &obj->oo_dir->od_container;
2691         int rc;
2692
2693         ENTRY;
2694
2695         LINVRNT(osd_invariant(obj));
2696         LASSERT(dt_object_exists(dt));
2697         LASSERT(bag->ic_object == obj->oo_inode);
2698         LASSERT(handle != NULL);
2699
2700         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2701                 RETURN(-EACCES);
2702
2703         ipd = osd_idx_ipd_get(env, bag);
2704         if (unlikely(ipd == NULL))
2705                 RETURN(-ENOMEM);
2706
2707         oh = container_of0(handle, struct osd_thandle, ot_super);
2708         LASSERT(oh->ot_handle != NULL);
2709         LASSERT(oh->ot_handle->h_transaction != NULL);
2710
2711         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2712         osd_ipd_put(env, bag, ipd);
2713         LINVRNT(osd_invariant(obj));
2714         RETURN(rc);
2715 }
2716
2717 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
2718                                           struct dt_rec *fid)
2719 {
2720         struct osd_fid_pack *rec;
2721         int rc = -ENODATA;
2722
2723         if (de->file_type & LDISKFS_DIRENT_LUFID) {
2724                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
2725                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
2726         }
2727         RETURN(rc);
2728 }
2729
2730 /**
2731  * Index delete function for interoperability mode (b11826).
2732  * It will remove the directory entry added by osd_index_ea_insert().
2733  * This entry is needed to maintain name->fid mapping.
2734  *
2735  * \param key,  key i.e. file entry to be deleted
2736  *
2737  * \retval   0, on success
2738  * \retval -ve, on error
2739  */
2740 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
2741                                const struct dt_key *key, struct thandle *handle,
2742                                struct lustre_capa *capa)
2743 {
2744         struct osd_object          *obj    = osd_dt_obj(dt);
2745         struct inode               *dir    = obj->oo_inode;
2746         struct dentry              *dentry;
2747         struct osd_thandle         *oh;
2748         struct ldiskfs_dir_entry_2 *de;
2749         struct buffer_head         *bh;
2750
2751         int rc;
2752
2753         ENTRY;
2754
2755         LINVRNT(osd_invariant(obj));
2756         LASSERT(dt_object_exists(dt));
2757         LASSERT(handle != NULL);
2758
2759         oh = container_of(handle, struct osd_thandle, ot_super);
2760         LASSERT(oh->ot_handle != NULL);
2761         LASSERT(oh->ot_handle->h_transaction != NULL);
2762
2763         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2764                 RETURN(-EACCES);
2765
2766         dentry = osd_child_dentry_get(env, obj,
2767                                       (char *)key, strlen((char *)key));
2768
2769         cfs_down_write(&obj->oo_ext_idx_sem);
2770         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
2771         if (bh) {
2772                 rc = ldiskfs_delete_entry(oh->ot_handle,
2773                                 dir, de, bh);
2774                 brelse(bh);
2775         } else
2776                 rc = -ENOENT;
2777
2778         cfs_up_write(&obj->oo_ext_idx_sem);
2779         LASSERT(osd_invariant(obj));
2780         RETURN(rc);
2781 }
2782
2783 /**
2784  *      Lookup index for \a key and copy record to \a rec.
2785  *
2786  *      \param  dt      osd index object
2787  *      \param  key     key for index
2788  *      \param  rec     record reference
2789  *
2790  *      \retval  +ve  success : exact mach
2791  *      \retval  0    return record with key not greater than \a key
2792  *      \retval -ve   failure
2793  */
2794 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
2795                                 struct dt_rec *rec, const struct dt_key *key,
2796                                 struct lustre_capa *capa)
2797 {
2798         struct osd_object     *obj = osd_dt_obj(dt);
2799         struct iam_path_descr *ipd;
2800         struct iam_container  *bag = &obj->oo_dir->od_container;
2801         struct osd_thread_info *oti = osd_oti_get(env);
2802         struct iam_iterator    *it = &oti->oti_idx_it;
2803         struct iam_rec *iam_rec;
2804         int rc;
2805         ENTRY;
2806
2807         LASSERT(osd_invariant(obj));
2808         LASSERT(dt_object_exists(dt));
2809         LASSERT(bag->ic_object == obj->oo_inode);
2810
2811         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2812                 RETURN(-EACCES);
2813
2814         ipd = osd_idx_ipd_get(env, bag);
2815         if (IS_ERR(ipd))
2816                 RETURN(-ENOMEM);
2817
2818         /* got ipd now we can start iterator. */
2819         iam_it_init(it, bag, 0, ipd);
2820
2821         rc = iam_it_get(it, (struct iam_key *)key);
2822         if (rc >= 0) {
2823                 if (S_ISDIR(obj->oo_inode->i_mode))
2824                         iam_rec = (struct iam_rec *)oti->oti_ldp;
2825                 else
2826                         iam_rec = (struct iam_rec *) rec;
2827
2828                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
2829                 if (S_ISDIR(obj->oo_inode->i_mode))
2830                         osd_fid_unpack((struct lu_fid *) rec,
2831                                        (struct osd_fid_pack *)iam_rec);
2832         }
2833         iam_it_put(it);
2834         iam_it_fini(it);
2835         osd_ipd_put(env, bag, ipd);
2836
2837         LINVRNT(osd_invariant(obj));
2838
2839         RETURN(rc);
2840 }
2841
2842 /**
2843  *      Inserts (key, value) pair in \a dt index object.
2844  *
2845  *      \param  dt      osd index object
2846  *      \param  key     key for index
2847  *      \param  rec     record reference
2848  *      \param  th      transaction handler
2849  *
2850  *      \retval  0  success
2851  *      \retval -ve failure
2852  */
2853 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
2854                                 const struct dt_rec *rec, const struct dt_key *key,
2855                                 struct thandle *th, struct lustre_capa *capa,
2856                                 int ignore_quota)
2857 {
2858         struct osd_object     *obj = osd_dt_obj(dt);
2859         struct iam_path_descr *ipd;
2860         struct osd_thandle    *oh;
2861         struct iam_container  *bag = &obj->oo_dir->od_container;
2862 #ifdef HAVE_QUOTA_SUPPORT
2863         cfs_cap_t              save = cfs_curproc_cap_pack();
2864 #endif
2865         struct osd_thread_info *oti = osd_oti_get(env);
2866         struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
2867         int rc;
2868
2869         ENTRY;
2870
2871         LINVRNT(osd_invariant(obj));
2872         LASSERT(dt_object_exists(dt));
2873         LASSERT(bag->ic_object == obj->oo_inode);
2874         LASSERT(th != NULL);
2875
2876         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2877                 return -EACCES;
2878
2879         ipd = osd_idx_ipd_get(env, bag);
2880         if (unlikely(ipd == NULL))
2881                 RETURN(-ENOMEM);
2882
2883         oh = container_of0(th, struct osd_thandle, ot_super);
2884         LASSERT(oh->ot_handle != NULL);
2885         LASSERT(oh->ot_handle->h_transaction != NULL);
2886 #ifdef HAVE_QUOTA_SUPPORT
2887         if (ignore_quota)
2888                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2889         else
2890                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2891 #endif
2892         if (S_ISDIR(obj->oo_inode->i_mode))
2893                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
2894         else
2895                 iam_rec = (struct iam_rec *) rec;
2896         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2897                         iam_rec, ipd);
2898 #ifdef HAVE_QUOTA_SUPPORT
2899         cfs_curproc_cap_unpack(save);
2900 #endif
2901         osd_ipd_put(env, bag, ipd);
2902         LINVRNT(osd_invariant(obj));
2903         RETURN(rc);
2904 }
2905
2906 /**
2907  * Calls ldiskfs_add_entry() to add directory entry
2908  * into the directory. This is required for
2909  * interoperability mode (b11826)
2910  *
2911  * \retval   0, on success
2912  * \retval -ve, on error
2913  */
2914 static int __osd_ea_add_rec(struct osd_thread_info *info,
2915                             struct osd_object *pobj,
2916                             struct inode  *cinode,
2917                             const char *name,
2918                             const struct dt_rec *fid,
2919                             struct thandle *th)
2920 {
2921         struct ldiskfs_dentry_param *ldp;
2922         struct dentry      *child;
2923         struct osd_thandle *oth;
2924         int rc;
2925
2926         oth = container_of(th, struct osd_thandle, ot_super);
2927         LASSERT(oth->ot_handle != NULL);
2928         LASSERT(oth->ot_handle->h_transaction != NULL);
2929
2930         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
2931
2932         if (fid_is_igif((struct lu_fid *)fid) ||
2933             fid_is_norm((struct lu_fid *)fid)) {
2934                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2935                 osd_get_ldiskfs_dirent_param(ldp, fid);
2936                 child->d_fsdata = (void*) ldp;
2937         } else
2938                 child->d_fsdata = NULL;
2939         rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
2940
2941         RETURN(rc);
2942 }
2943
2944 /**
2945  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
2946  * into the directory.Also sets flags into osd object to
2947  * indicate dot and dotdot are created. This is required for
2948  * interoperability mode (b11826)
2949  *
2950  * \param dir   directory for dot and dotdot fixup.
2951  * \param obj   child object for linking
2952  *
2953  * \retval   0, on success
2954  * \retval -ve, on error
2955  */
2956 static int osd_add_dot_dotdot(struct osd_thread_info *info,
2957                               struct osd_object *dir,
2958                               struct inode  *parent_dir, const char *name,
2959                               const struct dt_rec *dot_fid,
2960                               const struct dt_rec *dot_dot_fid,
2961                               struct thandle *th)
2962 {
2963         struct inode            *inode  = dir->oo_inode;
2964         struct ldiskfs_dentry_param *dot_ldp;
2965         struct ldiskfs_dentry_param *dot_dot_ldp;
2966         struct osd_thandle      *oth;
2967         int result = 0;
2968
2969         oth = container_of(th, struct osd_thandle, ot_super);
2970         LASSERT(oth->ot_handle->h_transaction != NULL);
2971         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
2972
2973         if (strcmp(name, dot) == 0) {
2974                 if (dir->oo_compat_dot_created) {
2975                         result = -EEXIST;
2976                 } else {
2977                         LASSERT(inode == parent_dir);
2978                         dir->oo_compat_dot_created = 1;
2979                         result = 0;
2980                 }
2981         } else if(strcmp(name, dotdot) == 0) {
2982                 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2983                 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
2984
2985                 if (!dir->oo_compat_dot_created)
2986                         return -EINVAL;
2987                 if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
2988                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
2989                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
2990                 } else {
2991                         dot_ldp = NULL;
2992                         dot_dot_ldp = NULL;
2993                 }
2994                 /* in case of rename, dotdot is already created */
2995                 if (dir->oo_compat_dotdot_created) {
2996                         return __osd_ea_add_rec(info, dir, parent_dir, name,
2997                                                 dot_dot_fid, th);
2998                 }
2999
3000                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
3001                                                 dot_ldp, dot_dot_ldp);
3002                 if (result == 0)
3003                        dir->oo_compat_dotdot_created = 1;
3004         }
3005
3006         return result;
3007 }
3008
3009
3010 /**
3011  * It will call the appropriate osd_add* function and return the
3012  * value, return by respective functions.
3013  */
3014 static int osd_ea_add_rec(const struct lu_env *env,
3015                           struct osd_object *pobj,
3016                           struct inode *cinode,
3017                           const char *name,
3018                           const struct dt_rec *fid,
3019                           struct thandle *th)
3020 {
3021         struct osd_thread_info    *info   = osd_oti_get(env);
3022         int rc;
3023
3024         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3025                                                    name[2] =='\0')))
3026                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3027                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3028                                         fid, th);
3029         else
3030                 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
3031
3032         return rc;
3033 }
3034
3035 /**
3036  * Calls ->lookup() to find dentry. From dentry get inode and
3037  * read inode's ea to get fid. This is required for  interoperability
3038  * mode (b11826)
3039  *
3040  * \retval   0, on success
3041  * \retval -ve, on error
3042  */
3043 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3044                              struct dt_rec *rec, const struct dt_key *key)
3045 {
3046         struct inode               *dir    = obj->oo_inode;
3047         struct dentry              *dentry;
3048         struct ldiskfs_dir_entry_2 *de;
3049         struct buffer_head         *bh;
3050         struct lu_fid              *fid = (struct lu_fid *) rec;
3051         int ino;
3052         int rc;
3053
3054         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3055
3056         dentry = osd_child_dentry_get(env, obj,
3057                                       (char *)key, strlen((char *)key));
3058
3059         cfs_down_read(&obj->oo_ext_idx_sem);
3060         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
3061         if (bh) {
3062                 ino = le32_to_cpu(de->inode);
3063                 rc = osd_get_fid_from_dentry(de, rec);
3064
3065                 /* done with de, release bh */
3066                 brelse(bh);
3067                 if (rc != 0)
3068                         rc = osd_ea_fid_get(env, obj, ino, fid);
3069         } else
3070                 rc = -ENOENT;
3071
3072         cfs_up_read(&obj->oo_ext_idx_sem);
3073         RETURN (rc);
3074 }
3075
3076 /**
3077  * Find the osd object for given fid.
3078  *
3079  * \param fid need to find the osd object having this fid
3080  *
3081  * \retval osd_object on success
3082  * \retval        -ve on error
3083  */
3084 struct osd_object *osd_object_find(const struct lu_env *env,
3085                                    struct dt_object *dt,
3086                                    const struct lu_fid *fid)
3087 {
3088         struct lu_device         *ludev = dt->do_lu.lo_dev;
3089         struct osd_object        *child = NULL;
3090         struct lu_object         *luch;
3091         struct lu_object         *lo;
3092
3093         luch = lu_object_find(env, ludev, fid, NULL);
3094         if (!IS_ERR(luch)) {
3095                 if (lu_object_exists(luch)) {
3096                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3097                         if (lo != NULL)
3098                                 child = osd_obj(lo);
3099                         else
3100                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3101                                                 "lu_object can't be located"
3102                                                 ""DFID"\n", PFID(fid));
3103
3104                         if (child == NULL) {
3105                                 lu_object_put(env, luch);
3106                                 CERROR("Unable to get osd_object\n");
3107                                 child = ERR_PTR(-ENOENT);
3108                         }
3109                 } else {
3110                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
3111                                         "lu_object does not exists "DFID"\n",
3112                                         PFID(fid));
3113                         child = ERR_PTR(-ENOENT);
3114                 }
3115         } else
3116                 child = (void *)luch;
3117
3118         return child;
3119 }
3120
3121 /**
3122  * Put the osd object once done with it.
3123  *
3124  * \param obj osd object that needs to be put
3125  */
3126 static inline void osd_object_put(const struct lu_env *env,
3127                                   struct osd_object *obj)
3128 {
3129         lu_object_put(env, &obj->oo_dt.do_lu);
3130 }
3131
3132 /**
3133  * Index add function for interoperability mode (b11826).
3134  * It will add the directory entry.This entry is needed to
3135  * maintain name->fid mapping.
3136  *
3137  * \param key it is key i.e. file entry to be inserted
3138  * \param rec it is value of given key i.e. fid
3139  *
3140  * \retval   0, on success
3141  * \retval -ve, on error
3142  */
3143 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3144                                const struct dt_rec *rec,
3145                                const struct dt_key *key, struct thandle *th,
3146                                struct lustre_capa *capa, int ignore_quota)
3147 {
3148         struct osd_object        *obj   = osd_dt_obj(dt);
3149         struct lu_fid            *fid   = (struct lu_fid *) rec;
3150         const char               *name  = (const char *)key;
3151         struct osd_object        *child;
3152 #ifdef HAVE_QUOTA_SUPPORT
3153         cfs_cap_t                 save  = cfs_curproc_cap_pack();
3154 #endif
3155         int rc;
3156
3157         ENTRY;
3158
3159         LASSERT(osd_invariant(obj));
3160         LASSERT(dt_object_exists(dt));
3161         LASSERT(th != NULL);
3162
3163         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3164                 RETURN(-EACCES);
3165
3166         child = osd_object_find(env, dt, fid);
3167         if (!IS_ERR(child)) {
3168 #ifdef HAVE_QUOTA_SUPPORT
3169                 if (ignore_quota)
3170                         cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
3171                 else
3172                         cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
3173 #endif
3174                 cfs_down_write(&obj->oo_ext_idx_sem);
3175                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
3176                 cfs_up_write(&obj->oo_ext_idx_sem);
3177 #ifdef HAVE_QUOTA_SUPPORT
3178                 cfs_curproc_cap_unpack(save);
3179 #endif
3180                 osd_object_put(env, child);
3181         } else {
3182                 rc = PTR_ERR(child);
3183         }
3184
3185         LASSERT(osd_invariant(obj));
3186         RETURN(rc);
3187 }
3188
3189 /**
3190  *  Initialize osd Iterator for given osd index object.
3191  *
3192  *  \param  dt      osd index object
3193  */
3194
3195 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3196                                      struct dt_object *dt,
3197                                      __u32 unused,
3198                                      struct lustre_capa *capa)
3199 {
3200         struct osd_it_iam         *it;
3201         struct osd_thread_info *oti = osd_oti_get(env);
3202         struct osd_object     *obj = osd_dt_obj(dt);
3203         struct lu_object      *lo  = &dt->do_lu;
3204         struct iam_path_descr *ipd;
3205         struct iam_container  *bag = &obj->oo_dir->od_container;
3206
3207         LASSERT(lu_object_exists(lo));
3208
3209         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3210                 return ERR_PTR(-EACCES);
3211
3212         it = &oti->oti_it;
3213         ipd = osd_it_ipd_get(env, bag);
3214         if (likely(ipd != NULL)) {
3215                 it->oi_obj = obj;
3216                 it->oi_ipd = ipd;
3217                 lu_object_get(lo);
3218                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3219                 return (struct dt_it *)it;
3220         }
3221         return ERR_PTR(-ENOMEM);
3222 }
3223
3224 /**
3225  * free given Iterator.
3226  */
3227
3228 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3229 {
3230         struct osd_it_iam     *it = (struct osd_it_iam *)di;
3231         struct osd_object *obj = it->oi_obj;
3232
3233         iam_it_fini(&it->oi_it);
3234         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3235         lu_object_put(env, &obj->oo_dt.do_lu);
3236 }
3237
3238 /**
3239  *  Move Iterator to record specified by \a key
3240  *
3241  *  \param  di      osd iterator
3242  *  \param  key     key for index
3243  *
3244  *  \retval +ve  di points to record with least key not larger than key
3245  *  \retval  0   di points to exact matched key
3246  *  \retval -ve  failure
3247  */
3248
3249 static int osd_it_iam_get(const struct lu_env *env,
3250                       struct dt_it *di, const struct dt_key *key)
3251 {
3252         struct osd_it_iam *it = (struct osd_it_iam *)di;
3253
3254         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
3255 }
3256
3257 /**
3258  *  Release Iterator
3259  *
3260  *  \param  di      osd iterator
3261  */
3262
3263 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
3264 {
3265         struct osd_it_iam *it = (struct osd_it_iam *)di;
3266
3267         iam_it_put(&it->oi_it);
3268 }
3269
3270 /**
3271  *  Move iterator by one record
3272  *
3273  *  \param  di      osd iterator
3274  *
3275  *  \retval +1   end of container reached
3276  *  \retval  0   success
3277  *  \retval -ve  failure
3278  */
3279
3280 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
3281 {
3282         struct osd_it_iam *it = (struct osd_it_iam *)di;
3283
3284         return iam_it_next(&it->oi_it);
3285 }
3286
3287 /**
3288  * Return pointer to the key under iterator.
3289  */
3290
3291 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
3292                                  const struct dt_it *di)
3293 {
3294         struct osd_it_iam *it = (struct osd_it_iam *)di;
3295
3296         return (struct dt_key *)iam_it_key_get(&it->oi_it);
3297 }
3298
3299 /**
3300  * Return size of key under iterator (in bytes)
3301  */
3302
3303 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
3304 {
3305         struct osd_it_iam *it = (struct osd_it_iam *)di;
3306
3307         return iam_it_key_size(&it->oi_it);
3308 }
3309
3310 static inline void osd_it_append_attrs(struct lu_dirent*ent,
3311                                        __u32 attr,
3312                                        int len,
3313                                        __u16 type)
3314 {
3315         struct luda_type        *lt;
3316         const unsigned           align = sizeof(struct luda_type) - 1;
3317
3318         /* check if file type is required */
3319         if (attr & LUDA_TYPE) {
3320                         len = (len + align) & ~align;
3321
3322                         lt = (void *) ent->lde_name + len;
3323                         lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
3324                         ent->lde_attrs |= LUDA_TYPE;
3325         }
3326
3327         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
3328 }
3329
3330 /**
3331  * build lu direct from backend fs dirent.
3332  */
3333
3334 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
3335                                       struct lu_fid *fid,
3336                                       __u64 offset,
3337                                       char *name,
3338                                       __u16 namelen,
3339                                       __u16 type,
3340                                       __u32 attr)
3341 {
3342         fid_cpu_to_le(&ent->lde_fid, fid);
3343         ent->lde_attrs = LUDA_FID;
3344
3345         ent->lde_hash = cpu_to_le64(offset);
3346         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
3347
3348         strncpy(ent->lde_name, name, namelen);
3349         ent->lde_namelen = cpu_to_le16(namelen);
3350
3351         /* append lustre attributes */
3352         osd_it_append_attrs(ent, attr, namelen, type);
3353 }
3354
3355 /**
3356  * Return pointer to the record under iterator.
3357  */
3358 static int osd_it_iam_rec(const struct lu_env *env,
3359                           const struct dt_it *di,
3360                           struct lu_dirent *lde,
3361                           __u32 attr)
3362 {
3363         struct osd_it_iam *it        = (struct osd_it_iam *)di;
3364         struct osd_thread_info *info = osd_oti_get(env);
3365         struct lu_fid     *fid       = &info->oti_fid;
3366         const struct osd_fid_pack *rec;
3367         char *name;
3368         int namelen;
3369         __u64 hash;
3370         int rc;
3371
3372         name = (char *)iam_it_key_get(&it->oi_it);
3373         if (IS_ERR(name))
3374                 RETURN(PTR_ERR(name));
3375
3376         namelen = iam_it_key_size(&it->oi_it);
3377
3378         rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it);
3379         if (IS_ERR(rec))
3380                 RETURN(PTR_ERR(rec));
3381
3382         rc = osd_fid_unpack(fid, rec);
3383         if (rc)
3384                 RETURN(rc);
3385
3386         hash = iam_it_store(&it->oi_it);
3387
3388         /* IAM does not store object type in IAM index (dir) */
3389         osd_it_pack_dirent(lde, fid, hash, name, namelen,
3390                            0, LUDA_FID);
3391
3392         return 0;
3393 }
3394
3395 /**
3396  * Returns cookie for current Iterator position.
3397  */
3398 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
3399 {
3400         struct osd_it_iam *it = (struct osd_it_iam *)di;
3401
3402         return iam_it_store(&it->oi_it);
3403 }
3404
3405 /**
3406  * Restore iterator from cookie.
3407  *
3408  * \param  di      osd iterator
3409  * \param  hash    Iterator location cookie
3410  *
3411  * \retval +ve  di points to record with least key not larger than key.
3412  * \retval  0   di points to exact matched key
3413  * \retval -ve  failure
3414  */
3415
3416 static int osd_it_iam_load(const struct lu_env *env,
3417                        const struct dt_it *di, __u64 hash)
3418 {
3419         struct osd_it_iam *it = (struct osd_it_iam *)di;
3420
3421         return iam_it_load(&it->oi_it, hash);
3422 }
3423
3424 static const struct dt_index_operations osd_index_iam_ops = {
3425         .dio_lookup = osd_index_iam_lookup,
3426         .dio_insert = osd_index_iam_insert,
3427         .dio_delete = osd_index_iam_delete,
3428         .dio_it     = {
3429                 .init     = osd_it_iam_init,
3430                 .fini     = osd_it_iam_fini,
3431                 .get      = osd_it_iam_get,
3432                 .put      = osd_it_iam_put,
3433                 .next     = osd_it_iam_next,
3434                 .key      = osd_it_iam_key,
3435                 .key_size = osd_it_iam_key_size,
3436                 .rec      = osd_it_iam_rec,
3437                 .store    = osd_it_iam_store,
3438                 .load     = osd_it_iam_load
3439         }
3440 };
3441
3442 /**
3443  * Creates or initializes iterator context.
3444  *
3445  * \retval struct osd_it_ea, iterator structure on success
3446  *
3447  */
3448 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
3449                                     struct dt_object *dt,
3450                                     __u32 attr,
3451                                     struct lustre_capa *capa)
3452 {
3453         struct osd_object       *obj  = osd_dt_obj(dt);
3454         struct osd_thread_info  *info = osd_oti_get(env);
3455         struct osd_it_ea        *it   = &info->oti_it_ea;
3456         struct lu_object        *lo   = &dt->do_lu;
3457         struct dentry           *obj_dentry = &info->oti_it_dentry;
3458         ENTRY;
3459         LASSERT(lu_object_exists(lo));
3460
3461         obj_dentry->d_inode = obj->oo_inode;
3462         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3463         obj_dentry->d_name.hash = 0;
3464
3465         it->oie_rd_dirent       = 0;
3466         it->oie_it_dirent       = 0;
3467         it->oie_dirent          = NULL;
3468         it->oie_buf             = info->oti_it_ea_buf;
3469         it->oie_obj             = obj;
3470         it->oie_file.f_pos      = 0;
3471         it->oie_file.f_dentry   = obj_dentry;
3472         if (attr & LUDA_64BITHASH)
3473                 it->oie_file.f_flags = O_64BITHASH;
3474         else
3475                 it->oie_file.f_flags = O_32BITHASH;
3476         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
3477         it->oie_file.f_op         = obj->oo_inode->i_fop;
3478         it->oie_file.private_data = NULL;
3479         lu_object_get(lo);
3480         RETURN((struct dt_it *) it);
3481 }
3482
3483 /**
3484  * Destroy or finishes iterator context.
3485  *
3486  * \param di iterator structure to be destroyed
3487  */
3488 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
3489 {
3490         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3491         struct osd_object    *obj  = it->oie_obj;
3492         struct inode       *inode  = obj->oo_inode;
3493
3494         ENTRY;
3495         it->oie_file.f_op->release(inode, &it->oie_file);
3496         lu_object_put(env, &obj->oo_dt.do_lu);
3497         EXIT;
3498 }
3499
3500 /**
3501  * It position the iterator at given key, so that next lookup continues from
3502  * that key Or it is similar to dio_it->load() but based on a key,
3503  * rather than file position.
3504  *
3505  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
3506  * to the beginning.
3507  *
3508  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
3509  */
3510 static int osd_it_ea_get(const struct lu_env *env,
3511                          struct dt_it *di, const struct dt_key *key)
3512 {
3513         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3514
3515         ENTRY;
3516         LASSERT(((const char *)key)[0] == '\0');
3517         it->oie_file.f_pos      = 0;
3518         it->oie_rd_dirent       = 0;
3519         it->oie_it_dirent       = 0;
3520         it->oie_dirent          = NULL;
3521
3522         RETURN(+1);
3523 }
3524
/**
 * Counterpart of osd_it_ea_get(); intentionally a no-op.
 *
 * \param env  execution environment (unused)
 * \param di   iterator (unused)
 */
static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
{
        /* nothing to do */
}
3531
3532 /**
3533  * It is called internally by ->readdir(). It fills the
3534  * iterator's in-memory data structure with required
3535  * information i.e. name, namelen, rec_size etc.
3536  *
3537  * \param buf in which information to be filled in.
3538  * \param name name of the file in given dir
3539  *
3540  * \retval 0 on success
3541  * \retval 1 on buffer full
3542  */
3543 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
3544                                loff_t offset, __u64 ino,
3545                                unsigned d_type)
3546 {
3547         struct osd_it_ea        *it   = (struct osd_it_ea *)buf;
3548         struct osd_it_ea_dirent *ent  = it->oie_dirent;
3549         struct lu_fid           *fid  = &ent->oied_fid;
3550         struct osd_fid_pack     *rec;
3551         ENTRY;
3552
3553         /* this should never happen */
3554         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
3555                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
3556                 RETURN(-EIO);
3557         }
3558
3559         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
3560             OSD_IT_EA_BUFSIZE)
3561                 RETURN(1);
3562
3563         if (d_type & LDISKFS_DIRENT_LUFID) {
3564                 rec = (struct osd_fid_pack*) (name + namelen + 1);
3565
3566                 if (osd_fid_unpack(fid, rec) != 0)
3567                         fid_zero(fid);
3568
3569                 d_type &= ~LDISKFS_DIRENT_LUFID;
3570         } else {
3571                 fid_zero(fid);
3572         }
3573
3574         ent->oied_ino     = ino;
3575         ent->oied_off     = offset;
3576         ent->oied_namelen = namelen;
3577         ent->oied_type    = d_type;
3578
3579         memcpy(ent->oied_name, name, namelen);
3580
3581         it->oie_rd_dirent++;
3582         it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
3583         RETURN(0);
3584 }
3585
3586 /**
3587  * Calls ->readdir() to load a directory entry at a time
3588  * and stored it in iterator's in-memory data structure.
3589  *
3590  * \param di iterator's in memory structure
3591  *
3592  * \retval   0 on success
3593  * \retval -ve on error
3594  */
3595 static int osd_ldiskfs_it_fill(const struct dt_it *di)
3596 {
3597         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
3598         struct osd_object  *obj   = it->oie_obj;
3599         struct inode       *inode = obj->oo_inode;
3600         int                result = 0;
3601
3602         ENTRY;
3603         it->oie_dirent = it->oie_buf;
3604         it->oie_rd_dirent = 0;
3605
3606         cfs_down_read(&obj->oo_ext_idx_sem);
3607         result = inode->i_fop->readdir(&it->oie_file, it,
3608                                        (filldir_t) osd_ldiskfs_filldir);
3609
3610         cfs_up_read(&obj->oo_ext_idx_sem);
3611
3612         if (it->oie_rd_dirent == 0) {
3613                 result = -EIO;
3614         } else {
3615                 it->oie_dirent = it->oie_buf;
3616                 it->oie_it_dirent = 1;
3617         }
3618
3619         RETURN(result);
3620 }
3621
3622 /**
3623  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3624  * to load a directory entry at a time and stored it in
3625  * iterator's in-memory data structure.
3626  *
3627  * \param di iterator's in memory structure
3628  *
3629  * \retval +ve iterator reached to end
3630  * \retval   0 iterator not reached to end
3631  * \retval -ve on error
3632  */
3633 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
3634 {
3635         struct osd_it_ea *it = (struct osd_it_ea *)di;
3636         int rc;
3637
3638         ENTRY;
3639
3640         if (it->oie_it_dirent < it->oie_rd_dirent) {
3641                 it->oie_dirent =
3642                         (void *) it->oie_dirent +
3643                         cfs_size_round(sizeof(struct osd_it_ea_dirent) +
3644                                        it->oie_dirent->oied_namelen);
3645                 it->oie_it_dirent++;
3646                 RETURN(0);
3647         } else {
3648                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
3649                         rc = +1;
3650                 else
3651                         rc = osd_ldiskfs_it_fill(di);
3652         }
3653
3654         RETURN(rc);
3655 }
3656
3657 /**
3658  * Returns the key at current position from iterator's in memory structure.
3659  *
3660  * \param di iterator's in memory structure
3661  *
3662  * \retval key i.e. struct dt_key on success
3663  */
3664 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
3665                                     const struct dt_it *di)
3666 {
3667         struct osd_it_ea *it = (struct osd_it_ea *)di;
3668         ENTRY;
3669         RETURN((struct dt_key *)it->oie_dirent->oied_name);
3670 }
3671
3672 /**
3673  * Returns the key's size at current position from iterator's in memory structure.
3674  *
3675  * \param di iterator's in memory structure
3676  *
3677  * \retval key_size i.e. struct dt_key on success
3678  */
3679 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
3680 {
3681         struct osd_it_ea *it = (struct osd_it_ea *)di;
3682         ENTRY;
3683         RETURN(it->oie_dirent->oied_namelen);
3684 }
3685
3686
3687 /**
3688  * Returns the value (i.e. fid/igif) at current position from iterator's
3689  * in memory structure.
3690  *
3691  * \param di struct osd_it_ea, iterator's in memory structure
3692  * \param attr attr requested for dirent.
3693  * \param lde lustre dirent
3694  *
3695  * \retval   0 no error and \param lde has correct lustre dirent.
3696  * \retval -ve on error
3697  */
3698 static inline int osd_it_ea_rec(const struct lu_env *env,
3699                                 const struct dt_it *di,
3700                                 struct lu_dirent *lde,
3701                                 __u32 attr)
3702 {
3703         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
3704         struct osd_object       *obj    = it->oie_obj;
3705         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
3706         int    rc = 0;
3707
3708         ENTRY;
3709
3710         if (!fid_is_sane(fid))
3711                 rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
3712
3713         if (rc == 0)
3714                 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
3715                                    it->oie_dirent->oied_name,
3716                                    it->oie_dirent->oied_namelen,
3717                                    it->oie_dirent->oied_type,
3718                                    attr);
3719         RETURN(rc);
3720 }
3721
3722 /**
3723  * Returns a cookie for current position of the iterator head, so that
3724  * user can use this cookie to load/start the iterator next time.
3725  *
3726  * \param di iterator's in memory structure
3727  *
3728  * \retval cookie for current position, on success
3729  */
3730 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
3731 {
3732         struct osd_it_ea *it = (struct osd_it_ea *)di;
3733         ENTRY;
3734         RETURN(it->oie_dirent->oied_off);
3735 }
3736
3737 /**
3738  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3739  * to load a directory entry at a time and stored it i inn,
3740  * in iterator's in-memory data structure.
3741  *
3742  * \param di struct osd_it_ea, iterator's in memory structure
3743  *
3744  * \retval +ve on success
3745  * \retval -ve on error
3746  */
3747 static int osd_it_ea_load(const struct lu_env *env,
3748                           const struct dt_it *di, __u64 hash)
3749 {
3750         struct osd_it_ea *it = (struct osd_it_ea *)di;
3751         int rc;
3752
3753         ENTRY;
3754         it->oie_file.f_pos = hash;
3755
3756         rc =  osd_ldiskfs_it_fill(di);
3757         if (rc == 0)
3758                 rc = +1;
3759
3760         RETURN(rc);
3761 }
3762
3763 /**
3764  * Index lookup function for interoperability mode (b11826).
3765  *
3766  * \param key,  key i.e. file name to be searched
3767  *
3768  * \retval +ve, on success
3769  * \retval -ve, on error
3770  */
3771 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
3772                                struct dt_rec *rec, const struct dt_key *key,
3773                                struct lustre_capa *capa)
3774 {
3775         struct osd_object *obj = osd_dt_obj(dt);
3776         int rc = 0;
3777
3778         ENTRY;
3779
3780         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
3781         LINVRNT(osd_invariant(obj));
3782
3783         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3784                 return -EACCES;
3785
3786         rc = osd_ea_lookup_rec(env, obj, rec, key);
3787
3788         if (rc == 0)
3789                 rc = +1;
3790         RETURN(rc);
3791 }
3792
3793 /**
3794  * Index and Iterator operations for interoperability
3795  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
3796  */
3797 static const struct dt_index_operations osd_index_ea_ops = {
3798         .dio_lookup = osd_index_ea_lookup,
3799         .dio_insert = osd_index_ea_insert,
3800         .dio_delete = osd_index_ea_delete,
3801         .dio_it     = {
3802                 .init     = osd_it_ea_init,
3803                 .fini     = osd_it_ea_fini,
3804                 .get      = osd_it_ea_get,
3805                 .put      = osd_it_ea_put,
3806                 .next     = osd_it_ea_next,
3807                 .key      = osd_it_ea_key,
3808                 .key_size = osd_it_ea_key_size,
3809                 .rec      = osd_it_ea_rec,
3810                 .store    = osd_it_ea_store,
3811                 .load     = osd_it_ea_load
3812         }
3813 };
3814
3815 static void *osd_key_init(const struct lu_context *ctx,
3816                           struct lu_context_key *key)
3817 {
3818         struct osd_thread_info *info;
3819
3820         OBD_ALLOC_PTR(info);
3821         if (info != NULL) {
3822                 OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3823                 if (info->oti_it_ea_buf != NULL) {
3824                         info->oti_env = container_of(ctx, struct lu_env,
3825                                                      le_ctx);
3826                 } else {
3827                         OBD_FREE_PTR(info);
3828                         info = ERR_PTR(-ENOMEM);
3829                 }
3830         } else {
3831                 info = ERR_PTR(-ENOMEM);
3832         }
3833         return info;
3834 }
3835
3836 static void osd_key_fini(const struct lu_context *ctx,
3837                          struct lu_context_key *key, void* data)
3838 {
3839         struct osd_thread_info *info = data;
3840
3841         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3842         OBD_FREE_PTR(info);
3843 }
3844
3845 static void osd_key_exit(const struct lu_context *ctx,
3846                          struct lu_context_key *key, void *data)
3847 {
3848         struct osd_thread_info *info = data;
3849
3850         LASSERT(info->oti_r_locks == 0);
3851         LASSERT(info->oti_w_locks == 0);
3852         LASSERT(info->oti_txns    == 0);
3853 }
3854
3855 /* type constructor/destructor: osd_type_init, osd_type_fini */
3856 LU_TYPE_INIT_FINI(osd, &osd_key);
3857
3858 static struct lu_context_key osd_key = {
3859         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
3860         .lct_init = osd_key_init,
3861         .lct_fini = osd_key_fini,
3862         .lct_exit = osd_key_exit
3863 };
3864
3865
3866 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
3867                            const char *name, struct lu_device *next)
3868 {
3869         int rc;
3870         struct lu_context *ctx;
3871
3872         /* context for commit hooks */
3873         ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
3874         rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
3875         if (rc == 0) {
3876                 rc = osd_procfs_init(osd_dev(d), name);
3877                 ctx->lc_cookie = 0x3;
3878         }
3879         return rc;
3880 }
3881
3882 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
3883 {
3884         struct osd_thread_info *info = osd_oti_get(env);
3885         ENTRY;
3886         if (o->od_obj_area != NULL) {
3887                 lu_object_put(env, &o->od_obj_area->do_lu);
3888                 o->od_obj_area = NULL;
3889         }
3890         osd_oi_fini(info, &o->od_oi);
3891
3892         RETURN(0);
3893 }
3894
3895 static int osd_mount(const struct lu_env *env,
3896                      struct osd_device *o, struct lustre_cfg *cfg)
3897 {
3898         struct lustre_mount_info *lmi;
3899         const char               *dev  = lustre_cfg_string(cfg, 0);
3900         struct lustre_disk_data  *ldd;
3901         struct lustre_sb_info    *lsi;
3902
3903         ENTRY;
3904         if (o->od_mount != NULL) {
3905                 CERROR("Already mounted (%s)\n", dev);
3906                 RETURN(-EEXIST);
3907         }
3908
3909         /* get mount */
3910         lmi = server_get_mount(dev);
3911         if (lmi == NULL) {
3912                 CERROR("Cannot get mount info for %s!\n", dev);
3913                 RETURN(-EFAULT);
3914         }
3915
3916         LASSERT(lmi != NULL);
3917         /* save lustre_mount_info in dt_device */
3918         o->od_mount = lmi;
3919
3920         lsi = s2lsi(lmi->lmi_sb);
3921         ldd = lsi->lsi_ldd;
3922
3923         if (ldd->ldd_flags & LDD_F_IAM_DIR) {
3924                 o->od_iop_mode = 0;
3925                 LCONSOLE_WARN("OSD: IAM mode enabled\n");
3926         } else
3927                 o->od_iop_mode = 1;
3928
3929         o->od_obj_area = NULL;
3930         RETURN(0);
3931 }
3932
3933 static struct lu_device *osd_device_fini(const struct lu_env *env,
3934                                          struct lu_device *d)
3935 {
3936         int rc;
3937         ENTRY;
3938
3939         shrink_dcache_sb(osd_sb(osd_dev(d)));
3940         osd_sync(env, lu2dt_dev(d));
3941
3942         rc = osd_procfs_fini(osd_dev(d));
3943         if (rc) {
3944                 CERROR("proc fini error %d \n", rc);
3945                 RETURN (ERR_PTR(rc));
3946         }
3947
3948         if (osd_dev(d)->od_mount)
3949                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
3950                                  osd_dev(d)->od_mount->lmi_mnt);
3951         osd_dev(d)->od_mount = NULL;
3952
3953         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
3954         RETURN(NULL);
3955 }
3956
3957 static struct lu_device *osd_device_alloc(const struct lu_env *env,
3958                                           struct lu_device_type *t,
3959                                           struct lustre_cfg *cfg)
3960 {
3961         struct lu_device  *l;
3962         struct osd_device *o;
3963
3964         OBD_ALLOC_PTR(o);
3965         if (o != NULL) {
3966                 int result;
3967
3968                 result = dt_device_init(&o->od_dt_dev, t);
3969                 if (result == 0) {
3970                         l = osd2lu_dev(o);
3971                         l->ld_ops = &osd_lu_ops;
3972                         o->od_dt_dev.dd_ops = &osd_dt_ops;
3973                         cfs_spin_lock_init(&o->od_osfs_lock);
3974                         o->od_osfs_age = cfs_time_shift_64(-1000);
3975                         o->od_capa_hash = init_capa_hash();
3976                         if (o->od_capa_hash == NULL) {
3977                                 dt_device_fini(&o->od_dt_dev);
3978                                 l = ERR_PTR(-ENOMEM);
3979                         }
3980                 } else
3981                         l = ERR_PTR(result);
3982
3983                 if (IS_ERR(l))
3984                         OBD_FREE_PTR(o);
3985         } else
3986                 l = ERR_PTR(-ENOMEM);
3987         return l;
3988 }
3989
3990 static struct lu_device *osd_device_free(const struct lu_env *env,
3991                                          struct lu_device *d)
3992 {
3993         struct osd_device *o = osd_dev(d);
3994         ENTRY;
3995
3996         cleanup_capa_hash(o->od_capa_hash);
3997         dt_device_fini(&o->od_dt_dev);
3998         OBD_FREE_PTR(o);
3999         RETURN(NULL);
4000 }
4001
4002 static int osd_process_config(const struct lu_env *env,
4003                               struct lu_device *d, struct lustre_cfg *cfg)
4004 {
4005         struct osd_device *o = osd_dev(d);
4006         int err;
4007         ENTRY;
4008
4009         switch(cfg->lcfg_command) {
4010         case LCFG_SETUP:
4011                 err = osd_mount(env, o, cfg);
4012                 break;
4013         case LCFG_CLEANUP:
4014                 err = osd_shutdown(env, o);
4015                 break;
4016         default:
4017                 err = -ENOSYS;
4018         }
4019
4020         RETURN(err);
4021 }
4022
4023 static int osd_recovery_complete(const struct lu_env *env,
4024                                  struct lu_device *d)
4025 {
4026         RETURN(0);
4027 }
4028
4029 static int osd_prepare(const struct lu_env *env,
4030                        struct lu_device *pdev,
4031                        struct lu_device *dev)
4032 {
4033         struct osd_device *osd = osd_dev(dev);
4034         struct lustre_sb_info *lsi;
4035         struct lustre_disk_data *ldd;
4036         struct lustre_mount_info  *lmi;
4037         struct osd_thread_info *oti = osd_oti_get(env);
4038         struct dt_object *d;
4039         int result;
4040
4041         ENTRY;
4042         /* 1. initialize oi before any file create or file open */
4043         result = osd_oi_init(oti, &osd->od_oi,
4044                              &osd->od_dt_dev, lu2md_dev(pdev));
4045         if (result != 0)
4046                 RETURN(result);
4047
4048         lmi = osd->od_mount;
4049         lsi = s2lsi(lmi->lmi_sb);
4050         ldd = lsi->lsi_ldd;
4051
4052         /* 2. setup local objects */
4053         result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
4054         if (result)
4055                 goto out;
4056
4057         /* 3. open remote object dir */
4058         d = dt_store_open(env, lu2dt_dev(dev), "",
4059                           remote_obj_dir, &oti->oti_fid);
4060         if (!IS_ERR(d)) {
4061                 osd->od_obj_area = d;
4062                 result = 0;
4063         } else {
4064                 result = PTR_ERR(d);
4065                 osd->od_obj_area = NULL;
4066         }
4067
4068 out:
4069         RETURN(result);
4070 }
4071
4072 static const struct lu_object_operations osd_lu_obj_ops = {
4073         .loo_object_init      = osd_object_init,
4074         .loo_object_delete    = osd_object_delete,
4075         .loo_object_release   = osd_object_release,
4076         .loo_object_free      = osd_object_free,
4077         .loo_object_print     = osd_object_print,
4078         .loo_object_invariant = osd_object_invariant
4079 };
4080
4081 static const struct lu_device_operations osd_lu_ops = {
4082         .ldo_object_alloc      = osd_object_alloc,
4083         .ldo_process_config    = osd_process_config,
4084         .ldo_recovery_complete = osd_recovery_complete,
4085         .ldo_prepare           = osd_prepare,
4086 };
4087
4088 static const struct lu_device_type_operations osd_device_type_ops = {
4089         .ldto_init = osd_type_init,
4090         .ldto_fini = osd_type_fini,
4091
4092         .ldto_start = osd_type_start,
4093         .ldto_stop  = osd_type_stop,
4094
4095         .ldto_device_alloc = osd_device_alloc,
4096         .ldto_device_free  = osd_device_free,
4097
4098         .ldto_device_init    = osd_device_init,
4099         .ldto_device_fini    = osd_device_fini
4100 };
4101
4102 static struct lu_device_type osd_device_type = {
4103         .ldt_tags     = LU_DEVICE_DT,
4104         .ldt_name     = LUSTRE_OSD_NAME,
4105         .ldt_ops      = &osd_device_type_ops,
4106         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
4107 };
4108
4109 /*
4110  * lprocfs legacy support.
4111  */
4112 static struct obd_ops osd_obd_device_ops = {
4113         .o_owner = THIS_MODULE
4114 };
4115
4116 static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
4117         .llod_name      = remote_obj_dir,
4118         .llod_oid       = OSD_REM_OBJ_DIR_OID,
4119         .llod_is_index  = 1,
4120         .llod_feat      = &dt_directory_features,
4121 };
4122
4123 static int __init osd_mod_init(void)
4124 {
4125         struct lprocfs_static_vars lvars;
4126
4127         osd_oi_mod_init();
4128         llo_local_obj_register(&llod_osd_rem_obj_dir);
4129         lprocfs_osd_init_vars(&lvars);
4130         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
4131                                    LUSTRE_OSD_NAME, &osd_device_type);
4132 }
4133
4134 static void __exit osd_mod_exit(void)
4135 {
4136         llo_local_obj_unregister(&llod_osd_rem_obj_dir);
4137         class_unregister_type(LUSTRE_OSD_NAME);
4138 }
4139
4140 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4141 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
4142 MODULE_LICENSE("GPL");
4143
4144 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);