Whamcloud - gitweb
b=23289 set S_NOCMTIME in OSD and remove some mark_inode_dirty
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/module.h>
50
51 /* LUSTRE_VERSION_CODE */
52 #include <lustre_ver.h>
53 /* prerequisite for linux/xattr.h */
54 #include <linux/types.h>
55 /* prerequisite for linux/xattr.h */
56 #include <linux/fs.h>
57 /* XATTR_{REPLACE,CREATE} */
58 #include <linux/xattr.h>
59 /* simple_mkdir() */
60 #include <lvfs.h>
61
62 /*
63  * struct OBD_{ALLOC,FREE}*()
64  * OBD_FAIL_CHECK
65  */
66 #include <obd_support.h>
67 /* struct ptlrpc_thread */
68 #include <lustre_net.h>
69
70 /* fid_is_local() */
71 #include <lustre_fid.h>
72
73 #include "osd_internal.h"
74 #include "osd_igif.h"
75
76 /* llo_* api support */
77 #include <md_object.h>
78
/* Constant entry-name strings shared by the code in this file. */
static const char dot[]            = ".";
static const char dotdot[]         = "..";
static const char remote_obj_dir[] = "REM_OBJ_DIR";
82
83 struct osd_directory {
84         struct iam_container od_container;
85         struct iam_descr     od_descr;
86 };
87
88 struct osd_object {
89         struct dt_object       oo_dt;
90         /**
91          * Inode for file system object represented by this osd_object. This
92          * inode is pinned for the whole duration of lu_object life.
93          *
94          * Not modified concurrently (either setup early during object
95          * creation, or assigned by osd_object_create() under write lock).
96          */
97         struct inode          *oo_inode;
98         /**
99          * to protect index ops.
100          */
101         cfs_rw_semaphore_t     oo_ext_idx_sem;
102         cfs_rw_semaphore_t     oo_sem;
103         struct osd_directory  *oo_dir;
104         /** protects inode attributes. */
105         cfs_spinlock_t         oo_guard;
106         /**
107          * Following two members are used to indicate the presence of dot and
108          * dotdot in the given directory. This is required for interop mode
109          * (b11826).
110          */
111         int                    oo_compat_dot_created;
112         int                    oo_compat_dotdot_created;
113
114         const struct lu_env   *oo_owner;
115 #ifdef CONFIG_LOCKDEP
116         struct lockdep_map     oo_dep_map;
117 #endif
118 };
119
120 static const struct lu_object_operations      osd_lu_obj_ops;
121 static const struct lu_device_operations      osd_lu_ops;
122 static       struct lu_context_key            osd_key;
123 static const struct dt_object_operations      osd_obj_ops;
124 static const struct dt_object_operations      osd_obj_ea_ops;
125 static const struct dt_body_operations        osd_body_ops;
126 static const struct dt_index_operations       osd_index_iam_ops;
127 static const struct dt_index_operations       osd_index_ea_ops;
128
129 struct osd_thandle {
130         struct thandle          ot_super;
131         handle_t               *ot_handle;
132         struct journal_callback ot_jcb;
133         /* Link to the device, for debugging. */
134         struct lu_ref_link     *ot_dev_link;
135
136 #if OSD_THANDLE_STATS
137         /** time when this handle was allocated */
138         cfs_time_t oth_alloced;
139
140         /** time when this thanle was started */
141         cfs_time_t oth_started;
142 #endif
143 };
144
145 /*
146  * Helpers.
147  */
148 static int lu_device_is_osd(const struct lu_device *d)
149 {
150         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
151 }
152
153 static struct osd_device *osd_dt_dev(const struct dt_device *d)
154 {
155         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
156         return container_of0(d, struct osd_device, od_dt_dev);
157 }
158
159 static struct osd_device *osd_dev(const struct lu_device *d)
160 {
161         LASSERT(lu_device_is_osd(d));
162         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
163 }
164
165 static struct osd_device *osd_obj2dev(const struct osd_object *o)
166 {
167         return osd_dev(o->oo_dt.do_lu.lo_dev);
168 }
169
170 static struct super_block *osd_sb(const struct osd_device *dev)
171 {
172         return dev->od_mount->lmi_mnt->mnt_sb;
173 }
174
175 static int osd_object_is_root(const struct osd_object *obj)
176 {
177         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
178 }
179
180 static struct osd_object *osd_obj(const struct lu_object *o)
181 {
182         LASSERT(lu_device_is_osd(o->lo_dev));
183         return container_of0(o, struct osd_object, oo_dt.do_lu);
184 }
185
186 static struct osd_object *osd_dt_obj(const struct dt_object *d)
187 {
188         return osd_obj(&d->do_lu);
189 }
190
191 static struct lu_device *osd2lu_dev(struct osd_device *osd)
192 {
193         return &osd->od_dt_dev.dd_lu_dev;
194 }
195
196 static journal_t *osd_journal(const struct osd_device *dev)
197 {
198         return LDISKFS_SB(osd_sb(dev))->s_journal;
199 }
200
201 static int osd_has_index(const struct osd_object *obj)
202 {
203         return obj->oo_dt.do_index_ops != NULL;
204 }
205
/**
 * lu_object flavour of the invariant check: evaluates osd_invariant() on
 * the osd_object behind \a l.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        /* Kept as a single nested expression on purpose. */
        return osd_invariant(osd_obj(l));
}
210
211 #ifdef HAVE_QUOTA_SUPPORT
212 static inline void
213 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
214 {
215         struct md_ucred    *uc = md_ucred(env);
216         struct cred        *tc;
217
218         LASSERT(uc != NULL);
219
220         save->oc_uid = current_fsuid();
221         save->oc_gid = current_fsgid();
222         save->oc_cap = current_cap();
223         if ((tc = prepare_creds())) {
224                 tc->fsuid         = uc->mu_fsuid;
225                 tc->fsgid         = uc->mu_fsgid;
226                 commit_creds(tc);
227         }
228         /* XXX not suboptimal */
229         cfs_curproc_cap_unpack(uc->mu_cap);
230 }
231
232 static inline void
233 osd_pop_ctxt(struct osd_ctxt *save)
234 {
235         struct cred *tc;
236
237         if ((tc = prepare_creds())) {
238                 tc->fsuid         = save->oc_uid;
239                 tc->fsgid         = save->oc_gid;
240                 tc->cap_effective = save->oc_cap;
241                 commit_creds(tc);
242         }
243 }
244 #endif
245
246 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
247 {
248         return lu_context_key_get(&env->le_ctx, &osd_key);
249 }
250
251 /*
252  * Concurrency: doesn't matter
253  */
254 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
255 {
256         return osd_oti_get(env)->oti_r_locks > 0;
257 }
258
259 /*
260  * Concurrency: doesn't matter
261  */
262 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
263 {
264         struct osd_thread_info *oti = osd_oti_get(env);
265         return oti->oti_w_locks > 0 && o->oo_owner == env;
266 }
267
268 /*
269  * Concurrency: doesn't access mutable data
270  */
271 static int osd_root_get(const struct lu_env *env,
272                         struct dt_device *dev, struct lu_fid *f)
273 {
274         struct inode *inode;
275
276         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
277         LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
278         return 0;
279 }
280
281 /*
282  * OSD object methods.
283  */
284
285 /*
286  * Concurrency: no concurrent access is possible that early in object
287  * life-cycle.
288  */
289 static struct lu_object *osd_object_alloc(const struct lu_env *env,
290                                           const struct lu_object_header *hdr,
291                                           struct lu_device *d)
292 {
293         struct osd_object *mo;
294
295         OBD_ALLOC_PTR(mo);
296         if (mo != NULL) {
297                 struct lu_object *l;
298
299                 l = &mo->oo_dt.do_lu;
300                 dt_object_init(&mo->oo_dt, NULL, d);
301                 if (osd_dev(d)->od_iop_mode)
302                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
303                 else
304                         mo->oo_dt.do_ops = &osd_obj_ops;
305
306                 l->lo_ops = &osd_lu_obj_ops;
307                 cfs_init_rwsem(&mo->oo_sem);
308                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
309                 cfs_spin_lock_init(&mo->oo_guard);
310                 return l;
311         } else
312                 return NULL;
313 }
314
315 /*
316  * retrieve object from backend ext fs.
317  **/
318 static struct inode *osd_iget(struct osd_thread_info *info,
319                               struct osd_device *dev,
320                               const struct osd_inode_id *id)
321 {
322         struct inode *inode = NULL;
323
324 #ifdef HAVE_EXT4_LDISKFS
325         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
326         if (IS_ERR(inode))
327         /* Newer kernels return an error instead of a NULL pointer */
328                 inode = NULL;
329 #else
330         inode = iget(osd_sb(dev), id->oii_ino);
331 #endif
332         if (inode == NULL) {
333                 CERROR("no inode\n");
334                 inode = ERR_PTR(-EACCES);
335         } else if (id->oii_gen != OSD_OII_NOGEN &&
336                    inode->i_generation != id->oii_gen) {
337                 iput(inode);
338                 inode = ERR_PTR(-ESTALE);
339         } else if (inode->i_nlink == 0) {
340                 /* due to parallel readdir and unlink,
341                 * we can have dead inode here. */
342                 CWARN("stale inode\n");
343                 make_bad_inode(inode);
344                 iput(inode);
345                 inode = ERR_PTR(-ESTALE);
346         } else if (is_bad_inode(inode)) {
347                 CERROR("bad inode %lx\n",inode->i_ino);
348                 iput(inode);
349                 inode = ERR_PTR(-ENOENT);
350         } else {
351                 /* Do not update file c/mtime in ldiskfs.
352                  * NB: we don't have any lock to protect this because we don't
353                  * have reference on osd_object now, but contention with
354                  * another lookup + attr_set can't happen in the tiny window
355                  * between if (...) and set S_NOCMTIME. */
356                 if (!(inode->i_flags & S_NOCMTIME))
357                         inode->i_flags |= S_NOCMTIME;
358         }
359         return inode;
360 }
361
362 static int osd_fid_lookup(const struct lu_env *env,
363                           struct osd_object *obj, const struct lu_fid *fid)
364 {
365         struct osd_thread_info *info;
366         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
367         struct osd_device      *dev;
368         struct osd_inode_id    *id;
369         struct osd_oi          *oi;
370         struct inode           *inode;
371         int                     result;
372
373         LINVRNT(osd_invariant(obj));
374         LASSERT(obj->oo_inode == NULL);
375         LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
376         /*
377          * This assertion checks that osd layer sees only local
378          * fids. Unfortunately it is somewhat expensive (does a
379          * cache-lookup). Disabling it for production/acceptance-testing.
380          */
381         LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
382
383         ENTRY;
384
385         info = osd_oti_get(env);
386         dev  = osd_dev(ldev);
387         id   = &info->oti_id;
388         oi   = &dev->od_oi;
389
390         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
391                 RETURN(-ENOENT);
392
393         result = osd_oi_lookup(info, oi, fid, id);
394         if (result == 0) {
395                 inode = osd_iget(info, dev, id);
396                 if (!IS_ERR(inode)) {
397                         obj->oo_inode = inode;
398                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
399                         if (dev->od_iop_mode) {
400                                 obj->oo_compat_dot_created = 1;
401                                 obj->oo_compat_dotdot_created = 1;
402                         }
403                         result = 0;
404                 } else
405                         /*
406                          * If fid wasn't found in oi, inode-less object is
407                          * created, for which lu_object_exists() returns
408                          * false. This is used in a (frequent) case when
409                          * objects are created as locking anchors or
410                          * place holders for objects yet to be created.
411                          */
412                         result = PTR_ERR(inode);
413         } else if (result == -ENOENT)
414                 result = 0;
415         LINVRNT(osd_invariant(obj));
416
417         RETURN(result);
418 }
419
420 /*
421  * Concurrency: shouldn't matter.
422  */
423 static void osd_object_init0(struct osd_object *obj)
424 {
425         LASSERT(obj->oo_inode != NULL);
426         obj->oo_dt.do_body_ops = &osd_body_ops;
427         obj->oo_dt.do_lu.lo_header->loh_attr |=
428                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
429 }
430
431 /*
432  * Concurrency: no concurrent access is possible that early in object
433  * life-cycle.
434  */
435 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
436                            const struct lu_object_conf *unused)
437 {
438         struct osd_object *obj = osd_obj(l);
439         int result;
440
441         LINVRNT(osd_invariant(obj));
442
443         result = osd_fid_lookup(env, obj, lu_object_fid(l));
444         if (result == 0) {
445                 if (obj->oo_inode != NULL)
446                         osd_object_init0(obj);
447         }
448         LINVRNT(osd_invariant(obj));
449         return result;
450 }
451
452 /*
453  * Concurrency: no concurrent access is possible that late in object
454  * life-cycle.
455  */
456 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
457 {
458         struct osd_object *obj = osd_obj(l);
459
460         LINVRNT(osd_invariant(obj));
461
462         dt_object_fini(&obj->oo_dt);
463         OBD_FREE_PTR(obj);
464 }
465
466 /**
467  * IAM Iterator
468  */
469 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
470                                              const struct iam_container *bag)
471 {
472         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
473                                            osd_oti_get(env)->oti_it_ipd);
474 }
475
476 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
477                                               const struct iam_container *bag)
478 {
479         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
480                                            osd_oti_get(env)->oti_idx_ipd);
481 }
482
483 static void osd_ipd_put(const struct lu_env *env,
484                         const struct iam_container *bag,
485                         struct iam_path_descr *ipd)
486 {
487         bag->ic_descr->id_ops->id_ipd_free(ipd);
488 }
489
490 /*
491  * Concurrency: no concurrent access is possible that late in object
492  * life-cycle.
493  */
494 static void osd_index_fini(struct osd_object *o)
495 {
496         struct iam_container *bag;
497
498         if (o->oo_dir != NULL) {
499                 bag = &o->oo_dir->od_container;
500                 if (o->oo_inode != NULL) {
501                         if (bag->ic_object == o->oo_inode)
502                                 iam_container_fini(bag);
503                 }
504                 OBD_FREE_PTR(o->oo_dir);
505                 o->oo_dir = NULL;
506         }
507 }
508
509 /*
510  * Concurrency: no concurrent access is possible that late in object
511  * life-cycle (for all existing callers, that is. New callers have to provide
512  * their own locking.)
513  */
514 static int osd_inode_unlinked(const struct inode *inode)
515 {
516         return inode->i_nlink == 0;
517 }
518
/*
 * Credit amounts; osd_inode_remove() passes their sum to txn_param_init().
 */
enum {
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
523
524 /*
525  * Journal
526  */
527
528 #if OSD_THANDLE_STATS
529 /**
530  * Set time when the handle is allocated
531  */
532 static void osd_th_alloced(struct osd_thandle *oth)
533 {
534         oth->oth_alloced = cfs_time_current();
535 }
536
537 /**
538  * Set time when the handle started
539  */
540 static void osd_th_started(struct osd_thandle *oth)
541 {
542         oth->oth_started = cfs_time_current();
543 }
544
545 /**
546  * Helper function to convert time interval to microseconds packed in
547  * long int (default time units for the counter in "stats" initialized
548  * by lu_time_init() )
549  */
550 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
551 {
552         struct timeval val;
553
554         cfs_duration_usec(cfs_time_sub(end, start), &val);
555         return val.tv_sec * 1000000 + val.tv_usec;
556 }
557
558 /**
559  * Check whether the we deal with this handle for too long.
560  */
561 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
562                                 cfs_time_t alloced, cfs_time_t started,
563                                 cfs_time_t closed)
564 {
565         cfs_time_t now = cfs_time_current();
566
567         LASSERT(dev != NULL);
568
569         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
570                             interval_to_usec(alloced, started));
571         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
572                             interval_to_usec(started, closed));
573         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
574                             interval_to_usec(closed, now));
575
576         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
577                 CWARN("transaction handle %p was open for too long: "
578                       "now "CFS_TIME_T" ,"
579                       "alloced "CFS_TIME_T" ,"
580                       "started "CFS_TIME_T" ,"
581                       "closed "CFS_TIME_T"\n",
582                       oth, now, alloced, started, closed);
583                 libcfs_debug_dumpstack(NULL);
584         }
585 }
586
/*
 * Evaluate \a expr (the statement that closes the handle) and then report
 * the handle's timing through __osd_th_check_slow().
 *
 * The timestamps are copied out of \a oth before \a expr runs, so \a oth is
 * not dereferenced afterwards; only its address is passed on for printing.
 *
 * Wrapped in do { } while (0) so that the macro behaves as one statement
 * (safe inside an unbraced if/else); arguments are parenthesized.
 */
#define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
do {                                                                    \
        cfs_time_t __closed = cfs_time_current();                       \
        cfs_time_t __alloced = (oth)->oth_alloced;                      \
        cfs_time_t __started = (oth)->oth_started;                      \
                                                                        \
        expr;                                                           \
        __osd_th_check_slow((oth), (dev), __alloced, __started,         \
                            __closed);                                  \
} while (0)
596
597 #else /* OSD_THANDLE_STATS */
598
/*
 * Handle statistics are compiled out: the timestamp hooks do nothing and
 * OSD_CHECK_SLOW_TH() just evaluates the wrapped expression.
 */
#define osd_th_alloced(h)                  do { } while (0)
#define osd_th_started(h)                  do { } while (0)
#define OSD_CHECK_SLOW_TH(oth, dev, expr)  expr
602
603 #endif /* OSD_THANDLE_STATS */
604
605 /*
606  * Concurrency: doesn't access mutable data.
607  */
608 static int osd_param_is_sane(const struct osd_device *dev,
609                              const struct txn_param *param)
610 {
611         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
612 }
613
614 /*
615  * Concurrency: shouldn't matter.
616  */
617 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
618 {
619         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
620         struct thandle     *th  = &oh->ot_super;
621         struct dt_device   *dev = th->th_dev;
622         struct lu_device   *lud = &dev->dd_lu_dev;
623
624         LASSERT(dev != NULL);
625         LASSERT(oh->ot_handle == NULL);
626
627         if (error) {
628                 CERROR("transaction @0x%p commit error: %d\n", th, error);
629         } else {
630                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
631                 /*
632                  * This od_env_for_commit is only for commit usage.  see
633                  * "struct dt_device"
634                  */
635                 lu_context_enter(&env->le_ctx);
636                 dt_txn_hook_commit(env, th);
637                 lu_context_exit(&env->le_ctx);
638         }
639
640         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
641         lu_device_put(lud);
642         th->th_dev = NULL;
643
644         lu_context_exit(&th->th_ctx);
645         lu_context_fini(&th->th_ctx);
646         OBD_FREE_PTR(oh);
647 }
648
649 /*
650  * Concurrency: shouldn't matter.
651  */
652 static struct thandle *osd_trans_start(const struct lu_env *env,
653                                        struct dt_device *d,
654                                        struct txn_param *p)
655 {
656         struct osd_device  *dev = osd_dt_dev(d);
657         handle_t           *jh;
658         struct osd_thandle *oh;
659         struct thandle     *th;
660         int hook_res;
661
662         ENTRY;
663
664         hook_res = dt_txn_hook_start(env, d, p);
665         if (hook_res != 0)
666                 RETURN(ERR_PTR(hook_res));
667
668         if (osd_param_is_sane(dev, p)) {
669                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
670                 if (oh != NULL) {
671                         struct osd_thread_info *oti = osd_oti_get(env);
672
673                         /*
674                          * XXX temporary stuff. Some abstraction layer should
675                          * be used.
676                          */
677                         oti->oti_dev = dev;
678                         osd_th_alloced(oh);
679                         jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
680                         osd_th_started(oh);
681                         if (!IS_ERR(jh)) {
682                                 oh->ot_handle = jh;
683                                 th = &oh->ot_super;
684                                 th->th_dev = d;
685                                 th->th_result = 0;
686                                 jh->h_sync = p->tp_sync;
687                                 lu_device_get(&d->dd_lu_dev);
688                                 oh->ot_dev_link = lu_ref_add
689                                         (&d->dd_lu_dev.ld_reference,
690                                          "osd-tx", th);
691                                 /* add commit callback */
692                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
693                                 lu_context_enter(&th->th_ctx);
694                                 osd_journal_callback_set(jh, osd_trans_commit_cb,
695                                                          (struct journal_callback *)&oh->ot_jcb);
696                                         LASSERT(oti->oti_txns == 0);
697                                         LASSERT(oti->oti_r_locks == 0);
698                                         LASSERT(oti->oti_w_locks == 0);
699                                         oti->oti_txns++;
700                         } else {
701                                 OBD_FREE_PTR(oh);
702                                 th = (void *)jh;
703                         }
704                 } else
705                         th = ERR_PTR(-ENOMEM);
706         } else {
707                 CERROR("Invalid transaction parameters\n");
708                 th = ERR_PTR(-EINVAL);
709         }
710
711         RETURN(th);
712 }
713
714 /*
715  * Concurrency: shouldn't matter.
716  */
717 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
718 {
719         int result;
720         struct osd_thandle *oh;
721         struct osd_thread_info *oti = osd_oti_get(env);
722
723         ENTRY;
724
725         oh = container_of0(th, struct osd_thandle, ot_super);
726         if (oh->ot_handle != NULL) {
727                 handle_t *hdl = oh->ot_handle;
728
729                 LASSERT(oti->oti_txns == 1);
730                 oti->oti_txns--;
731                 LASSERT(oti->oti_r_locks == 0);
732                 LASSERT(oti->oti_w_locks == 0);
733                 result = dt_txn_hook_stop(env, th);
734                 if (result != 0)
735                         CERROR("Failure in transaction hook: %d\n", result);
736                 oh->ot_handle = NULL;
737                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
738                                   result = ldiskfs_journal_stop(hdl));
739                 if (result != 0)
740                         CERROR("Failure to stop transaction: %d\n", result);
741         }
742         EXIT;
743 }
744
745 /*
746  * Concurrency: no concurrent access is possible that late in object
747  * life-cycle.
748  */
749 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
750 {
751         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
752         struct osd_device      *osd = osd_obj2dev(obj);
753         struct osd_thread_info *oti = osd_oti_get(env);
754         struct txn_param       *prm = &oti->oti_txn;
755         struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
756         struct thandle         *th;
757         int result;
758
759         lu_env_init(env_del_obj, LCT_DT_THREAD);
760         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
761                             OSD_TXN_INODE_DELETE_CREDITS);
762         th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
763         if (!IS_ERR(th)) {
764                 result = osd_oi_delete(osd_oti_get(env_del_obj),
765                                        &osd->od_oi, fid, th);
766                 osd_trans_stop(env_del_obj, th);
767         } else
768                 result = PTR_ERR(th);
769
770         lu_env_fini(env_del_obj);
771         return result;
772 }
773
774 /*
775  * Called just before object is freed. Releases all resources except for
776  * object itself (that is released by osd_object_free()).
777  *
778  * Concurrency: no concurrent access is possible that late in object
779  * life-cycle.
780  */
781 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
782 {
783         struct osd_object *obj   = osd_obj(l);
784         struct inode      *inode = obj->oo_inode;
785
786         LINVRNT(osd_invariant(obj));
787
788         /*
789          * If object is unlinked remove fid->ino mapping from object index.
790          */
791
792         osd_index_fini(obj);
793         if (inode != NULL) {
794                 int result;
795
796                 if (osd_inode_unlinked(inode)) {
797                         result = osd_inode_remove(env, obj);
798                         if (result != 0)
799                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
800                                                 "Failed to cleanup: %d\n",
801                                                 result);
802                 }
803
804                 iput(inode);
805                 obj->oo_inode = NULL;
806         }
807 }
808
809 /*
810  * Concurrency: ->loo_object_release() is called under site spin-lock.
811  */
812 static void osd_object_release(const struct lu_env *env,
813                                struct lu_object *l)
814 {
815         struct osd_object *o = osd_obj(l);
816
817         LASSERT(!lu_object_is_dying(l->lo_header));
818         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
819                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
820 }
821
822 /*
823  * Concurrency: shouldn't matter.
824  */
825 static int osd_object_print(const struct lu_env *env, void *cookie,
826                             lu_printer_t p, const struct lu_object *l)
827 {
828         struct osd_object *o = osd_obj(l);
829         struct iam_descr  *d;
830
831         if (o->oo_dir != NULL)
832                 d = o->oo_dir->od_container.ic_descr;
833         else
834                 d = NULL;
835         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
836                     o, o->oo_inode,
837                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
838                     o->oo_inode ? o->oo_inode->i_generation : 0,
839                     d ? d->id_ops->id_name : "plain");
840 }
841
842 /*
843  * Concurrency: shouldn't matter.
844  */
845 int osd_statfs(const struct lu_env *env, struct dt_device *d,
846                cfs_kstatfs_t *sfs)
847 {
848         struct osd_device *osd = osd_dt_dev(d);
849         struct super_block *sb = osd_sb(osd);
850         int result = 0;
851
852         cfs_spin_lock(&osd->od_osfs_lock);
853         /* cache 1 second */
854         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
855                 result = ll_do_statfs(sb, &osd->od_kstatfs);
856                 if (likely(result == 0)) /* N.B. statfs can't really fail */
857                         osd->od_osfs_age = cfs_time_current_64();
858         }
859
860         if (likely(result == 0))
861                 *sfs = osd->od_kstatfs;
862         cfs_spin_unlock(&osd->od_osfs_lock);
863
864         return result;
865 }
866
867 /*
868  * Concurrency: doesn't access mutable data.
869  */
870 static void osd_conf_get(const struct lu_env *env,
871                          const struct dt_device *dev,
872                          struct dt_device_param *param)
873 {
874         /*
875          * XXX should be taken from not-yet-existing fs abstraction layer.
876          */
877         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
878         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
879         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
880 }
881
882 /**
883  * Helper function to get and fill the buffer with input values.
884  */
885 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
886 {
887         struct lu_buf *buf;
888
889         buf = &osd_oti_get(env)->oti_buf;
890         buf->lb_buf = area;
891         buf->lb_len = len;
892         return buf;
893 }
894
895 /*
896  * Concurrency: shouldn't matter.
897  */
898 static int osd_sync(const struct lu_env *env, struct dt_device *d)
899 {
900         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
901         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
902 }
903
904 /**
905  * Start commit for OSD device.
906  *
907  * An implementation of dt_commit_async method for OSD device.
908  * Asychronously starts underlayng fs sync and thereby a transaction
909  * commit.
910  *
911  * \param env environment
912  * \param d dt device
913  *
914  * \see dt_device_operations
915  */
916 static int osd_commit_async(const struct lu_env *env,
917                             struct dt_device *d)
918 {
919         struct super_block *s = osd_sb(osd_dt_dev(d));
920         ENTRY;
921
922         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
923         RETURN(s->s_op->sync_fs(s, 0));
924 }
925
926 /*
927  * Concurrency: shouldn't matter.
928  */
929 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
930
931 static void osd_ro(const struct lu_env *env, struct dt_device *d)
932 {
933         ENTRY;
934
935         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
936
937         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
938                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
939         EXIT;
940 }
941
942
943 /*
944  * Concurrency: serialization provided by callers.
945  */
946 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
947                               int mode, unsigned long timeout, __u32 alg,
948                               struct lustre_capa_key *keys)
949 {
950         struct osd_device *dev = osd_dt_dev(d);
951         ENTRY;
952
953         dev->od_fl_capa = mode;
954         dev->od_capa_timeout = timeout;
955         dev->od_capa_alg = alg;
956         dev->od_capa_keys = keys;
957         RETURN(0);
958 }
959
960 /**
961  * Concurrency: serialization provided by callers.
962  */
963 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
964                                struct dt_quota_ctxt *ctxt, void *data)
965 {
966         struct obd_device *obd = (void *)ctxt;
967         struct vfsmount *mnt = (struct vfsmount *)data;
968         ENTRY;
969
970         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
971         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
972         obd->obd_lvfs_ctxt.pwdmnt = mnt;
973         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
974         obd->obd_lvfs_ctxt.fs = get_ds();
975
976         EXIT;
977 }
978
979 /**
980  * Note: we do not count into QUOTA here.
981  * If we mount with --data_journal we may need more.
982  */
983 static const int osd_dto_credits_noquota[DTO_NR] = {
984         /**
985          * Insert/Delete.
986          * INDEX_EXTRA_TRANS_BLOCKS(8) +
987          * SINGLEDATA_TRANS_BLOCKS(8)
988          * XXX Note: maybe iam need more, since iam have more level than
989          *           EXT3 htree.
990          */
991         [DTO_INDEX_INSERT]  = 16,
992         [DTO_INDEX_DELETE]  = 16,
993         /**
994          * Unused now
995          */
996         [DTO_IDNEX_UPDATE]  = 16,
997         /**
998          * Create a object. The same as create object in EXT3.
999          * DATA_TRANS_BLOCKS(14) +
1000          * INDEX_EXTRA_BLOCKS(8) +
1001          * 3(inode bits, groups, GDT)
1002          */
1003         [DTO_OBJECT_CREATE] = 25,
1004         /**
1005          * Unused now
1006          */
1007         [DTO_OBJECT_DELETE] = 25,
1008         /**
1009          * Attr set credits.
1010          * 3(inode bits, group, GDT)
1011          */
1012         [DTO_ATTR_SET_BASE] = 3,
1013         /**
1014          * Xattr set. The same as xattr of EXT3.
1015          * DATA_TRANS_BLOCKS(14)
1016          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1017          * are also counted in. Do not know why?
1018          */
1019         [DTO_XATTR_SET]     = 14,
1020         [DTO_LOG_REC]       = 14,
1021         /**
1022          * creadits for inode change during write.
1023          */
1024         [DTO_WRITE_BASE]    = 3,
1025         /**
1026          * credits for single block write.
1027          */
1028         [DTO_WRITE_BLOCK]   = 14,
1029         /**
1030          * Attr set credits for chown.
1031          * This is extra credits for setattr, and it is null without quota
1032          */
1033         [DTO_ATTR_SET_CHOWN]= 0
1034 };
1035
1036 /**
1037  * Note: we count into QUOTA here.
1038  * If we mount with --data_journal we may need more.
1039  */
1040 static const int osd_dto_credits_quota[DTO_NR] = {
1041         /**
1042          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1043          * SINGLEDATA_TRANS_BLOCKS(8) +
1044          * 2 * QUOTA_TRANS_BLOCKS(2)
1045          */
1046         [DTO_INDEX_INSERT]  = 20,
1047         /**
1048          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1049          * SINGLEDATA_TRANS_BLOCKS(8) +
1050          * 2 * QUOTA_TRANS_BLOCKS(2)
1051          */
1052         [DTO_INDEX_DELETE]  = 20,
1053         /**
1054          * Unused now.
1055          */
1056         [DTO_IDNEX_UPDATE]  = 16,
1057         /*
1058          * Create a object. Same as create object in EXT3 filesystem.
1059          * DATA_TRANS_BLOCKS(16) +
1060          * INDEX_EXTRA_BLOCKS(8) +
1061          * 3(inode bits, groups, GDT) +
1062          * 2 * QUOTA_INIT_BLOCKS(25)
1063          */
1064         [DTO_OBJECT_CREATE] = 77,
1065         /*
1066          * Unused now.
1067          * DATA_TRANS_BLOCKS(16) +
1068          * INDEX_EXTRA_BLOCKS(8) +
1069          * 3(inode bits, groups, GDT) +
1070          * QUOTA(?)
1071          */
1072         [DTO_OBJECT_DELETE] = 27,
1073         /**
1074          * Attr set credits.
1075          * 3 (inode bit, group, GDT) +
1076          */
1077         [DTO_ATTR_SET_BASE] = 3,
1078         /**
1079          * Xattr set. The same as xattr of EXT3.
1080          * DATA_TRANS_BLOCKS(16)
1081          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
1082          *           also counted in. Do not know why?
1083          */
1084         [DTO_XATTR_SET]     = 16,
1085         [DTO_LOG_REC]       = 16,
1086         /**
1087          * creadits for inode change during write.
1088          */
1089         [DTO_WRITE_BASE]    = 3,
1090         /**
1091          * credits for single block write.
1092          */
1093         [DTO_WRITE_BLOCK]   = 16,
1094         /**
1095          * Attr set credits for chown.
1096          * It is added to already set setattr credits
1097          * 2 * QUOTA_INIT_BLOCKS(25) +
1098          * 2 * QUOTA_DEL_BLOCKS(9)
1099          */
1100         [DTO_ATTR_SET_CHOWN]= 68,
1101 };
1102
1103 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
1104                           enum dt_txn_op op)
1105 {
1106         LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
1107                 ARRAY_SIZE(osd_dto_credits_quota));
1108         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
1109 #ifdef HAVE_QUOTA_SUPPORT
1110         if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
1111                 return osd_dto_credits_quota[op];
1112         else
1113 #endif
1114                 return osd_dto_credits_noquota[op];
1115 }
1116
1117 static const struct dt_device_operations osd_dt_ops = {
1118         .dt_root_get       = osd_root_get,
1119         .dt_statfs         = osd_statfs,
1120         .dt_trans_start    = osd_trans_start,
1121         .dt_trans_stop     = osd_trans_stop,
1122         .dt_conf_get       = osd_conf_get,
1123         .dt_sync           = osd_sync,
1124         .dt_ro             = osd_ro,
1125         .dt_commit_async   = osd_commit_async,
1126         .dt_credit_get     = osd_credit_get,
1127         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1128         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1129 };
1130
1131 static void osd_object_read_lock(const struct lu_env *env,
1132                                  struct dt_object *dt, unsigned role)
1133 {
1134         struct osd_object *obj = osd_dt_obj(dt);
1135         struct osd_thread_info *oti = osd_oti_get(env);
1136
1137         LINVRNT(osd_invariant(obj));
1138
1139         LASSERT(obj->oo_owner != env);
1140         cfs_down_read_nested(&obj->oo_sem, role);
1141
1142         LASSERT(obj->oo_owner == NULL);
1143         oti->oti_r_locks++;
1144 }
1145
1146 static void osd_object_write_lock(const struct lu_env *env,
1147                                   struct dt_object *dt, unsigned role)
1148 {
1149         struct osd_object *obj = osd_dt_obj(dt);
1150         struct osd_thread_info *oti = osd_oti_get(env);
1151
1152         LINVRNT(osd_invariant(obj));
1153
1154         LASSERT(obj->oo_owner != env);
1155         cfs_down_write_nested(&obj->oo_sem, role);
1156
1157         LASSERT(obj->oo_owner == NULL);
1158         obj->oo_owner = env;
1159         oti->oti_w_locks++;
1160 }
1161
1162 static void osd_object_read_unlock(const struct lu_env *env,
1163                                    struct dt_object *dt)
1164 {
1165         struct osd_object *obj = osd_dt_obj(dt);
1166         struct osd_thread_info *oti = osd_oti_get(env);
1167
1168         LINVRNT(osd_invariant(obj));
1169
1170         LASSERT(oti->oti_r_locks > 0);
1171         oti->oti_r_locks--;
1172         cfs_up_read(&obj->oo_sem);
1173 }
1174
1175 static void osd_object_write_unlock(const struct lu_env *env,
1176                                     struct dt_object *dt)
1177 {
1178         struct osd_object *obj = osd_dt_obj(dt);
1179         struct osd_thread_info *oti = osd_oti_get(env);
1180
1181         LINVRNT(osd_invariant(obj));
1182
1183         LASSERT(obj->oo_owner == env);
1184         LASSERT(oti->oti_w_locks > 0);
1185         oti->oti_w_locks--;
1186         obj->oo_owner = NULL;
1187         cfs_up_write(&obj->oo_sem);
1188 }
1189
1190 static int osd_object_write_locked(const struct lu_env *env,
1191                                    struct dt_object *dt)
1192 {
1193         struct osd_object *obj = osd_dt_obj(dt);
1194
1195         LINVRNT(osd_invariant(obj));
1196
1197         return obj->oo_owner == env;
1198 }
1199
1200 static int capa_is_sane(const struct lu_env *env,
1201                         struct osd_device *dev,
1202                         struct lustre_capa *capa,
1203                         struct lustre_capa_key *keys)
1204 {
1205         struct osd_thread_info *oti = osd_oti_get(env);
1206         struct lustre_capa *tcapa = &oti->oti_capa;
1207         struct obd_capa *oc;
1208         int i, rc = 0;
1209         ENTRY;
1210
1211         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1212         if (oc) {
1213                 if (capa_is_expired(oc)) {
1214                         DEBUG_CAPA(D_ERROR, capa, "expired");
1215                         rc = -ESTALE;
1216                 }
1217                 capa_put(oc);
1218                 RETURN(rc);
1219         }
1220
1221         if (capa_is_expired_sec(capa)) {
1222                 DEBUG_CAPA(D_ERROR, capa, "expired");
1223                 RETURN(-ESTALE);
1224         }
1225
1226         cfs_spin_lock(&capa_lock);
1227         for (i = 0; i < 2; i++) {
1228                 if (keys[i].lk_keyid == capa->lc_keyid) {
1229                         oti->oti_capa_key = keys[i];
1230                         break;
1231                 }
1232         }
1233         cfs_spin_unlock(&capa_lock);
1234
1235         if (i == 2) {
1236                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1237                 RETURN(-ESTALE);
1238         }
1239
1240         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1241         if (rc)
1242                 RETURN(rc);
1243
1244         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1245                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1246                 RETURN(-EACCES);
1247         }
1248
1249         oc = capa_add(dev->od_capa_hash, capa);
1250         capa_put(oc);
1251
1252         RETURN(0);
1253 }
1254
1255 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1256                            struct lustre_capa *capa, __u64 opc)
1257 {
1258         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1259         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1260         struct md_capainfo *ci;
1261         int rc;
1262
1263         if (!dev->od_fl_capa)
1264                 return 0;
1265
1266         if (capa == BYPASS_CAPA)
1267                 return 0;
1268
1269         ci = md_capainfo(env);
1270         if (unlikely(!ci))
1271                 return 0;
1272
1273         if (ci->mc_auth == LC_ID_NONE)
1274                 return 0;
1275
1276         if (!capa) {
1277                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1278                 return -EACCES;
1279         }
1280
1281         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1282                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1283                            PFID(fid));
1284                 return -EACCES;
1285         }
1286
1287         if (!capa_opc_supported(capa, opc)) {
1288                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1289                 return -EACCES;
1290         }
1291
1292         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1293                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1294                 return -EACCES;
1295         }
1296
1297         return 0;
1298 }
1299
1300 static struct timespec *osd_inode_time(const struct lu_env *env,
1301                                        struct inode *inode, __u64 seconds)
1302 {
1303         struct osd_thread_info *oti = osd_oti_get(env);
1304         struct timespec        *t   = &oti->oti_time;
1305
1306         t->tv_sec  = seconds;
1307         t->tv_nsec = 0;
1308         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1309         return t;
1310 }
1311
1312
1313 static void osd_inode_getattr(const struct lu_env *env,
1314                               struct inode *inode, struct lu_attr *attr)
1315 {
1316         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1317                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1318                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1319
1320         attr->la_atime      = LTIME_S(inode->i_atime);
1321         attr->la_mtime      = LTIME_S(inode->i_mtime);
1322         attr->la_ctime      = LTIME_S(inode->i_ctime);
1323         attr->la_mode       = inode->i_mode;
1324         attr->la_size       = i_size_read(inode);
1325         attr->la_blocks     = inode->i_blocks;
1326         attr->la_uid        = inode->i_uid;
1327         attr->la_gid        = inode->i_gid;
1328         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1329         attr->la_nlink      = inode->i_nlink;
1330         attr->la_rdev       = inode->i_rdev;
1331         attr->la_blksize    = ll_inode_blksize(inode);
1332         attr->la_blkbits    = inode->i_blkbits;
1333 }
1334
1335 static int osd_attr_get(const struct lu_env *env,
1336                         struct dt_object *dt,
1337                         struct lu_attr *attr,
1338                         struct lustre_capa *capa)
1339 {
1340         struct osd_object *obj = osd_dt_obj(dt);
1341
1342         LASSERT(dt_object_exists(dt));
1343         LINVRNT(osd_invariant(obj));
1344
1345         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1346                 return -EACCES;
1347
1348         cfs_spin_lock(&obj->oo_guard);
1349         osd_inode_getattr(env, obj->oo_inode, attr);
1350         cfs_spin_unlock(&obj->oo_guard);
1351         return 0;
1352 }
1353
1354 static int osd_inode_setattr(const struct lu_env *env,
1355                              struct inode *inode, const struct lu_attr *attr)
1356 {
1357         __u64 bits;
1358
1359         bits = attr->la_valid;
1360
1361         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1362
1363 #ifdef HAVE_QUOTA_SUPPORT
1364         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1365             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1366                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1367                 struct iattr iattr;
1368                 int rc;
1369
1370                 iattr.ia_valid = 0;
1371                 if (bits & LA_UID)
1372                         iattr.ia_valid |= ATTR_UID;
1373                 if (bits & LA_GID)
1374                         iattr.ia_valid |= ATTR_GID;
1375                 iattr.ia_uid = attr->la_uid;
1376                 iattr.ia_gid = attr->la_gid;
1377                 osd_push_ctxt(env, save);
1378                 rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
1379                 osd_pop_ctxt(save);
1380                 if (rc != 0)
1381                         return rc;
1382         }
1383 #endif
1384
1385         if (bits & LA_ATIME)
1386                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1387         if (bits & LA_CTIME)
1388                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1389         if (bits & LA_MTIME)
1390                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1391         if (bits & LA_SIZE) {
1392                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1393                 i_size_write(inode, attr->la_size);
1394         }
1395
1396 #if 0
1397         /* OSD should not change "i_blocks" which is used by quota.
1398          * "i_blocks" should be changed by ldiskfs only. */
1399         if (bits & LA_BLOCKS)
1400                 inode->i_blocks = attr->la_blocks;
1401 #endif
1402         if (bits & LA_MODE)
1403                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1404                         (attr->la_mode & ~S_IFMT);
1405         if (bits & LA_UID)
1406                 inode->i_uid    = attr->la_uid;
1407         if (bits & LA_GID)
1408                 inode->i_gid    = attr->la_gid;
1409         if (bits & LA_NLINK)
1410                 inode->i_nlink  = attr->la_nlink;
1411         if (bits & LA_RDEV)
1412                 inode->i_rdev   = attr->la_rdev;
1413
1414         if (bits & LA_FLAGS) {
1415                 /* always keep S_NOCMTIME */
1416                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
1417                                  S_NOCMTIME;
1418         }
1419         return 0;
1420 }
1421
1422 static int osd_attr_set(const struct lu_env *env,
1423                         struct dt_object *dt,
1424                         const struct lu_attr *attr,
1425                         struct thandle *handle,
1426                         struct lustre_capa *capa)
1427 {
1428         struct osd_object *obj = osd_dt_obj(dt);
1429         int rc;
1430
1431         LASSERT(handle != NULL);
1432         LASSERT(dt_object_exists(dt));
1433         LASSERT(osd_invariant(obj));
1434
1435         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1436                 return -EACCES;
1437
1438         cfs_spin_lock(&obj->oo_guard);
1439         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1440         cfs_spin_unlock(&obj->oo_guard);
1441
1442         if (!rc)
1443                 obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
1444         return rc;
1445 }
1446
/*
 * Object creation.
 *
 * XXX temporary solution.
 */

/**
 * Hook run before the type-specific creation step; currently does nothing.
 *
 * \retval 0 always
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        int rc = 0;

        return rc;
}
1457
/**
 * Hook run after a successful type-specific creation step: calls
 * osd_object_init0() on the new object.
 *
 * \retval 0 always (the outcome of osd_object_init0() is not inspected)
 */
static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
                           struct lu_attr *attr, struct thandle *th)
{
        int rc = 0;

        osd_object_init0(obj);
        return rc;
}
1464
1465 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1466                                             struct osd_object *obj,
1467                                             const char *name,
1468                                             const int namelen)
1469 {
1470         struct osd_thread_info *info   = osd_oti_get(env);
1471         struct dentry *child_dentry = &info->oti_child_dentry;
1472         struct dentry *obj_dentry = &info->oti_obj_dentry;
1473
1474         obj_dentry->d_inode = obj->oo_inode;
1475         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1476         obj_dentry->d_name.hash = 0;
1477
1478         child_dentry->d_name.hash = 0;
1479         child_dentry->d_parent = obj_dentry;
1480         child_dentry->d_name.name = name;
1481         child_dentry->d_name.len = namelen;
1482         return child_dentry;
1483 }
1484
1485
1486 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1487                       cfs_umode_t mode,
1488                       struct dt_allocation_hint *hint,
1489                       struct thandle *th)
1490 {
1491         int result;
1492         struct osd_device  *osd = osd_obj2dev(obj);
1493         struct osd_thandle *oth;
1494         struct dt_object   *parent;
1495         struct inode       *inode;
1496 #ifdef HAVE_QUOTA_SUPPORT
1497         struct osd_ctxt    *save = &info->oti_ctxt;
1498 #endif
1499
1500         LINVRNT(osd_invariant(obj));
1501         LASSERT(obj->oo_inode == NULL);
1502
1503         oth = container_of(th, struct osd_thandle, ot_super);
1504         LASSERT(oth->ot_handle->h_transaction != NULL);
1505
1506         if (hint && hint->dah_parent)
1507                 parent = hint->dah_parent;
1508         else
1509                 parent = osd->od_obj_area;
1510
1511         LASSERT(parent != NULL);
1512         LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1513
1514 #ifdef HAVE_QUOTA_SUPPORT
1515         osd_push_ctxt(info->oti_env, save);
1516 #endif
1517         inode = ldiskfs_create_inode(oth->ot_handle,
1518                                      osd_dt_obj(parent)->oo_inode, mode);
1519 #ifdef HAVE_QUOTA_SUPPORT
1520         osd_pop_ctxt(save);
1521 #endif
1522         if (!IS_ERR(inode)) {
1523                 /* Do not update file c/mtime in ldiskfs.
1524                  * NB: don't need any lock because no contention at this
1525                  * early stage */
1526                 inode->i_flags |= S_NOCMTIME;
1527                 obj->oo_inode = inode;
1528                 result = 0;
1529         } else
1530                 result = PTR_ERR(inode);
1531         LINVRNT(osd_invariant(obj));
1532         return result;
1533 }
1534
/** Key size passed to iam_lvar_create() for directories, see osd_mkdir(). */
enum {
        OSD_NAME_LEN = 255,
};
1538
1539 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1540                      struct lu_attr *attr,
1541                      struct dt_allocation_hint *hint,
1542                      struct dt_object_format *dof,
1543                      struct thandle *th)
1544 {
1545         int result;
1546         struct osd_thandle *oth;
1547         struct osd_device *osd = osd_obj2dev(obj);
1548         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1549
1550         LASSERT(S_ISDIR(attr->la_mode));
1551
1552         oth = container_of(th, struct osd_thandle, ot_super);
1553         LASSERT(oth->ot_handle->h_transaction != NULL);
1554         result = osd_mkfile(info, obj, mode, hint, th);
1555         if (result == 0 && osd->od_iop_mode == 0) {
1556                 LASSERT(obj->oo_inode != NULL);
1557                 /*
1558                  * XXX uh-oh... call low-level iam function directly.
1559                  */
1560
1561                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1562                                          sizeof (struct osd_fid_pack),
1563                                          oth->ot_handle);
1564         }
1565         return result;
1566 }
1567
1568 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1569                         struct lu_attr *attr,
1570                         struct dt_allocation_hint *hint,
1571                         struct dt_object_format *dof,
1572                         struct thandle *th)
1573 {
1574         int result;
1575         struct osd_thandle *oth;
1576         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1577
1578         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1579
1580         LASSERT(S_ISREG(attr->la_mode));
1581
1582         oth = container_of(th, struct osd_thandle, ot_super);
1583         LASSERT(oth->ot_handle->h_transaction != NULL);
1584
1585         result = osd_mkfile(info, obj, mode, hint, th);
1586         if (result == 0) {
1587                 LASSERT(obj->oo_inode != NULL);
1588                 if (feat->dif_flags & DT_IND_VARKEY)
1589                         result = iam_lvar_create(obj->oo_inode,
1590                                                  feat->dif_keysize_max,
1591                                                  feat->dif_ptrsize,
1592                                                  feat->dif_recsize_max,
1593                                                  oth->ot_handle);
1594                 else
1595                         result = iam_lfix_create(obj->oo_inode,
1596                                                  feat->dif_keysize_max,
1597                                                  feat->dif_ptrsize,
1598                                                  feat->dif_recsize_max,
1599                                                  oth->ot_handle);
1600
1601         }
1602         return result;
1603 }
1604
1605 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1606                      struct lu_attr *attr,
1607                      struct dt_allocation_hint *hint,
1608                      struct dt_object_format *dof,
1609                      struct thandle *th)
1610 {
1611         LASSERT(S_ISREG(attr->la_mode));
1612         return osd_mkfile(info, obj, (attr->la_mode &
1613                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1614 }
1615
1616 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1617                      struct lu_attr *attr,
1618                      struct dt_allocation_hint *hint,
1619                      struct dt_object_format *dof,
1620                      struct thandle *th)
1621 {
1622         LASSERT(S_ISLNK(attr->la_mode));
1623         return osd_mkfile(info, obj, (attr->la_mode &
1624                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1625 }
1626
1627 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1628                      struct lu_attr *attr,
1629                      struct dt_allocation_hint *hint,
1630                      struct dt_object_format *dof,
1631                      struct thandle *th)
1632 {
1633         cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1634         int result;
1635
1636         LINVRNT(osd_invariant(obj));
1637         LASSERT(obj->oo_inode == NULL);
1638         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1639                 S_ISFIFO(mode) || S_ISSOCK(mode));
1640
1641         result = osd_mkfile(info, obj, mode, hint, th);
1642         if (result == 0) {
1643                 LASSERT(obj->oo_inode != NULL);
1644                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1645         }
1646         LINVRNT(osd_invariant(obj));
1647         return result;
1648 }
1649
1650 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1651                               struct lu_attr *,
1652                               struct dt_allocation_hint *hint,
1653                               struct dt_object_format *dof,
1654                               struct thandle *);
1655
1656 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1657 {
1658         osd_obj_type_f result;
1659
1660         switch (type) {
1661         case DFT_DIR:
1662                 result = osd_mkdir;
1663                 break;
1664         case DFT_REGULAR:
1665                 result = osd_mkreg;
1666                 break;
1667         case DFT_SYM:
1668                 result = osd_mksym;
1669                 break;
1670         case DFT_NODE:
1671                 result = osd_mknod;
1672                 break;
1673         case DFT_INDEX:
1674                 result = osd_mk_index;
1675                 break;
1676
1677         default:
1678                 LBUG();
1679                 break;
1680         }
1681         return result;
1682 }
1683
1684
1685 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1686                         struct dt_object *parent, cfs_umode_t child_mode)
1687 {
1688         LASSERT(ah);
1689
1690         memset(ah, 0, sizeof(*ah));
1691         ah->dah_parent = parent;
1692         ah->dah_mode = child_mode;
1693 }
1694
1695 /**
1696  * Helper function for osd_object_create()
1697  *
1698  * \retval 0, on success
1699  */
1700 static int __osd_object_create(struct osd_thread_info *info,
1701                                struct osd_object *obj, struct lu_attr *attr,
1702                                struct dt_allocation_hint *hint,
1703                                struct dt_object_format *dof,
1704                                struct thandle *th)
1705 {
1706
1707         int result;
1708
1709         result = osd_create_pre(info, obj, attr, th);
1710         if (result == 0) {
1711                 result = osd_create_type_f(dof->dof_type)(info, obj,
1712                                            attr, hint, dof, th);
1713                 if (result == 0)
1714                         result = osd_create_post(info, obj, attr, th);
1715         }
1716         return result;
1717 }
1718
1719 /**
1720  * Helper function for osd_object_create()
1721  *
1722  * \retval 0, on success
1723  */
1724 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1725                            const struct lu_fid *fid, struct thandle *th)
1726 {
1727         struct osd_thread_info *info = osd_oti_get(env);
1728         struct osd_inode_id    *id   = &info->oti_id;
1729         struct osd_device      *osd  = osd_obj2dev(obj);
1730         struct md_ucred        *uc   = md_ucred(env);
1731
1732         LASSERT(obj->oo_inode != NULL);
1733         LASSERT(uc != NULL);
1734
1735         id->oii_ino = obj->oo_inode->i_ino;
1736         id->oii_gen = obj->oo_inode->i_generation;
1737
1738         return osd_oi_insert(info, &osd->od_oi, fid, id, th,
1739                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1740 }
1741
1742 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1743                              struct lu_attr *attr,
1744                              struct dt_allocation_hint *hint,
1745                              struct dt_object_format *dof,
1746                              struct thandle *th)
1747 {
1748         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1749         struct osd_object      *obj    = osd_dt_obj(dt);
1750         struct osd_thread_info *info   = osd_oti_get(env);
1751         int result;
1752
1753         ENTRY;
1754
1755         LINVRNT(osd_invariant(obj));
1756         LASSERT(!dt_object_exists(dt));
1757         LASSERT(osd_write_locked(env, obj));
1758         LASSERT(th != NULL);
1759
1760         result = __osd_object_create(info, obj, attr, hint, dof, th);
1761         if (result == 0)
1762                 result = __osd_oi_insert(env, obj, fid, th);
1763
1764         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1765         LASSERT(osd_invariant(obj));
1766         RETURN(result);
1767 }
1768
1769 /**
1770  * Helper function for osd_xattr_set()
1771  */
1772 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1773                            const struct lu_buf *buf, const char *name, int fl)
1774 {
1775         struct osd_object      *obj      = osd_dt_obj(dt);
1776         struct inode           *inode    = obj->oo_inode;
1777         struct osd_thread_info *info     = osd_oti_get(env);
1778         struct dentry          *dentry   = &info->oti_child_dentry;
1779         int                     fs_flags = 0;
1780         int  rc;
1781
1782         LASSERT(dt_object_exists(dt));
1783         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1784         LASSERT(osd_write_locked(env, obj));
1785
1786         if (fl & LU_XATTR_REPLACE)
1787                 fs_flags |= XATTR_REPLACE;
1788
1789         if (fl & LU_XATTR_CREATE)
1790                 fs_flags |= XATTR_CREATE;
1791
1792         dentry->d_inode = inode;
1793         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
1794                                    buf->lb_len, fs_flags);
1795         return rc;
1796 }
1797
1798 /**
1799  * Put the fid into lustre_mdt_attrs, and then place the structure
1800  * inode's ea. This fid should not be altered during the life time
1801  * of the inode.
1802  *
1803  * \retval +ve, on success
1804  * \retval -ve, on error
1805  *
1806  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
1807  */
1808 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
1809                           const struct lu_fid *fid)
1810 {
1811         struct osd_thread_info  *info      = osd_oti_get(env);
1812         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1813
1814         lustre_lma_init(mdt_attrs, fid);
1815         lustre_lma_swab(mdt_attrs);
1816         return __osd_xattr_set(env, dt,
1817                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
1818                                XATTR_NAME_LMA, LU_XATTR_CREATE);
1819
1820 }
1821
1822 /**
1823  * Helper function to form igif
1824  */
1825 static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
1826                                 struct lu_fid *fid)
1827 {
1828         LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
1829 }
1830
1831 /**
1832  * Helper function to pack the fid, ldiskfs stores fid in packed format.
1833  */
1834 void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
1835                   struct lu_fid *befider)
1836 {
1837         fid_cpu_to_be(befider, (struct lu_fid *)fid);
1838         memcpy(pack->fp_area, befider, sizeof(*befider));
1839         pack->fp_len =  sizeof(*befider) + 1;
1840 }
1841
1842 /**
1843  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
1844  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
1845  * To have compatilibility with 1.8 ldiskfs driver we need to have
1846  * magic number at start of fid data.
1847  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
1848  * its inmemory API.
1849  */
1850 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
1851                                   const struct dt_rec *fid)
1852 {
1853         param->edp_magic = LDISKFS_LUFID_MAGIC;
1854         param->edp_len =  sizeof(struct lu_fid) + 1;
1855
1856         fid_cpu_to_be((struct lu_fid *)param->edp_data,
1857                       (struct lu_fid *)fid);
1858 }
1859
1860 int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
1861 {
1862         int result;
1863
1864         result = 0;
1865         switch (pack->fp_len) {
1866         case sizeof *fid + 1:
1867                 memcpy(fid, pack->fp_area, sizeof *fid);
1868                 fid_be_to_cpu(fid, fid);
1869                 break;
1870         default:
1871                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1872                 result = -EIO;
1873         }
1874         return result;
1875 }
1876
1877 /**
1878  * Try to read the fid from inode ea into dt_rec, if return value
1879  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
1880  *
1881  * \param fid object fid.
1882  *
1883  * \retval 0 on success
1884  */
1885 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
1886                           __u32 ino, struct lu_fid *fid)
1887 {
1888         struct osd_thread_info  *info      = osd_oti_get(env);
1889         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1890         struct lu_device        *ldev   = obj->oo_dt.do_lu.lo_dev;
1891         struct dentry           *dentry = &info->oti_child_dentry;
1892         struct osd_inode_id     *id     = &info->oti_id;
1893         struct osd_device       *dev;
1894         struct inode            *inode;
1895         int                      rc;
1896
1897         ENTRY;
1898         dev  = osd_dev(ldev);
1899
1900         id->oii_ino = ino;
1901         id->oii_gen = OSD_OII_NOGEN;
1902
1903         inode = osd_iget(info, dev, id);
1904         if (IS_ERR(inode)) {
1905                 rc = PTR_ERR(inode);
1906                 GOTO(out,rc);
1907         }
1908         dentry->d_inode = inode;
1909
1910         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1911         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
1912                                    sizeof *mdt_attrs);
1913
1914         /* Check LMA compatibility */
1915         if (rc > 0 &&
1916             (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
1917                 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
1918                       inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
1919                       ~LMA_INCOMPAT_SUPP);
1920                 return -ENOSYS;
1921         }
1922
1923         if (rc > 0) {
1924                 lustre_lma_swab(mdt_attrs);
1925                 memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
1926                 rc = 0;
1927         } else if (rc == -ENODATA) {
1928                 osd_igif_get(env, inode, fid);
1929                 rc = 0;
1930         }
1931         iput(inode);
1932 out:
1933         RETURN(rc);
1934 }
1935
1936 /**
1937  * OSD layer object create function for interoperability mode (b11826).
1938  * This is mostly similar to osd_object_create(). Only difference being, fid is
1939  * inserted into inode ea here.
1940  *
1941  * \retval   0, on success
1942  * \retval -ve, on error
1943  */
1944 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
1945                              struct lu_attr *attr,
1946                              struct dt_allocation_hint *hint,
1947                              struct dt_object_format *dof,
1948                              struct thandle *th)
1949 {
1950         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1951         struct osd_object      *obj    = osd_dt_obj(dt);
1952         struct osd_thread_info *info   = osd_oti_get(env);
1953         int result;
1954
1955         ENTRY;
1956
1957         LASSERT(osd_invariant(obj));
1958         LASSERT(!dt_object_exists(dt));
1959         LASSERT(osd_write_locked(env, obj));
1960         LASSERT(th != NULL);
1961
1962         result = __osd_object_create(info, obj, attr, hint, dof, th);
1963
1964         /* objects under osd root shld have igif fid, so dont add fid EA */
1965         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
1966                 result = osd_ea_fid_set(env, dt, fid);
1967
1968         if (result == 0)
1969                 result = __osd_oi_insert(env, obj, fid, th);
1970
1971         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1972         LINVRNT(osd_invariant(obj));
1973         RETURN(result);
1974 }
1975
1976 /*
1977  * Concurrency: @dt is write locked.
1978  */
1979 static void osd_object_ref_add(const struct lu_env *env,
1980                                struct dt_object *dt,
1981                                struct thandle *th)
1982 {
1983         struct osd_object *obj = osd_dt_obj(dt);
1984         struct inode *inode = obj->oo_inode;
1985
1986         LINVRNT(osd_invariant(obj));
1987         LASSERT(dt_object_exists(dt));
1988         LASSERT(osd_write_locked(env, obj));
1989         LASSERT(th != NULL);
1990
1991         cfs_spin_lock(&obj->oo_guard);
1992         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1993         inode->i_nlink++;
1994         cfs_spin_unlock(&obj->oo_guard);
1995         inode->i_sb->s_op->dirty_inode(inode);
1996         LINVRNT(osd_invariant(obj));
1997 }
1998
1999 /*
2000  * Concurrency: @dt is write locked.
2001  */
2002 static void osd_object_ref_del(const struct lu_env *env,
2003                                struct dt_object *dt,
2004                                struct thandle *th)
2005 {
2006         struct osd_object *obj = osd_dt_obj(dt);
2007         struct inode *inode = obj->oo_inode;
2008
2009         LINVRNT(osd_invariant(obj));
2010         LASSERT(dt_object_exists(dt));
2011         LASSERT(osd_write_locked(env, obj));
2012         LASSERT(th != NULL);
2013
2014         cfs_spin_lock(&obj->oo_guard);
2015         LASSERT(inode->i_nlink > 0);
2016         inode->i_nlink--;
2017         cfs_spin_unlock(&obj->oo_guard);
2018         inode->i_sb->s_op->dirty_inode(inode);
2019         LINVRNT(osd_invariant(obj));
2020 }
2021
2022 /*
2023  * Concurrency: @dt is read locked.
2024  */
2025 static int osd_xattr_get(const struct lu_env *env,
2026                          struct dt_object *dt,
2027                          struct lu_buf *buf,
2028                          const char *name,
2029                          struct lustre_capa *capa)
2030 {
2031         struct osd_object      *obj    = osd_dt_obj(dt);
2032         struct inode           *inode  = obj->oo_inode;
2033         struct osd_thread_info *info   = osd_oti_get(env);
2034         struct dentry          *dentry = &info->oti_obj_dentry;
2035
2036         LASSERT(dt_object_exists(dt));
2037         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2038         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2039
2040         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2041                 return -EACCES;
2042
2043         dentry->d_inode = inode;
2044         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
2045 }
2046
2047 /*
2048  * Concurrency: @dt is write locked.
2049  */
2050 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2051                          const struct lu_buf *buf, const char *name, int fl,
2052                          struct thandle *handle, struct lustre_capa *capa)
2053 {
2054         LASSERT(handle != NULL);
2055
2056         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2057                 return -EACCES;
2058
2059         return __osd_xattr_set(env, dt, buf, name, fl);
2060 }
2061
2062 /*
2063  * Concurrency: @dt is read locked.
2064  */
2065 static int osd_xattr_list(const struct lu_env *env,
2066                           struct dt_object *dt,
2067                           struct lu_buf *buf,
2068                           struct lustre_capa *capa)
2069 {
2070         struct osd_object      *obj    = osd_dt_obj(dt);
2071         struct inode           *inode  = obj->oo_inode;
2072         struct osd_thread_info *info   = osd_oti_get(env);
2073         struct dentry          *dentry = &info->oti_obj_dentry;
2074
2075         LASSERT(dt_object_exists(dt));
2076         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2077         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2078
2079         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2080                 return -EACCES;
2081
2082         dentry->d_inode = inode;
2083         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2084 }
2085
2086 /*
2087  * Concurrency: @dt is write locked.
2088  */
2089 static int osd_xattr_del(const struct lu_env *env,
2090                          struct dt_object *dt,
2091                          const char *name,
2092                          struct thandle *handle,
2093                          struct lustre_capa *capa)
2094 {
2095         struct osd_object      *obj    = osd_dt_obj(dt);
2096         struct inode           *inode  = obj->oo_inode;
2097         struct osd_thread_info *info   = osd_oti_get(env);
2098         struct dentry          *dentry = &info->oti_obj_dentry;
2099         int                     rc;
2100
2101         LASSERT(dt_object_exists(dt));
2102         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2103         LASSERT(osd_write_locked(env, obj));
2104         LASSERT(handle != NULL);
2105
2106         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2107                 return -EACCES;
2108
2109         dentry->d_inode = inode;
2110         rc = inode->i_op->removexattr(dentry, name);
2111         return rc;
2112 }
2113
2114 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2115                                      struct dt_object *dt,
2116                                      struct lustre_capa *old,
2117                                      __u64 opc)
2118 {
2119         struct osd_thread_info *info = osd_oti_get(env);
2120         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2121         struct osd_object *obj = osd_dt_obj(dt);
2122         struct osd_device *dev = osd_obj2dev(obj);
2123         struct lustre_capa_key *key = &info->oti_capa_key;
2124         struct lustre_capa *capa = &info->oti_capa;
2125         struct obd_capa *oc;
2126         struct md_capainfo *ci;
2127         int rc;
2128         ENTRY;
2129
2130         if (!dev->od_fl_capa)
2131                 RETURN(ERR_PTR(-ENOENT));
2132
2133         LASSERT(dt_object_exists(dt));
2134         LINVRNT(osd_invariant(obj));
2135
2136         /* renewal sanity check */
2137         if (old && osd_object_auth(env, dt, old, opc))
2138                 RETURN(ERR_PTR(-EACCES));
2139
2140         ci = md_capainfo(env);
2141         if (unlikely(!ci))
2142                 RETURN(ERR_PTR(-ENOENT));
2143
2144         switch (ci->mc_auth) {
2145         case LC_ID_NONE:
2146                 RETURN(NULL);
2147         case LC_ID_PLAIN:
2148                 capa->lc_uid = obj->oo_inode->i_uid;
2149                 capa->lc_gid = obj->oo_inode->i_gid;
2150                 capa->lc_flags = LC_ID_PLAIN;
2151                 break;
2152         case LC_ID_CONVERT: {
2153                 __u32 d[4], s[4];
2154
2155                 s[0] = obj->oo_inode->i_uid;
2156                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2157                 s[2] = obj->oo_inode->i_gid;
2158                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2159                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2160                 if (unlikely(rc))
2161                         RETURN(ERR_PTR(rc));
2162
2163                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2164                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2165                 capa->lc_flags = LC_ID_CONVERT;
2166                 break;
2167         }
2168         default:
2169                 RETURN(ERR_PTR(-EINVAL));
2170         }
2171
2172         capa->lc_fid = *fid;
2173         capa->lc_opc = opc;
2174         capa->lc_flags |= dev->od_capa_alg << 24;
2175         capa->lc_timeout = dev->od_capa_timeout;
2176         capa->lc_expiry = 0;
2177
2178         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2179         if (oc) {
2180                 LASSERT(!capa_is_expired(oc));
2181                 RETURN(oc);
2182         }
2183
2184         cfs_spin_lock(&capa_lock);
2185         *key = dev->od_capa_keys[1];
2186         cfs_spin_unlock(&capa_lock);
2187
2188         capa->lc_keyid = key->lk_keyid;
2189         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2190
2191         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2192         if (rc) {
2193                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2194                 RETURN(ERR_PTR(rc));
2195         }
2196
2197         oc = capa_add(dev->od_capa_hash, capa);
2198         RETURN(oc);
2199 }
2200
2201 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2202 {
2203         int rc;
2204         struct osd_object      *obj    = osd_dt_obj(dt);
2205         struct inode           *inode  = obj->oo_inode;
2206         struct osd_thread_info *info   = osd_oti_get(env);
2207         struct dentry          *dentry = &info->oti_obj_dentry;
2208         struct file            *file   = &info->oti_file;
2209         ENTRY;
2210
2211         dentry->d_inode = inode;
2212         file->f_dentry = dentry;
2213         file->f_mapping = inode->i_mapping;
2214         file->f_op = inode->i_fop;
2215         LOCK_INODE_MUTEX(inode);
2216         rc = file->f_op->fsync(file, dentry, 0);
2217         UNLOCK_INODE_MUTEX(inode);
2218         RETURN(rc);
2219 }
2220
2221 /*
2222  * Get the 64-bit version for an inode.
2223  */
2224 static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
2225                                                struct dt_object *dt)
2226 {
2227         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2228
2229         CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
2230                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2231         return LDISKFS_I(inode)->i_fs_version;
2232 }
2233
2234 /*
2235  * Set the 64-bit version and return the old version.
2236  */
2237 static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
2238                                    dt_obj_version_t new_version)
2239 {
2240         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2241
2242         CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2243                new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2244         LDISKFS_I(inode)->i_fs_version = new_version;
2245         /** Version is set after all inode operations are finished,
2246          *  so we should mark it dirty here */
2247         inode->i_sb->s_op->dirty_inode(inode);
2248 }
2249
2250 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2251                         void **data)
2252 {
2253         struct osd_object *obj = osd_dt_obj(dt);
2254         ENTRY;
2255
2256         *data = (void *)obj->oo_inode;
2257         RETURN(0);
2258 }
2259
2260 /*
2261  * Index operations.
2262  */
2263
2264 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2265                            const struct dt_index_features *feat)
2266 {
2267         struct iam_descr *descr;
2268
2269         if (osd_object_is_root(o))
2270                 return feat == &dt_directory_features;
2271
2272         LASSERT(o->oo_dir != NULL);
2273
2274         descr = o->oo_dir->od_container.ic_descr;
2275         if (feat == &dt_directory_features) {
2276                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2277                         return 1;
2278                 else
2279                         return 0;
2280         } else {
2281                 return
2282                         feat->dif_keysize_min <= descr->id_key_size &&
2283                         descr->id_key_size <= feat->dif_keysize_max &&
2284                         feat->dif_recsize_min <= descr->id_rec_size &&
2285                         descr->id_rec_size <= feat->dif_recsize_max &&
2286                         !(feat->dif_flags & (DT_IND_VARKEY |
2287                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2288                         ergo(feat->dif_flags & DT_IND_UPDATE,
2289                              1 /* XXX check that object (and file system) is
2290                                 * writable */);
2291         }
2292 }
2293
2294 static int osd_iam_container_init(const struct lu_env *env,
2295                                   struct osd_object *obj,
2296                                   struct osd_directory *dir)
2297 {
2298         int result;
2299         struct iam_container *bag;
2300
2301         bag    = &dir->od_container;
2302         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2303         if (result == 0) {
2304                 result = iam_container_setup(bag);
2305                 if (result == 0)
2306                         obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2307                 else
2308                         iam_container_fini(bag);
2309         }
2310         return result;
2311 }
2312
2313
2314 /*
2315  * Concurrency: no external locking is necessary.
2316  */
2317 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2318                          const struct dt_index_features *feat)
2319 {
2320         int result;
2321         int ea_dir = 0;
2322         struct osd_object *obj = osd_dt_obj(dt);
2323         struct osd_device *osd = osd_obj2dev(obj);
2324
2325         LINVRNT(osd_invariant(obj));
2326         LASSERT(dt_object_exists(dt));
2327
2328         if (osd_object_is_root(obj)) {
2329                 dt->do_index_ops = &osd_index_ea_ops;
2330                 result = 0;
2331         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2332                 dt->do_index_ops = &osd_index_ea_ops;
2333                 if (S_ISDIR(obj->oo_inode->i_mode))
2334                         result = 0;
2335                 else
2336                         result = -ENOTDIR;
2337                 ea_dir = 1;
2338         } else if (!osd_has_index(obj)) {
2339                 struct osd_directory *dir;
2340
2341                 OBD_ALLOC_PTR(dir);
2342                 if (dir != NULL) {
2343
2344                         cfs_spin_lock(&obj->oo_guard);
2345                         if (obj->oo_dir == NULL)
2346                                 obj->oo_dir = dir;
2347                         else
2348                                 /*
2349                                  * Concurrent thread allocated container data.
2350                                  */
2351                                 OBD_FREE_PTR(dir);
2352                         cfs_spin_unlock(&obj->oo_guard);
2353                         /*
2354                          * Now, that we have container data, serialize its
2355                          * initialization.
2356                          */
2357                         cfs_down_write(&obj->oo_ext_idx_sem);
2358                         /*
2359                          * recheck under lock.
2360                          */
2361                         if (!osd_has_index(obj))
2362                                 result = osd_iam_container_init(env, obj, dir);
2363                         else
2364                                 result = 0;
2365                         cfs_up_write(&obj->oo_ext_idx_sem);
2366                 } else
2367                         result = -ENOMEM;
2368         } else
2369                 result = 0;
2370
2371         if (result == 0 && ea_dir == 0) {
2372                 if (!osd_iam_index_probe(env, obj, feat))
2373                         result = -ENOTDIR;
2374         }
2375         LINVRNT(osd_invariant(obj));
2376
2377         return result;
2378 }
2379
2380 static const struct dt_object_operations osd_obj_ops = {
2381         .do_read_lock    = osd_object_read_lock,
2382         .do_write_lock   = osd_object_write_lock,
2383         .do_read_unlock  = osd_object_read_unlock,
2384         .do_write_unlock = osd_object_write_unlock,
2385         .do_write_locked = osd_object_write_locked,
2386         .do_attr_get     = osd_attr_get,
2387         .do_attr_set     = osd_attr_set,
2388         .do_ah_init      = osd_ah_init,
2389         .do_create       = osd_object_create,
2390         .do_index_try    = osd_index_try,
2391         .do_ref_add      = osd_object_ref_add,
2392         .do_ref_del      = osd_object_ref_del,
2393         .do_xattr_get    = osd_xattr_get,
2394         .do_xattr_set    = osd_xattr_set,
2395         .do_xattr_del    = osd_xattr_del,
2396         .do_xattr_list   = osd_xattr_list,
2397         .do_capa_get     = osd_capa_get,
2398         .do_object_sync  = osd_object_sync,
2399         .do_version_get  = osd_object_version_get,
2400         .do_version_set  = osd_object_version_set,
2401         .do_data_get     = osd_data_get,
2402 };
2403
2404 /**
2405  * dt_object_operations for interoperability mode
2406  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2407  */
2408 static const struct dt_object_operations osd_obj_ea_ops = {
2409         .do_read_lock    = osd_object_read_lock,
2410         .do_write_lock   = osd_object_write_lock,
2411         .do_read_unlock  = osd_object_read_unlock,
2412         .do_write_unlock = osd_object_write_unlock,
2413         .do_write_locked = osd_object_write_locked,
2414         .do_attr_get     = osd_attr_get,
2415         .do_attr_set     = osd_attr_set,
2416         .do_ah_init      = osd_ah_init,
2417         .do_create       = osd_object_ea_create,
2418         .do_index_try    = osd_index_try,
2419         .do_ref_add      = osd_object_ref_add,
2420         .do_ref_del      = osd_object_ref_del,
2421         .do_xattr_get    = osd_xattr_get,
2422         .do_xattr_set    = osd_xattr_set,
2423         .do_xattr_del    = osd_xattr_del,
2424         .do_xattr_list   = osd_xattr_list,
2425         .do_capa_get     = osd_capa_get,
2426         .do_object_sync  = osd_object_sync,
2427         .do_version_get  = osd_object_version_get,
2428         .do_version_set  = osd_object_version_set,
2429         .do_data_get     = osd_data_get,
2430 };
2431
2432 /*
2433  * Body operations.
2434  */
2435
2436 /*
2437  * XXX: Another layering violation for now.
2438  *
2439  * We don't want to use ->f_op->read methods, because generic file write
2440  *
2441  *         - serializes on ->i_sem, and
2442  *
2443  *         - does a lot of extra work like balance_dirty_pages(),
2444  *
2445  * which doesn't work for globally shared files like /last-received.
2446  */
2447 static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
2448 {
2449         struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
2450
2451         memcpy(buffer, (char*)ei->i_data, buflen);
2452
2453         return  buflen;
2454 }
2455
2456 static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
2457                             loff_t *offs)
2458 {
2459         struct buffer_head *bh;
2460         unsigned long block;
2461         int osize = size;
2462         int blocksize;
2463         int csize;
2464         int boffs;
2465         int err;
2466
2467         /* prevent reading after eof */
2468         spin_lock(&inode->i_lock);
2469         if (i_size_read(inode) < *offs + size) {
2470                 size = i_size_read(inode) - *offs;
2471                 spin_unlock(&inode->i_lock);
2472                 if (size < 0) {
2473                         CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
2474                                i_size_read(inode), *offs);
2475                         return -EBADR;
2476                 } else if (size == 0) {
2477                         return 0;
2478                 }
2479         } else {
2480                 spin_unlock(&inode->i_lock);
2481         }
2482
2483         blocksize = 1 << inode->i_blkbits;
2484
2485         while (size > 0) {
2486                 block = *offs >> inode->i_blkbits;
2487                 boffs = *offs & (blocksize - 1);
2488                 csize = min(blocksize - boffs, size);
2489                 bh = ldiskfs_bread(NULL, inode, block, 0, &err);
2490                 if (!bh) {
2491                         CERROR("can't read block: %d\n", err);
2492                         return err;
2493                 }
2494
2495                 memcpy(buf, bh->b_data + boffs, csize);
2496                 brelse(bh);
2497
2498                 *offs += csize;
2499                 buf += csize;
2500                 size -= csize;
2501         }
2502         return osize;
2503 }
2504
2505 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
2506                         struct lu_buf *buf, loff_t *pos,
2507                         struct lustre_capa *capa)
2508 {
2509         struct osd_object      *obj    = osd_dt_obj(dt);
2510         struct inode           *inode  = obj->oo_inode;
2511         int rc;
2512
2513         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2514                 RETURN(-EACCES);
2515
2516         /* Read small symlink from inode body as we need to maintain correct
2517          * on-disk symlinks for ldiskfs.
2518          */
2519         if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2520             (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
2521                 rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
2522         else
2523                 rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
2524
2525         return rc;
2526 }
2527
2528 static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
2529 {
2530
2531         memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
2532                buflen);
2533         LDISKFS_I(inode)->i_disksize = buflen;
2534         i_size_write(inode, buflen);
2535         inode->i_sb->s_op->dirty_inode(inode);
2536
2537         return 0;
2538 }
2539
2540 static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
2541                                     loff_t *offs, handle_t *handle)
2542 {
2543         struct buffer_head *bh = NULL;
2544         loff_t offset = *offs;
2545         loff_t new_size = i_size_read(inode);
2546         unsigned long block;
2547         int blocksize = 1 << inode->i_blkbits;
2548         int err = 0;
2549         int size;
2550         int boffs;
2551         int dirty_inode = 0;
2552
2553         while (bufsize > 0) {
2554                 if (bh != NULL)
2555                         brelse(bh);
2556
2557                 block = offset >> inode->i_blkbits;
2558                 boffs = offset & (blocksize - 1);
2559                 size = min(blocksize - boffs, bufsize);
2560                 bh = ldiskfs_bread(handle, inode, block, 1, &err);
2561                 if (!bh) {
2562                         CERROR("can't read/create block: %d\n", err);
2563                         break;
2564                 }
2565
2566                 err = ldiskfs_journal_get_write_access(handle, bh);
2567                 if (err) {
2568                         CERROR("journal_get_write_access() returned error %d\n",
2569                                err);
2570                         break;
2571                 }
2572                 LASSERTF(boffs + size <= bh->b_size,
2573                          "boffs %d size %d bh->b_size %lu",
2574                          boffs, size, (unsigned long)bh->b_size);
2575                 memcpy(bh->b_data + boffs, buf, size);
2576                 err = ldiskfs_journal_dirty_metadata(handle, bh);
2577                 if (err)
2578                         break;
2579
2580                 if (offset + size > new_size)
2581                         new_size = offset + size;
2582                 offset += size;
2583                 bufsize -= size;
2584                 buf += size;
2585         }
2586         if (bh)
2587                 brelse(bh);
2588
2589         /* correct in-core and on-disk sizes */
2590         if (new_size > i_size_read(inode)) {
2591                 spin_lock(&inode->i_lock);
2592                 if (new_size > i_size_read(inode))
2593                         i_size_write(inode, new_size);
2594                 if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
2595                         LDISKFS_I(inode)->i_disksize = i_size_read(inode);
2596                         dirty_inode = 1;
2597                 }
2598                 spin_unlock(&inode->i_lock);
2599                 if (dirty_inode)
2600                         inode->i_sb->s_op->dirty_inode(inode);
2601         }
2602
2603         if (err == 0)
2604                 *offs = offset;
2605         return err;
2606 }
2607
2608 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
2609                          const struct lu_buf *buf, loff_t *pos,
2610                          struct thandle *handle, struct lustre_capa *capa,
2611                          int ignore_quota)
2612 {
2613         struct osd_object  *obj   = osd_dt_obj(dt);
2614         struct inode       *inode = obj->oo_inode;
2615         struct osd_thandle *oh;
2616         ssize_t            result = 0;
2617 #ifdef HAVE_QUOTA_SUPPORT
2618         cfs_cap_t           save = cfs_curproc_cap_pack();
2619 #endif
2620
2621         LASSERT(handle != NULL);
2622
2623         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
2624                 RETURN(-EACCES);
2625
2626         oh = container_of(handle, struct osd_thandle, ot_super);
2627         LASSERT(oh->ot_handle->h_transaction != NULL);
2628 #ifdef HAVE_QUOTA_SUPPORT
2629         if (ignore_quota)
2630                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2631         else
2632                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2633 #endif
2634         /* Write small symlink to inode body as we need to maintain correct
2635          * on-disk symlinks for ldiskfs.
2636          */
2637         if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2638            (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
2639                 result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
2640         else
2641                 result = osd_ldiskfs_write_record(inode, buf->lb_buf,
2642                                                   buf->lb_len, pos,
2643                                                   oh->ot_handle);
2644 #ifdef HAVE_QUOTA_SUPPORT
2645         cfs_curproc_cap_unpack(save);
2646 #endif
2647         if (result == 0)
2648                 result = buf->lb_len;
2649         return result;
2650 }
2651
2652 static const struct dt_body_operations osd_body_ops = {
2653         .dbo_read  = osd_read,
2654         .dbo_write = osd_write
2655 };
2656
2657
2658 /**
2659  *      delete a (key, value) pair from index \a dt specified by \a key
2660  *
2661  *      \param  dt      osd index object
2662  *      \param  key     key for index
2663  *      \param  rec     record reference
2664  *      \param  handle  transaction handler
2665  *
2666  *      \retval  0  success
2667  *      \retval -ve   failure
2668  */
2669
2670 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2671                                 const struct dt_key *key, struct thandle *handle,
2672                                 struct lustre_capa *capa)
2673 {
2674         struct osd_object     *obj = osd_dt_obj(dt);
2675         struct osd_thandle    *oh;
2676         struct iam_path_descr *ipd;
2677         struct iam_container  *bag = &obj->oo_dir->od_container;
2678         int rc;
2679
2680         ENTRY;
2681
2682         LINVRNT(osd_invariant(obj));
2683         LASSERT(dt_object_exists(dt));
2684         LASSERT(bag->ic_object == obj->oo_inode);
2685         LASSERT(handle != NULL);
2686
2687         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2688                 RETURN(-EACCES);
2689
2690         ipd = osd_idx_ipd_get(env, bag);
2691         if (unlikely(ipd == NULL))
2692                 RETURN(-ENOMEM);
2693
2694         oh = container_of0(handle, struct osd_thandle, ot_super);
2695         LASSERT(oh->ot_handle != NULL);
2696         LASSERT(oh->ot_handle->h_transaction != NULL);
2697
2698         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2699         osd_ipd_put(env, bag, ipd);
2700         LINVRNT(osd_invariant(obj));
2701         RETURN(rc);
2702 }
2703
2704 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
2705                                           struct dt_rec *fid)
2706 {
2707         struct osd_fid_pack *rec;
2708         int rc = -ENODATA;
2709
2710         if (de->file_type & LDISKFS_DIRENT_LUFID) {
2711                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
2712                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
2713         }
2714         RETURN(rc);
2715 }
2716
2717 /**
2718  * Index delete function for interoperability mode (b11826).
2719  * It will remove the directory entry added by osd_index_ea_insert().
2720  * This entry is needed to maintain name->fid mapping.
2721  *
2722  * \param key,  key i.e. file entry to be deleted
2723  *
2724  * \retval   0, on success
2725  * \retval -ve, on error
2726  */
2727 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
2728                                const struct dt_key *key, struct thandle *handle,
2729                                struct lustre_capa *capa)
2730 {
2731         struct osd_object          *obj    = osd_dt_obj(dt);
2732         struct inode               *dir    = obj->oo_inode;
2733         struct dentry              *dentry;
2734         struct osd_thandle         *oh;
2735         struct ldiskfs_dir_entry_2 *de;
2736         struct buffer_head         *bh;
2737
2738         int rc;
2739
2740         ENTRY;
2741
2742         LINVRNT(osd_invariant(obj));
2743         LASSERT(dt_object_exists(dt));
2744         LASSERT(handle != NULL);
2745
2746         oh = container_of(handle, struct osd_thandle, ot_super);
2747         LASSERT(oh->ot_handle != NULL);
2748         LASSERT(oh->ot_handle->h_transaction != NULL);
2749
2750         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2751                 RETURN(-EACCES);
2752
2753         dentry = osd_child_dentry_get(env, obj,
2754                                       (char *)key, strlen((char *)key));
2755
2756         cfs_down_write(&obj->oo_ext_idx_sem);
2757         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
2758         if (bh) {
2759                 rc = ldiskfs_delete_entry(oh->ot_handle,
2760                                 dir, de, bh);
2761                 brelse(bh);
2762         } else
2763                 rc = -ENOENT;
2764
2765         cfs_up_write(&obj->oo_ext_idx_sem);
2766         LASSERT(osd_invariant(obj));
2767         RETURN(rc);
2768 }
2769
2770 /**
2771  *      Lookup index for \a key and copy record to \a rec.
2772  *
2773  *      \param  dt      osd index object
2774  *      \param  key     key for index
2775  *      \param  rec     record reference
2776  *
2777  *      \retval  +ve  success : exact mach
2778  *      \retval  0    return record with key not greater than \a key
2779  *      \retval -ve   failure
2780  */
2781 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
2782                                 struct dt_rec *rec, const struct dt_key *key,
2783                                 struct lustre_capa *capa)
2784 {
2785         struct osd_object     *obj = osd_dt_obj(dt);
2786         struct iam_path_descr *ipd;
2787         struct iam_container  *bag = &obj->oo_dir->od_container;
2788         struct osd_thread_info *oti = osd_oti_get(env);
2789         struct iam_iterator    *it = &oti->oti_idx_it;
2790         struct iam_rec *iam_rec;
2791         int rc;
2792         ENTRY;
2793
2794         LASSERT(osd_invariant(obj));
2795         LASSERT(dt_object_exists(dt));
2796         LASSERT(bag->ic_object == obj->oo_inode);
2797
2798         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2799                 RETURN(-EACCES);
2800
2801         ipd = osd_idx_ipd_get(env, bag);
2802         if (IS_ERR(ipd))
2803                 RETURN(-ENOMEM);
2804
2805         /* got ipd now we can start iterator. */
2806         iam_it_init(it, bag, 0, ipd);
2807
2808         rc = iam_it_get(it, (struct iam_key *)key);
2809         if (rc >= 0) {
2810                 if (S_ISDIR(obj->oo_inode->i_mode))
2811                         iam_rec = (struct iam_rec *)oti->oti_ldp;
2812                 else
2813                         iam_rec = (struct iam_rec *) rec;
2814
2815                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
2816                 if (S_ISDIR(obj->oo_inode->i_mode))
2817                         osd_fid_unpack((struct lu_fid *) rec,
2818                                        (struct osd_fid_pack *)iam_rec);
2819         }
2820         iam_it_put(it);
2821         iam_it_fini(it);
2822         osd_ipd_put(env, bag, ipd);
2823
2824         LINVRNT(osd_invariant(obj));
2825
2826         RETURN(rc);
2827 }
2828
2829 /**
2830  *      Inserts (key, value) pair in \a dt index object.
2831  *
2832  *      \param  dt      osd index object
2833  *      \param  key     key for index
2834  *      \param  rec     record reference
2835  *      \param  th      transaction handler
2836  *
2837  *      \retval  0  success
2838  *      \retval -ve failure
2839  */
2840 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
2841                                 const struct dt_rec *rec, const struct dt_key *key,
2842                                 struct thandle *th, struct lustre_capa *capa,
2843                                 int ignore_quota)
2844 {
2845         struct osd_object     *obj = osd_dt_obj(dt);
2846         struct iam_path_descr *ipd;
2847         struct osd_thandle    *oh;
2848         struct iam_container  *bag = &obj->oo_dir->od_container;
2849 #ifdef HAVE_QUOTA_SUPPORT
2850         cfs_cap_t              save = cfs_curproc_cap_pack();
2851 #endif
2852         struct osd_thread_info *oti = osd_oti_get(env);
2853         struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
2854         int rc;
2855
2856         ENTRY;
2857
2858         LINVRNT(osd_invariant(obj));
2859         LASSERT(dt_object_exists(dt));
2860         LASSERT(bag->ic_object == obj->oo_inode);
2861         LASSERT(th != NULL);
2862
2863         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2864                 return -EACCES;
2865
2866         ipd = osd_idx_ipd_get(env, bag);
2867         if (unlikely(ipd == NULL))
2868                 RETURN(-ENOMEM);
2869
2870         oh = container_of0(th, struct osd_thandle, ot_super);
2871         LASSERT(oh->ot_handle != NULL);
2872         LASSERT(oh->ot_handle->h_transaction != NULL);
2873 #ifdef HAVE_QUOTA_SUPPORT
2874         if (ignore_quota)
2875                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2876         else
2877                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2878 #endif
2879         if (S_ISDIR(obj->oo_inode->i_mode))
2880                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
2881         else
2882                 iam_rec = (struct iam_rec *) rec;
2883         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2884                         iam_rec, ipd);
2885 #ifdef HAVE_QUOTA_SUPPORT
2886         cfs_curproc_cap_unpack(save);
2887 #endif
2888         osd_ipd_put(env, bag, ipd);
2889         LINVRNT(osd_invariant(obj));
2890         RETURN(rc);
2891 }
2892
2893 /**
2894  * Calls ldiskfs_add_entry() to add directory entry
2895  * into the directory. This is required for
2896  * interoperability mode (b11826)
2897  *
2898  * \retval   0, on success
2899  * \retval -ve, on error
2900  */
2901 static int __osd_ea_add_rec(struct osd_thread_info *info,
2902                             struct osd_object *pobj,
2903                             struct inode  *cinode,
2904                             const char *name,
2905                             const struct dt_rec *fid,
2906                             struct thandle *th)
2907 {
2908         struct ldiskfs_dentry_param *ldp;
2909         struct dentry      *child;
2910         struct osd_thandle *oth;
2911         int rc;
2912
2913         oth = container_of(th, struct osd_thandle, ot_super);
2914         LASSERT(oth->ot_handle != NULL);
2915         LASSERT(oth->ot_handle->h_transaction != NULL);
2916
2917         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
2918
2919         if (fid_is_igif((struct lu_fid *)fid) ||
2920             fid_seq((struct lu_fid *)fid) >= FID_SEQ_NORMAL) {
2921                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2922                 osd_get_ldiskfs_dirent_param(ldp, fid);
2923                 child->d_fsdata = (void*) ldp;
2924         } else
2925                 child->d_fsdata = NULL;
2926         rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
2927
2928         RETURN(rc);
2929 }
2930
2931 /**
2932  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
2933  * into the directory.Also sets flags into osd object to
2934  * indicate dot and dotdot are created. This is required for
2935  * interoperability mode (b11826)
2936  *
2937  * \param dir   directory for dot and dotdot fixup.
2938  * \param obj   child object for linking
2939  *
2940  * \retval   0, on success
2941  * \retval -ve, on error
2942  */
2943 static int osd_add_dot_dotdot(struct osd_thread_info *info,
2944                               struct osd_object *dir,
2945                               struct inode  *parent_dir, const char *name,
2946                               const struct dt_rec *dot_fid,
2947                               const struct dt_rec *dot_dot_fid,
2948                               struct thandle *th)
2949 {
2950         struct inode            *inode  = dir->oo_inode;
2951         struct ldiskfs_dentry_param *dot_ldp;
2952         struct ldiskfs_dentry_param *dot_dot_ldp;
2953         struct osd_thandle      *oth;
2954         int result = 0;
2955
2956         oth = container_of(th, struct osd_thandle, ot_super);
2957         LASSERT(oth->ot_handle->h_transaction != NULL);
2958         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
2959
2960         if (strcmp(name, dot) == 0) {
2961                 if (dir->oo_compat_dot_created) {
2962                         result = -EEXIST;
2963                 } else {
2964                         LASSERT(inode == parent_dir);
2965                         dir->oo_compat_dot_created = 1;
2966                         result = 0;
2967                 }
2968         } else if(strcmp(name, dotdot) == 0) {
2969                 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2970                 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
2971
2972                 if (!dir->oo_compat_dot_created)
2973                         return -EINVAL;
2974                 if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
2975                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
2976                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
2977                 } else {
2978                         dot_ldp = NULL;
2979                         dot_dot_ldp = NULL;
2980                 }
2981                 /* in case of rename, dotdot is already created */
2982                 if (dir->oo_compat_dotdot_created) {
2983                         return __osd_ea_add_rec(info, dir, parent_dir, name,
2984                                                 dot_dot_fid, th);
2985                 }
2986
2987                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
2988                                                 dot_ldp, dot_dot_ldp);
2989                 if (result == 0)
2990                        dir->oo_compat_dotdot_created = 1;
2991         }
2992
2993         return result;
2994 }
2995
2996
2997 /**
2998  * It will call the appropriate osd_add* function and return the
2999  * value, return by respective functions.
3000  */
3001 static int osd_ea_add_rec(const struct lu_env *env,
3002                           struct osd_object *pobj,
3003                           struct inode *cinode,
3004                           const char *name,
3005                           const struct dt_rec *fid,
3006                           struct thandle *th)
3007 {
3008         struct osd_thread_info    *info   = osd_oti_get(env);
3009         int rc;
3010
3011         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3012                                                    name[2] =='\0')))
3013                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3014                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3015                                         fid, th);
3016         else
3017                 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
3018
3019         return rc;
3020 }
3021
3022 /**
3023  * Calls ->lookup() to find dentry. From dentry get inode and
3024  * read inode's ea to get fid. This is required for  interoperability
3025  * mode (b11826)
3026  *
3027  * \retval   0, on success
3028  * \retval -ve, on error
3029  */
3030 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3031                              struct dt_rec *rec, const struct dt_key *key)
3032 {
3033         struct inode               *dir    = obj->oo_inode;
3034         struct dentry              *dentry;
3035         struct ldiskfs_dir_entry_2 *de;
3036         struct buffer_head         *bh;
3037         struct lu_fid              *fid = (struct lu_fid *) rec;
3038         int ino;
3039         int rc;
3040
3041         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3042
3043         dentry = osd_child_dentry_get(env, obj,
3044                                       (char *)key, strlen((char *)key));
3045
3046         cfs_down_read(&obj->oo_ext_idx_sem);
3047         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
3048         if (bh) {
3049                 ino = le32_to_cpu(de->inode);
3050                 rc = osd_get_fid_from_dentry(de, rec);
3051
3052                 /* done with de, release bh */
3053                 brelse(bh);
3054                 if (rc != 0)
3055                         rc = osd_ea_fid_get(env, obj, ino, fid);
3056         } else
3057                 rc = -ENOENT;
3058
3059         cfs_up_read(&obj->oo_ext_idx_sem);
3060         RETURN (rc);
3061 }
3062
3063 /**
3064  * Find the osd object for given fid.
3065  *
3066  * \param fid need to find the osd object having this fid
3067  *
3068  * \retval osd_object on success
3069  * \retval        -ve on error
3070  */
3071 struct osd_object *osd_object_find(const struct lu_env *env,
3072                                    struct dt_object *dt,
3073                                    const struct lu_fid *fid)
3074 {
3075         struct lu_device         *ludev = dt->do_lu.lo_dev;
3076         struct osd_object        *child = NULL;
3077         struct lu_object         *luch;
3078         struct lu_object         *lo;
3079
3080         luch = lu_object_find(env, ludev, fid, NULL);
3081         if (!IS_ERR(luch)) {
3082                 if (lu_object_exists(luch)) {
3083                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3084                         if (lo != NULL)
3085                                 child = osd_obj(lo);
3086                         else
3087                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3088                                                 "lu_object can't be located"
3089                                                 ""DFID"\n", PFID(fid));
3090
3091                         if (child == NULL) {
3092                                 lu_object_put(env, luch);
3093                                 CERROR("Unable to get osd_object\n");
3094                                 child = ERR_PTR(-ENOENT);
3095                         }
3096                 } else {
3097                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
3098                                         "lu_object does not exists "DFID"\n",
3099                                         PFID(fid));
3100                         child = ERR_PTR(-ENOENT);
3101                 }
3102         } else
3103                 child = (void *)luch;
3104
3105         return child;
3106 }
3107
3108 /**
3109  * Put the osd object once done with it.
3110  *
3111  * \param obj osd object that needs to be put
3112  */
3113 static inline void osd_object_put(const struct lu_env *env,
3114                                   struct osd_object *obj)
3115 {
3116         lu_object_put(env, &obj->oo_dt.do_lu);
3117 }
3118
3119 /**
3120  * Index add function for interoperability mode (b11826).
3121  * It will add the directory entry.This entry is needed to
3122  * maintain name->fid mapping.
3123  *
3124  * \param key it is key i.e. file entry to be inserted
3125  * \param rec it is value of given key i.e. fid
3126  *
3127  * \retval   0, on success
3128  * \retval -ve, on error
3129  */
3130 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3131                                const struct dt_rec *rec,
3132                                const struct dt_key *key, struct thandle *th,
3133                                struct lustre_capa *capa, int ignore_quota)
3134 {
3135         struct osd_object        *obj   = osd_dt_obj(dt);
3136         struct lu_fid            *fid   = (struct lu_fid *) rec;
3137         const char               *name  = (const char *)key;
3138         struct osd_object        *child;
3139 #ifdef HAVE_QUOTA_SUPPORT
3140         cfs_cap_t                 save  = cfs_curproc_cap_pack();
3141 #endif
3142         int rc;
3143
3144         ENTRY;
3145
3146         LASSERT(osd_invariant(obj));
3147         LASSERT(dt_object_exists(dt));
3148         LASSERT(th != NULL);
3149
3150         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3151                 RETURN(-EACCES);
3152
3153         child = osd_object_find(env, dt, fid);
3154         if (!IS_ERR(child)) {
3155 #ifdef HAVE_QUOTA_SUPPORT
3156                 if (ignore_quota)
3157                         cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
3158                 else
3159                         cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
3160 #endif
3161                 cfs_down_write(&obj->oo_ext_idx_sem);
3162                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
3163                 cfs_up_write(&obj->oo_ext_idx_sem);
3164 #ifdef HAVE_QUOTA_SUPPORT
3165                 cfs_curproc_cap_unpack(save);
3166 #endif
3167                 osd_object_put(env, child);
3168         } else {
3169                 rc = PTR_ERR(child);
3170         }
3171
3172         LASSERT(osd_invariant(obj));
3173         RETURN(rc);
3174 }
3175
3176 /**
3177  *  Initialize osd Iterator for given osd index object.
3178  *
3179  *  \param  dt      osd index object
3180  */
3181
3182 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3183                                  struct dt_object *dt,
3184                                  struct lustre_capa *capa)
3185 {
3186         struct osd_it_iam         *it;
3187         struct osd_thread_info *oti = osd_oti_get(env);
3188         struct osd_object     *obj = osd_dt_obj(dt);
3189         struct lu_object      *lo  = &dt->do_lu;
3190         struct iam_path_descr *ipd;
3191         struct iam_container  *bag = &obj->oo_dir->od_container;
3192
3193         LASSERT(lu_object_exists(lo));
3194
3195         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3196                 return ERR_PTR(-EACCES);
3197
3198         it = &oti->oti_it;
3199         ipd = osd_it_ipd_get(env, bag);
3200         if (likely(ipd != NULL)) {
3201                 it->oi_obj = obj;
3202                 it->oi_ipd = ipd;
3203                 lu_object_get(lo);
3204                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3205                 return (struct dt_it *)it;
3206         }
3207         return ERR_PTR(-ENOMEM);
3208 }
3209
3210 /**
3211  * free given Iterator.
3212  */
3213
3214 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3215 {
3216         struct osd_it_iam     *it = (struct osd_it_iam *)di;
3217         struct osd_object *obj = it->oi_obj;
3218
3219         iam_it_fini(&it->oi_it);
3220         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3221         lu_object_put(env, &obj->oo_dt.do_lu);
3222 }
3223
3224 /**
3225  *  Move Iterator to record specified by \a key
3226  *
3227  *  \param  di      osd iterator
3228  *  \param  key     key for index
3229  *
3230  *  \retval +ve  di points to record with least key not larger than key
3231  *  \retval  0   di points to exact matched key
3232  *  \retval -ve  failure
3233  */
3234
3235 static int osd_it_iam_get(const struct lu_env *env,
3236                       struct dt_it *di, const struct dt_key *key)
3237 {
3238         struct osd_it_iam *it = (struct osd_it_iam *)di;
3239
3240         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
3241 }
3242
3243 /**
3244  *  Release Iterator
3245  *
3246  *  \param  di      osd iterator
3247  */
3248
3249 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
3250 {
3251         struct osd_it_iam *it = (struct osd_it_iam *)di;
3252
3253         iam_it_put(&it->oi_it);
3254 }
3255
3256 /**
3257  *  Move iterator by one record
3258  *
3259  *  \param  di      osd iterator
3260  *
3261  *  \retval +1   end of container reached
3262  *  \retval  0   success
3263  *  \retval -ve  failure
3264  */
3265
3266 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
3267 {
3268         struct osd_it_iam *it = (struct osd_it_iam *)di;
3269
3270         return iam_it_next(&it->oi_it);
3271 }
3272
3273 /**
3274  * Return pointer to the key under iterator.
3275  */
3276
3277 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
3278                                  const struct dt_it *di)
3279 {
3280         struct osd_it_iam *it = (struct osd_it_iam *)di;
3281
3282         return (struct dt_key *)iam_it_key_get(&it->oi_it);
3283 }
3284
3285 /**
3286  * Return size of key under iterator (in bytes)
3287  */
3288
3289 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
3290 {
3291         struct osd_it_iam *it = (struct osd_it_iam *)di;
3292
3293         return iam_it_key_size(&it->oi_it);
3294 }
3295
3296 static inline void osd_it_append_attrs(struct lu_dirent*ent,
3297                                        __u32 attr,
3298                                        int len,
3299                                        __u16 type)
3300 {
3301         struct luda_type        *lt;
3302         const unsigned           align = sizeof(struct luda_type) - 1;
3303
3304         /* check if file type is required */
3305         if (attr & LUDA_TYPE) {
3306                         len = (len + align) & ~align;
3307
3308                         lt = (void *) ent->lde_name + len;
3309                         lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
3310                         ent->lde_attrs |= LUDA_TYPE;
3311         }
3312
3313         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
3314 }
3315
3316 /**
3317  * build lu direct from backend fs dirent.
3318  */
3319
3320 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
3321                                       struct lu_fid *fid,
3322                                       __u64 offset,
3323                                       char *name,
3324                                       __u16 namelen,
3325                                       __u16 type,
3326                                       __u32 attr)
3327 {
3328         fid_cpu_to_le(&ent->lde_fid, fid);
3329         ent->lde_attrs = LUDA_FID;
3330
3331         ent->lde_hash = cpu_to_le64(offset);
3332         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
3333
3334         strncpy(ent->lde_name, name, namelen);
3335         ent->lde_namelen = cpu_to_le16(namelen);
3336
3337         /* append lustre attributes */
3338         osd_it_append_attrs(ent, attr, namelen, type);
3339 }
3340
3341 /**
3342  * Return pointer to the record under iterator.
3343  */
3344 static int osd_it_iam_rec(const struct lu_env *env,
3345                           const struct dt_it *di,
3346                           struct lu_dirent *lde,
3347                           __u32 attr)
3348 {
3349         struct osd_it_iam *it        = (struct osd_it_iam *)di;
3350         struct osd_thread_info *info = osd_oti_get(env);
3351         struct lu_fid     *fid       = &info->oti_fid;
3352         const struct osd_fid_pack *rec;
3353         char *name;
3354         int namelen;
3355         __u64 hash;
3356         int rc;
3357
3358         name = (char *)iam_it_key_get(&it->oi_it);
3359         if (IS_ERR(name))
3360                 RETURN(PTR_ERR(name));
3361
3362         namelen = iam_it_key_size(&it->oi_it);
3363
3364         rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it);
3365         if (IS_ERR(rec))
3366                 RETURN(PTR_ERR(rec));
3367
3368         rc = osd_fid_unpack(fid, rec);
3369         if (rc)
3370                 RETURN(rc);
3371
3372         hash = iam_it_store(&it->oi_it);
3373
3374         /* IAM does not store object type in IAM index (dir) */
3375         osd_it_pack_dirent(lde, fid, hash, name, namelen,
3376                            0, LUDA_FID);
3377
3378         return 0;
3379 }
3380
3381 /**
3382  * Returns cookie for current Iterator position.
3383  */
3384 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
3385 {
3386         struct osd_it_iam *it = (struct osd_it_iam *)di;
3387
3388         return iam_it_store(&it->oi_it);
3389 }
3390
3391 /**
3392  * Restore iterator from cookie.
3393  *
3394  * \param  di      osd iterator
3395  * \param  hash    Iterator location cookie
3396  *
3397  * \retval +ve  di points to record with least key not larger than key.
3398  * \retval  0   di points to exact matched key
3399  * \retval -ve  failure
3400  */
3401
3402 static int osd_it_iam_load(const struct lu_env *env,
3403                        const struct dt_it *di, __u64 hash)
3404 {
3405         struct osd_it_iam *it = (struct osd_it_iam *)di;
3406
3407         return iam_it_load(&it->oi_it, hash);
3408 }
3409
3410 static const struct dt_index_operations osd_index_iam_ops = {
3411         .dio_lookup = osd_index_iam_lookup,
3412         .dio_insert = osd_index_iam_insert,
3413         .dio_delete = osd_index_iam_delete,
3414         .dio_it     = {
3415                 .init     = osd_it_iam_init,
3416                 .fini     = osd_it_iam_fini,
3417                 .get      = osd_it_iam_get,
3418                 .put      = osd_it_iam_put,
3419                 .next     = osd_it_iam_next,
3420                 .key      = osd_it_iam_key,
3421                 .key_size = osd_it_iam_key_size,
3422                 .rec      = osd_it_iam_rec,
3423                 .store    = osd_it_iam_store,
3424                 .load     = osd_it_iam_load
3425         }
3426 };
3427
3428 /**
3429  * Creates or initializes iterator context.
3430  *
3431  * \retval struct osd_it_ea, iterator structure on success
3432  *
3433  */
3434 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
3435                                     struct dt_object *dt,
3436                                     struct lustre_capa *capa)
3437 {
3438         struct osd_object       *obj  = osd_dt_obj(dt);
3439         struct osd_thread_info  *info = osd_oti_get(env);
3440         struct osd_it_ea        *it   = &info->oti_it_ea;
3441         struct lu_object        *lo   = &dt->do_lu;
3442         struct dentry           *obj_dentry = &info->oti_it_dentry;
3443         ENTRY;
3444         LASSERT(lu_object_exists(lo));
3445
3446         obj_dentry->d_inode = obj->oo_inode;
3447         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3448         obj_dentry->d_name.hash = 0;
3449
3450         it->oie_rd_dirent       = 0;
3451         it->oie_it_dirent       = 0;
3452         it->oie_dirent          = NULL;
3453         it->oie_buf             = info->oti_it_ea_buf;
3454         it->oie_obj             = obj;
3455         it->oie_file.f_pos      = 0;
3456         it->oie_file.f_dentry   = obj_dentry;
3457         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
3458         it->oie_file.f_op         = obj->oo_inode->i_fop;
3459         it->oie_file.private_data = NULL;
3460         lu_object_get(lo);
3461         RETURN((struct dt_it *) it);
3462 }
3463
3464 /**
3465  * Destroy or finishes iterator context.
3466  *
3467  * \param di iterator structure to be destroyed
3468  */
3469 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
3470 {
3471         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3472         struct osd_object    *obj  = it->oie_obj;
3473         struct inode       *inode  = obj->oo_inode;
3474
3475         ENTRY;
3476         it->oie_file.f_op->release(inode, &it->oie_file);
3477         lu_object_put(env, &obj->oo_dt.do_lu);
3478         EXIT;
3479 }
3480
3481 /**
3482  * It position the iterator at given key, so that next lookup continues from
3483  * that key Or it is similar to dio_it->load() but based on a key,
3484  * rather than file position.
3485  *
3486  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
3487  * to the beginning.
3488  *
3489  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
3490  */
3491 static int osd_it_ea_get(const struct lu_env *env,
3492                          struct dt_it *di, const struct dt_key *key)
3493 {
3494         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3495
3496         ENTRY;
3497         LASSERT(((const char *)key)[0] == '\0');
3498         it->oie_file.f_pos      = 0;
3499         it->oie_rd_dirent       = 0;
3500         it->oie_it_dirent       = 0;
3501         it->oie_dirent          = NULL;
3502
3503         RETURN(+1);
3504 }
3505
/**
 * Counterpart of osd_it_ea_get(); intentionally a no-op because
 * osd_it_ea_get() only resets fields of the iterator and leaves nothing
 * to undo.
 *
 * \param env execution environment (unused)
 * \param di  iterator (unused)
 */
static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
{
        /* nothing to release */
}
3512
3513 /**
3514  * It is called internally by ->readdir(). It fills the
3515  * iterator's in-memory data structure with required
3516  * information i.e. name, namelen, rec_size etc.
3517  *
3518  * \param buf in which information to be filled in.
3519  * \param name name of the file in given dir
3520  *
3521  * \retval 0 on success
3522  * \retval 1 on buffer full
3523  */
3524 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
3525                                loff_t offset, __u64 ino,
3526                                unsigned d_type)
3527 {
3528         struct osd_it_ea        *it   = (struct osd_it_ea *)buf;
3529         struct osd_it_ea_dirent *ent  = it->oie_dirent;
3530         struct lu_fid           *fid  = &ent->oied_fid;
3531         struct osd_fid_pack     *rec;
3532         ENTRY;
3533
3534         /* this should never happen */
3535         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
3536                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
3537                 RETURN(-EIO);
3538         }
3539
3540         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
3541             OSD_IT_EA_BUFSIZE)
3542                 RETURN(1);
3543
3544         if (d_type & LDISKFS_DIRENT_LUFID) {
3545                 rec = (struct osd_fid_pack*) (name + namelen + 1);
3546
3547                 if (osd_fid_unpack(fid, rec) != 0)
3548                         fid_zero(fid);
3549
3550                 d_type &= ~LDISKFS_DIRENT_LUFID;
3551         } else {
3552                 fid_zero(fid);
3553         }
3554
3555         ent->oied_ino     = ino;
3556         ent->oied_off     = offset;
3557         ent->oied_namelen = namelen;
3558         ent->oied_type    = d_type;
3559
3560         memcpy(ent->oied_name, name, namelen);
3561
3562         it->oie_rd_dirent++;
3563         it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
3564         RETURN(0);
3565 }
3566
3567 /**
3568  * Calls ->readdir() to load a directory entry at a time
3569  * and stored it in iterator's in-memory data structure.
3570  *
3571  * \param di iterator's in memory structure
3572  *
3573  * \retval   0 on success
3574  * \retval -ve on error
3575  */
3576 static int osd_ldiskfs_it_fill(const struct dt_it *di)
3577 {
3578         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
3579         struct osd_object  *obj   = it->oie_obj;
3580         struct inode       *inode = obj->oo_inode;
3581         int                result = 0;
3582
3583         ENTRY;
3584         it->oie_dirent = it->oie_buf;
3585         it->oie_rd_dirent = 0;
3586
3587         cfs_down_read(&obj->oo_ext_idx_sem);
3588         result = inode->i_fop->readdir(&it->oie_file, it,
3589                                        (filldir_t) osd_ldiskfs_filldir);
3590
3591         cfs_up_read(&obj->oo_ext_idx_sem);
3592
3593         if (it->oie_rd_dirent == 0) {
3594                 result = -EIO;
3595         } else {
3596                 it->oie_dirent = it->oie_buf;
3597                 it->oie_it_dirent = 1;
3598         }
3599
3600         RETURN(result);
3601 }
3602
3603 /**
3604  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3605  * to load a directory entry at a time and stored it in
3606  * iterator's in-memory data structure.
3607  *
3608  * \param di iterator's in memory structure
3609  *
3610  * \retval +ve iterator reached to end
3611  * \retval   0 iterator not reached to end
3612  * \retval -ve on error
3613  */
3614 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
3615 {
3616         struct osd_it_ea *it = (struct osd_it_ea *)di;
3617         int rc;
3618
3619         ENTRY;
3620
3621         if (it->oie_it_dirent < it->oie_rd_dirent) {
3622                 it->oie_dirent =
3623                         (void *) it->oie_dirent +
3624                         cfs_size_round(sizeof(struct osd_it_ea_dirent) +
3625                                        it->oie_dirent->oied_namelen);
3626                 it->oie_it_dirent++;
3627                 RETURN(0);
3628         } else {
3629                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
3630                         rc = +1;
3631                 else
3632                         rc = osd_ldiskfs_it_fill(di);
3633         }
3634
3635         RETURN(rc);
3636 }
3637
3638 /**
3639  * Returns the key at current position from iterator's in memory structure.
3640  *
3641  * \param di iterator's in memory structure
3642  *
3643  * \retval key i.e. struct dt_key on success
3644  */
3645 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
3646                                     const struct dt_it *di)
3647 {
3648         struct osd_it_ea *it = (struct osd_it_ea *)di;
3649         ENTRY;
3650         RETURN((struct dt_key *)it->oie_dirent->oied_name);
3651 }
3652
3653 /**
3654  * Returns the key's size at current position from iterator's in memory structure.
3655  *
3656  * \param di iterator's in memory structure
3657  *
3658  * \retval key_size i.e. struct dt_key on success
3659  */
3660 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
3661 {
3662         struct osd_it_ea *it = (struct osd_it_ea *)di;
3663         ENTRY;
3664         RETURN(it->oie_dirent->oied_namelen);
3665 }
3666
3667
3668 /**
3669  * Returns the value (i.e. fid/igif) at current position from iterator's
3670  * in memory structure.
3671  *
3672  * \param di struct osd_it_ea, iterator's in memory structure
3673  * \param attr attr requested for dirent.
3674  * \param lde lustre dirent
3675  *
3676  * \retval   0 no error and \param lde has correct lustre dirent.
3677  * \retval -ve on error
3678  */
3679 static inline int osd_it_ea_rec(const struct lu_env *env,
3680                                 const struct dt_it *di,
3681                                 struct lu_dirent *lde,
3682                                 __u32 attr)
3683 {
3684         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
3685         struct osd_object       *obj    = it->oie_obj;
3686         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
3687         int    rc = 0;
3688
3689         ENTRY;
3690
3691         if (!fid_is_sane(fid))
3692                 rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
3693
3694         if (rc == 0)
3695                 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
3696                                    it->oie_dirent->oied_name,
3697                                    it->oie_dirent->oied_namelen,
3698                                    it->oie_dirent->oied_type,
3699                                    attr);
3700         RETURN(rc);
3701 }
3702
3703 /**
3704  * Returns a cookie for current position of the iterator head, so that
3705  * user can use this cookie to load/start the iterator next time.
3706  *
3707  * \param di iterator's in memory structure
3708  *
3709  * \retval cookie for current position, on success
3710  */
3711 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
3712 {
3713         struct osd_it_ea *it = (struct osd_it_ea *)di;
3714         ENTRY;
3715         RETURN(it->oie_dirent->oied_off);
3716 }
3717
3718 /**
3719  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3720  * to load a directory entry at a time and stored it i inn,
3721  * in iterator's in-memory data structure.
3722  *
3723  * \param di struct osd_it_ea, iterator's in memory structure
3724  *
3725  * \retval +ve on success
3726  * \retval -ve on error
3727  */
3728 static int osd_it_ea_load(const struct lu_env *env,
3729                           const struct dt_it *di, __u64 hash)
3730 {
3731         struct osd_it_ea *it = (struct osd_it_ea *)di;
3732         int rc;
3733
3734         ENTRY;
3735         it->oie_file.f_pos = hash;
3736
3737         rc =  osd_ldiskfs_it_fill(di);
3738         if (rc == 0)
3739                 rc = +1;
3740
3741         RETURN(rc);
3742 }
3743
3744 /**
3745  * Index lookup function for interoperability mode (b11826).
3746  *
3747  * \param key,  key i.e. file name to be searched
3748  *
3749  * \retval +ve, on success
3750  * \retval -ve, on error
3751  */
3752 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
3753                                struct dt_rec *rec, const struct dt_key *key,
3754                                struct lustre_capa *capa)
3755 {
3756         struct osd_object *obj = osd_dt_obj(dt);
3757         int rc = 0;
3758
3759         ENTRY;
3760
3761         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
3762         LINVRNT(osd_invariant(obj));
3763
3764         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3765                 return -EACCES;
3766
3767         rc = osd_ea_lookup_rec(env, obj, rec, key);
3768
3769         if (rc == 0)
3770                 rc = +1;
3771         RETURN(rc);
3772 }
3773
3774 /**
3775  * Index and Iterator operations for interoperability
3776  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
3777  */
3778 static const struct dt_index_operations osd_index_ea_ops = {
3779         .dio_lookup = osd_index_ea_lookup,
3780         .dio_insert = osd_index_ea_insert,
3781         .dio_delete = osd_index_ea_delete,
3782         .dio_it     = {
3783                 .init     = osd_it_ea_init,
3784                 .fini     = osd_it_ea_fini,
3785                 .get      = osd_it_ea_get,
3786                 .put      = osd_it_ea_put,
3787                 .next     = osd_it_ea_next,
3788                 .key      = osd_it_ea_key,
3789                 .key_size = osd_it_ea_key_size,
3790                 .rec      = osd_it_ea_rec,
3791                 .store    = osd_it_ea_store,
3792                 .load     = osd_it_ea_load
3793         }
3794 };
3795
3796 static void *osd_key_init(const struct lu_context *ctx,
3797                           struct lu_context_key *key)
3798 {
3799         struct osd_thread_info *info;
3800
3801         OBD_ALLOC_PTR(info);
3802         if (info != NULL) {
3803                 OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3804                 if (info->oti_it_ea_buf != NULL) {
3805                         info->oti_env = container_of(ctx, struct lu_env,
3806                                                      le_ctx);
3807                 } else {
3808                         OBD_FREE_PTR(info);
3809                         info = ERR_PTR(-ENOMEM);
3810                 }
3811         } else {
3812                 info = ERR_PTR(-ENOMEM);
3813         }
3814         return info;
3815 }
3816
3817 static void osd_key_fini(const struct lu_context *ctx,
3818                          struct lu_context_key *key, void* data)
3819 {
3820         struct osd_thread_info *info = data;
3821
3822         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3823         OBD_FREE_PTR(info);
3824 }
3825
3826 static void osd_key_exit(const struct lu_context *ctx,
3827                          struct lu_context_key *key, void *data)
3828 {
3829         struct osd_thread_info *info = data;
3830
3831         LASSERT(info->oti_r_locks == 0);
3832         LASSERT(info->oti_w_locks == 0);
3833         LASSERT(info->oti_txns    == 0);
3834 }
3835
3836 /* type constructor/destructor: osd_type_init, osd_type_fini */
3837 LU_TYPE_INIT_FINI(osd, &osd_key);
3838
3839 static struct lu_context_key osd_key = {
3840         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
3841         .lct_init = osd_key_init,
3842         .lct_fini = osd_key_fini,
3843         .lct_exit = osd_key_exit
3844 };
3845
3846
3847 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
3848                            const char *name, struct lu_device *next)
3849 {
3850         int rc;
3851         struct lu_context *ctx;
3852
3853         /* context for commit hooks */
3854         ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
3855         rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
3856         if (rc == 0) {
3857                 rc = osd_procfs_init(osd_dev(d), name);
3858                 ctx->lc_cookie = 0x3;
3859         }
3860         return rc;
3861 }
3862
3863 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
3864 {
3865         struct osd_thread_info *info = osd_oti_get(env);
3866         ENTRY;
3867         if (o->od_obj_area != NULL) {
3868                 lu_object_put(env, &o->od_obj_area->do_lu);
3869                 o->od_obj_area = NULL;
3870         }
3871         osd_oi_fini(info, &o->od_oi);
3872
3873         RETURN(0);
3874 }
3875
3876 static int osd_mount(const struct lu_env *env,
3877                      struct osd_device *o, struct lustre_cfg *cfg)
3878 {
3879         struct lustre_mount_info *lmi;
3880         const char               *dev  = lustre_cfg_string(cfg, 0);
3881         struct lustre_disk_data  *ldd;
3882         struct lustre_sb_info    *lsi;
3883
3884         ENTRY;
3885         if (o->od_mount != NULL) {
3886                 CERROR("Already mounted (%s)\n", dev);
3887                 RETURN(-EEXIST);
3888         }
3889
3890         /* get mount */
3891         lmi = server_get_mount(dev);
3892         if (lmi == NULL) {
3893                 CERROR("Cannot get mount info for %s!\n", dev);
3894                 RETURN(-EFAULT);
3895         }
3896
3897         LASSERT(lmi != NULL);
3898         /* save lustre_mount_info in dt_device */
3899         o->od_mount = lmi;
3900
3901         lsi = s2lsi(lmi->lmi_sb);
3902         ldd = lsi->lsi_ldd;
3903
3904         if (ldd->ldd_flags & LDD_F_IAM_DIR) {
3905                 o->od_iop_mode = 0;
3906                 LCONSOLE_WARN("OSD: IAM mode enabled\n");
3907         } else
3908                 o->od_iop_mode = 1;
3909
3910         o->od_obj_area = NULL;
3911         RETURN(0);
3912 }
3913
3914 static struct lu_device *osd_device_fini(const struct lu_env *env,
3915                                          struct lu_device *d)
3916 {
3917         int rc;
3918         ENTRY;
3919
3920         shrink_dcache_sb(osd_sb(osd_dev(d)));
3921         osd_sync(env, lu2dt_dev(d));
3922
3923         rc = osd_procfs_fini(osd_dev(d));
3924         if (rc) {
3925                 CERROR("proc fini error %d \n", rc);
3926                 RETURN (ERR_PTR(rc));
3927         }
3928
3929         if (osd_dev(d)->od_mount)
3930                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
3931                                  osd_dev(d)->od_mount->lmi_mnt);
3932         osd_dev(d)->od_mount = NULL;
3933
3934         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
3935         RETURN(NULL);
3936 }
3937
3938 static struct lu_device *osd_device_alloc(const struct lu_env *env,
3939                                           struct lu_device_type *t,
3940                                           struct lustre_cfg *cfg)
3941 {
3942         struct lu_device  *l;
3943         struct osd_device *o;
3944
3945         OBD_ALLOC_PTR(o);
3946         if (o != NULL) {
3947                 int result;
3948
3949                 result = dt_device_init(&o->od_dt_dev, t);
3950                 if (result == 0) {
3951                         l = osd2lu_dev(o);
3952                         l->ld_ops = &osd_lu_ops;
3953                         o->od_dt_dev.dd_ops = &osd_dt_ops;
3954                         cfs_spin_lock_init(&o->od_osfs_lock);
3955                         o->od_osfs_age = cfs_time_shift_64(-1000);
3956                         o->od_capa_hash = init_capa_hash();
3957                         if (o->od_capa_hash == NULL) {
3958                                 dt_device_fini(&o->od_dt_dev);
3959                                 l = ERR_PTR(-ENOMEM);
3960                         }
3961                 } else
3962                         l = ERR_PTR(result);
3963
3964                 if (IS_ERR(l))
3965                         OBD_FREE_PTR(o);
3966         } else
3967                 l = ERR_PTR(-ENOMEM);
3968         return l;
3969 }
3970
3971 static struct lu_device *osd_device_free(const struct lu_env *env,
3972                                          struct lu_device *d)
3973 {
3974         struct osd_device *o = osd_dev(d);
3975         ENTRY;
3976
3977         cleanup_capa_hash(o->od_capa_hash);
3978         dt_device_fini(&o->od_dt_dev);
3979         OBD_FREE_PTR(o);
3980         RETURN(NULL);
3981 }
3982
3983 static int osd_process_config(const struct lu_env *env,
3984                               struct lu_device *d, struct lustre_cfg *cfg)
3985 {
3986         struct osd_device *o = osd_dev(d);
3987         int err;
3988         ENTRY;
3989
3990         switch(cfg->lcfg_command) {
3991         case LCFG_SETUP:
3992                 err = osd_mount(env, o, cfg);
3993                 break;
3994         case LCFG_CLEANUP:
3995                 err = osd_shutdown(env, o);
3996                 break;
3997         default:
3998                 err = -ENOSYS;
3999         }
4000
4001         RETURN(err);
4002 }
4003
4004 static int osd_recovery_complete(const struct lu_env *env,
4005                                  struct lu_device *d)
4006 {
4007         RETURN(0);
4008 }
4009
4010 static int osd_prepare(const struct lu_env *env,
4011                        struct lu_device *pdev,
4012                        struct lu_device *dev)
4013 {
4014         struct osd_device *osd = osd_dev(dev);
4015         struct lustre_sb_info *lsi;
4016         struct lustre_disk_data *ldd;
4017         struct lustre_mount_info  *lmi;
4018         struct osd_thread_info *oti = osd_oti_get(env);
4019         struct dt_object *d;
4020         int result;
4021
4022         ENTRY;
4023         /* 1. initialize oi before any file create or file open */
4024         result = osd_oi_init(oti, &osd->od_oi,
4025                              &osd->od_dt_dev, lu2md_dev(pdev));
4026         if (result != 0)
4027                 RETURN(result);
4028
4029         lmi = osd->od_mount;
4030         lsi = s2lsi(lmi->lmi_sb);
4031         ldd = lsi->lsi_ldd;
4032
4033         /* 2. setup local objects */
4034         result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
4035         if (result)
4036                 goto out;
4037
4038         /* 3. open remote object dir */
4039         d = dt_store_open(env, lu2dt_dev(dev), "",
4040                           remote_obj_dir, &oti->oti_fid);
4041         if (!IS_ERR(d)) {
4042                 osd->od_obj_area = d;
4043                 result = 0;
4044         } else {
4045                 result = PTR_ERR(d);
4046                 osd->od_obj_area = NULL;
4047         }
4048
4049 out:
4050         RETURN(result);
4051 }
4052
4053 static const struct lu_object_operations osd_lu_obj_ops = {
4054         .loo_object_init      = osd_object_init,
4055         .loo_object_delete    = osd_object_delete,
4056         .loo_object_release   = osd_object_release,
4057         .loo_object_free      = osd_object_free,
4058         .loo_object_print     = osd_object_print,
4059         .loo_object_invariant = osd_object_invariant
4060 };
4061
4062 static const struct lu_device_operations osd_lu_ops = {
4063         .ldo_object_alloc      = osd_object_alloc,
4064         .ldo_process_config    = osd_process_config,
4065         .ldo_recovery_complete = osd_recovery_complete,
4066         .ldo_prepare           = osd_prepare,
4067 };
4068
4069 static const struct lu_device_type_operations osd_device_type_ops = {
4070         .ldto_init = osd_type_init,
4071         .ldto_fini = osd_type_fini,
4072
4073         .ldto_start = osd_type_start,
4074         .ldto_stop  = osd_type_stop,
4075
4076         .ldto_device_alloc = osd_device_alloc,
4077         .ldto_device_free  = osd_device_free,
4078
4079         .ldto_device_init    = osd_device_init,
4080         .ldto_device_fini    = osd_device_fini
4081 };
4082
4083 static struct lu_device_type osd_device_type = {
4084         .ldt_tags     = LU_DEVICE_DT,
4085         .ldt_name     = LUSTRE_OSD_NAME,
4086         .ldt_ops      = &osd_device_type_ops,
4087         .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
4088 };
4089
4090 /*
4091  * lprocfs legacy support.
4092  */
4093 static struct obd_ops osd_obd_device_ops = {
4094         .o_owner = THIS_MODULE
4095 };
4096
4097 static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
4098         .llod_name      = remote_obj_dir,
4099         .llod_oid       = OSD_REM_OBJ_DIR_OID,
4100         .llod_is_index  = 1,
4101         .llod_feat      = &dt_directory_features,
4102 };
4103
4104 static int __init osd_mod_init(void)
4105 {
4106         struct lprocfs_static_vars lvars;
4107
4108         osd_oi_mod_init();
4109         llo_local_obj_register(&llod_osd_rem_obj_dir);
4110         lprocfs_osd_init_vars(&lvars);
4111         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
4112                                    LUSTRE_OSD_NAME, &osd_device_type);
4113 }
4114
4115 static void __exit osd_mod_exit(void)
4116 {
4117         llo_local_obj_unregister(&llod_osd_rem_obj_dir);
4118         class_unregister_type(LUSTRE_OSD_NAME);
4119 }
4120
4121 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4122 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
4123 MODULE_LICENSE("GPL");
4124
4125 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);