Whamcloud - gitweb
3645cdc8292605f7c1da00df678a0f970ac2bdf4
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/module.h>
50
51 /* LUSTRE_VERSION_CODE */
52 #include <lustre_ver.h>
53 /* prerequisite for linux/xattr.h */
54 #include <linux/types.h>
55 /* prerequisite for linux/xattr.h */
56 #include <linux/fs.h>
57 /* XATTR_{REPLACE,CREATE} */
58 #include <linux/xattr.h>
59 /* simple_mkdir() */
60 #include <lvfs.h>
61
62 /*
63  * struct OBD_{ALLOC,FREE}*()
64  * OBD_FAIL_CHECK
65  */
66 #include <obd_support.h>
67 /* struct ptlrpc_thread */
68 #include <lustre_net.h>
69
70 /* fid_is_local() */
71 #include <lustre_fid.h>
72
73 #include "osd_internal.h"
74 #include "osd_igif.h"
75
76 /* llo_* api support */
77 #include <md_object.h>
78
79 static const char dot[] = ".";
80 static const char dotdot[] = "..";
81 static const char remote_obj_dir[] = "REM_OBJ_DIR";
82
83 struct osd_directory {
84         struct iam_container od_container;
85         struct iam_descr     od_descr;
86 };
87
88 struct osd_object {
89         struct dt_object       oo_dt;
90         /**
91          * Inode for file system object represented by this osd_object. This
92          * inode is pinned for the whole duration of lu_object life.
93          *
94          * Not modified concurrently (either setup early during object
95          * creation, or assigned by osd_object_create() under write lock).
96          */
97         struct inode          *oo_inode;
98         /**
99          * to protect index ops.
100          */
101         cfs_rw_semaphore_t     oo_ext_idx_sem;
102         cfs_rw_semaphore_t     oo_sem;
103         struct osd_directory  *oo_dir;
104         /** protects inode attributes. */
105         cfs_spinlock_t         oo_guard;
106         /**
107          * Following two members are used to indicate the presence of dot and
108          * dotdot in the given directory. This is required for interop mode
109          * (b11826).
110          */
111         int                    oo_compat_dot_created;
112         int                    oo_compat_dotdot_created;
113
114         const struct lu_env   *oo_owner;
115 #ifdef CONFIG_LOCKDEP
116         struct lockdep_map     oo_dep_map;
117 #endif
118 };
119
120 static const struct lu_object_operations      osd_lu_obj_ops;
121 static const struct lu_device_operations      osd_lu_ops;
122 static       struct lu_context_key            osd_key;
123 static const struct dt_object_operations      osd_obj_ops;
124 static const struct dt_object_operations      osd_obj_ea_ops;
125 static const struct dt_body_operations        osd_body_ops;
126 static const struct dt_index_operations       osd_index_iam_ops;
127 static const struct dt_index_operations       osd_index_ea_ops;
128
129 struct osd_thandle {
130         struct thandle          ot_super;
131         handle_t               *ot_handle;
132         struct journal_callback ot_jcb;
133         /* Link to the device, for debugging. */
134         struct lu_ref_link     *ot_dev_link;
135
136 #if OSD_THANDLE_STATS
137         /** time when this handle was allocated */
138         cfs_time_t oth_alloced;
139
140         /** time when this thanle was started */
141         cfs_time_t oth_started;
142 #endif
143 };
144
145 /*
146  * Helpers.
147  */
148 static int lu_device_is_osd(const struct lu_device *d)
149 {
150         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
151 }
152
153 static struct osd_device *osd_dt_dev(const struct dt_device *d)
154 {
155         LASSERT(lu_device_is_osd(&d->dd_lu_dev));
156         return container_of0(d, struct osd_device, od_dt_dev);
157 }
158
159 static struct osd_device *osd_dev(const struct lu_device *d)
160 {
161         LASSERT(lu_device_is_osd(d));
162         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
163 }
164
165 static struct osd_device *osd_obj2dev(const struct osd_object *o)
166 {
167         return osd_dev(o->oo_dt.do_lu.lo_dev);
168 }
169
170 static struct super_block *osd_sb(const struct osd_device *dev)
171 {
172         return dev->od_mount->lmi_mnt->mnt_sb;
173 }
174
175 static int osd_object_is_root(const struct osd_object *obj)
176 {
177         return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
178 }
179
180 static struct osd_object *osd_obj(const struct lu_object *o)
181 {
182         LASSERT(lu_device_is_osd(o->lo_dev));
183         return container_of0(o, struct osd_object, oo_dt.do_lu);
184 }
185
186 static struct osd_object *osd_dt_obj(const struct dt_object *d)
187 {
188         return osd_obj(&d->do_lu);
189 }
190
191 static struct lu_device *osd2lu_dev(struct osd_device *osd)
192 {
193         return &osd->od_dt_dev.dd_lu_dev;
194 }
195
196 static journal_t *osd_journal(const struct osd_device *dev)
197 {
198         return LDISKFS_SB(osd_sb(dev))->s_journal;
199 }
200
201 static int osd_has_index(const struct osd_object *obj)
202 {
203         return obj->oo_dt.do_index_ops != NULL;
204 }
205
206 static int osd_object_invariant(const struct lu_object *l)
207 {
208         return osd_invariant(osd_obj(l));
209 }
210
211 #ifdef HAVE_QUOTA_SUPPORT
212 static inline void
213 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
214 {
215         struct md_ucred    *uc = md_ucred(env);
216         struct cred        *tc;
217
218         LASSERT(uc != NULL);
219
220         save->oc_uid = current_fsuid();
221         save->oc_gid = current_fsgid();
222         save->oc_cap = current_cap();
223         if ((tc = prepare_creds())) {
224                 tc->fsuid         = uc->mu_fsuid;
225                 tc->fsgid         = uc->mu_fsgid;
226                 tc->cap_effective = uc->mu_cap;
227                 commit_creds(tc);
228         }
229 }
230
231 static inline void
232 osd_pop_ctxt(struct osd_ctxt *save)
233 {
234         struct cred *tc;
235
236         if ((tc = prepare_creds())) {
237                 tc->fsuid         = save->oc_uid;
238                 tc->fsgid         = save->oc_gid;
239                 tc->cap_effective = save->oc_cap;
240                 commit_creds(tc);
241         }
242 }
243 #endif
244
245 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
246 {
247         return lu_context_key_get(&env->le_ctx, &osd_key);
248 }
249
250 /*
251  * Concurrency: doesn't matter
252  */
253 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
254 {
255         return osd_oti_get(env)->oti_r_locks > 0;
256 }
257
258 /*
259  * Concurrency: doesn't matter
260  */
261 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
262 {
263         struct osd_thread_info *oti = osd_oti_get(env);
264         return oti->oti_w_locks > 0 && o->oo_owner == env;
265 }
266
267 /*
268  * Concurrency: doesn't access mutable data
269  */
270 static int osd_root_get(const struct lu_env *env,
271                         struct dt_device *dev, struct lu_fid *f)
272 {
273         struct inode *inode;
274
275         inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
276         LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
277         return 0;
278 }
279
280 /*
281  * OSD object methods.
282  */
283
284 /*
285  * Concurrency: no concurrent access is possible that early in object
286  * life-cycle.
287  */
288 static struct lu_object *osd_object_alloc(const struct lu_env *env,
289                                           const struct lu_object_header *hdr,
290                                           struct lu_device *d)
291 {
292         struct osd_object *mo;
293
294         OBD_ALLOC_PTR(mo);
295         if (mo != NULL) {
296                 struct lu_object *l;
297
298                 l = &mo->oo_dt.do_lu;
299                 dt_object_init(&mo->oo_dt, NULL, d);
300                 if (osd_dev(d)->od_iop_mode)
301                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
302                 else
303                         mo->oo_dt.do_ops = &osd_obj_ops;
304
305                 l->lo_ops = &osd_lu_obj_ops;
306                 cfs_init_rwsem(&mo->oo_sem);
307                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
308                 cfs_spin_lock_init(&mo->oo_guard);
309                 return l;
310         } else
311                 return NULL;
312 }
313
314 /*
315  * retrieve object from backend ext fs.
316  **/
317 static struct inode *osd_iget(struct osd_thread_info *info,
318                               struct osd_device *dev,
319                               const struct osd_inode_id *id)
320 {
321         struct inode *inode = NULL;
322
323 #ifdef HAVE_EXT4_LDISKFS
324         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
325         if (IS_ERR(inode))
326         /* Newer kernels return an error instead of a NULL pointer */
327                 inode = NULL;
328 #else
329         inode = iget(osd_sb(dev), id->oii_ino);
330 #endif
331         if (inode == NULL) {
332                 CERROR("no inode\n");
333                 inode = ERR_PTR(-EACCES);
334         } else if (id->oii_gen != OSD_OII_NOGEN &&
335                    inode->i_generation != id->oii_gen) {
336                 iput(inode);
337                 inode = ERR_PTR(-ESTALE);
338         } else if (inode->i_nlink == 0) {
339                 /* due to parallel readdir and unlink,
340                 * we can have dead inode here. */
341                 CWARN("stale inode\n");
342                 make_bad_inode(inode);
343                 iput(inode);
344                 inode = ERR_PTR(-ESTALE);
345         } else if (is_bad_inode(inode)) {
346                 CERROR("bad inode %lx\n",inode->i_ino);
347                 iput(inode);
348                 inode = ERR_PTR(-ENOENT);
349         }
350         return inode;
351 }
352
353 static int osd_fid_lookup(const struct lu_env *env,
354                           struct osd_object *obj, const struct lu_fid *fid)
355 {
356         struct osd_thread_info *info;
357         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
358         struct osd_device      *dev;
359         struct osd_inode_id    *id;
360         struct osd_oi          *oi;
361         struct inode           *inode;
362         int                     result;
363
364         LINVRNT(osd_invariant(obj));
365         LASSERT(obj->oo_inode == NULL);
366         LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
367         /*
368          * This assertion checks that osd layer sees only local
369          * fids. Unfortunately it is somewhat expensive (does a
370          * cache-lookup). Disabling it for production/acceptance-testing.
371          */
372         LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
373
374         ENTRY;
375
376         info = osd_oti_get(env);
377         dev  = osd_dev(ldev);
378         id   = &info->oti_id;
379         oi   = &dev->od_oi;
380
381         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
382                 RETURN(-ENOENT);
383
384         result = osd_oi_lookup(info, oi, fid, id);
385         if (result == 0) {
386                 inode = osd_iget(info, dev, id);
387                 if (!IS_ERR(inode)) {
388                         obj->oo_inode = inode;
389                         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
390                         if (dev->od_iop_mode) {
391                                 obj->oo_compat_dot_created = 1;
392                                 obj->oo_compat_dotdot_created = 1;
393                         }
394                         result = 0;
395                 } else
396                         /*
397                          * If fid wasn't found in oi, inode-less object is
398                          * created, for which lu_object_exists() returns
399                          * false. This is used in a (frequent) case when
400                          * objects are created as locking anchors or
401                          * place holders for objects yet to be created.
402                          */
403                         result = PTR_ERR(inode);
404         } else if (result == -ENOENT)
405                 result = 0;
406         LINVRNT(osd_invariant(obj));
407
408         RETURN(result);
409 }
410
411 /*
412  * Concurrency: shouldn't matter.
413  */
414 static void osd_object_init0(struct osd_object *obj)
415 {
416         LASSERT(obj->oo_inode != NULL);
417         obj->oo_dt.do_body_ops = &osd_body_ops;
418         obj->oo_dt.do_lu.lo_header->loh_attr |=
419                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
420 }
421
422 /*
423  * Concurrency: no concurrent access is possible that early in object
424  * life-cycle.
425  */
426 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
427                            const struct lu_object_conf *unused)
428 {
429         struct osd_object *obj = osd_obj(l);
430         int result;
431
432         LINVRNT(osd_invariant(obj));
433
434         result = osd_fid_lookup(env, obj, lu_object_fid(l));
435         if (result == 0) {
436                 if (obj->oo_inode != NULL)
437                         osd_object_init0(obj);
438         }
439         LINVRNT(osd_invariant(obj));
440         return result;
441 }
442
443 /*
444  * Concurrency: no concurrent access is possible that late in object
445  * life-cycle.
446  */
447 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
448 {
449         struct osd_object *obj = osd_obj(l);
450
451         LINVRNT(osd_invariant(obj));
452
453         dt_object_fini(&obj->oo_dt);
454         OBD_FREE_PTR(obj);
455 }
456
457 /**
458  * IAM Iterator
459  */
460 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
461                                              const struct iam_container *bag)
462 {
463         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
464                                            osd_oti_get(env)->oti_it_ipd);
465 }
466
467 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
468                                               const struct iam_container *bag)
469 {
470         return bag->ic_descr->id_ops->id_ipd_alloc(bag,
471                                            osd_oti_get(env)->oti_idx_ipd);
472 }
473
474 static void osd_ipd_put(const struct lu_env *env,
475                         const struct iam_container *bag,
476                         struct iam_path_descr *ipd)
477 {
478         bag->ic_descr->id_ops->id_ipd_free(ipd);
479 }
480
481 /*
482  * Concurrency: no concurrent access is possible that late in object
483  * life-cycle.
484  */
485 static void osd_index_fini(struct osd_object *o)
486 {
487         struct iam_container *bag;
488
489         if (o->oo_dir != NULL) {
490                 bag = &o->oo_dir->od_container;
491                 if (o->oo_inode != NULL) {
492                         if (bag->ic_object == o->oo_inode)
493                                 iam_container_fini(bag);
494                 }
495                 OBD_FREE_PTR(o->oo_dir);
496                 o->oo_dir = NULL;
497         }
498 }
499
500 /*
501  * Concurrency: no concurrent access is possible that late in object
502  * life-cycle (for all existing callers, that is. New callers have to provide
503  * their own locking.)
504  */
505 static int osd_inode_unlinked(const struct inode *inode)
506 {
507         return inode->i_nlink == 0;
508 }
509
510 enum {
511         OSD_TXN_OI_DELETE_CREDITS    = 20,
512         OSD_TXN_INODE_DELETE_CREDITS = 20
513 };
514
515 /*
516  * Journal
517  */
518
519 #if OSD_THANDLE_STATS
520 /**
521  * Set time when the handle is allocated
522  */
523 static void osd_th_alloced(struct osd_thandle *oth)
524 {
525         oth->oth_alloced = cfs_time_current();
526 }
527
528 /**
529  * Set time when the handle started
530  */
531 static void osd_th_started(struct osd_thandle *oth)
532 {
533         oth->oth_started = cfs_time_current();
534 }
535
536 /**
537  * Helper function to convert time interval to microseconds packed in
538  * long int (default time units for the counter in "stats" initialized
539  * by lu_time_init() )
540  */
541 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
542 {
543         struct timeval val;
544
545         cfs_duration_usec(cfs_time_sub(end, start), &val);
546         return val.tv_sec * 1000000 + val.tv_usec;
547 }
548
549 /**
550  * Check whether the we deal with this handle for too long.
551  */
552 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
553                                 cfs_time_t alloced, cfs_time_t started,
554                                 cfs_time_t closed)
555 {
556         cfs_time_t now = cfs_time_current();
557
558         LASSERT(dev != NULL);
559
560         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
561                             interval_to_usec(alloced, started));
562         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
563                             interval_to_usec(started, closed));
564         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
565                             interval_to_usec(closed, now));
566
567         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
568                 CWARN("transaction handle %p was open for too long: "
569                       "now "CFS_TIME_T" ,"
570                       "alloced "CFS_TIME_T" ,"
571                       "started "CFS_TIME_T" ,"
572                       "closed "CFS_TIME_T"\n",
573                       oth, now, alloced, started, closed);
574                 libcfs_debug_dumpstack(NULL);
575         }
576 }
577
578 #define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
579 {                                                                       \
580         cfs_time_t __closed = cfs_time_current();                       \
581         cfs_time_t __alloced = oth->oth_alloced;                        \
582         cfs_time_t __started = oth->oth_started;                        \
583                                                                         \
584         expr;                                                           \
585         __osd_th_check_slow(oth, dev, __alloced, __started, __closed);  \
586 }
587
588 #else /* OSD_THANDLE_STATS */
589
590 #define osd_th_alloced(h)                  do {} while(0)
591 #define osd_th_started(h)                  do {} while(0)
592 #define OSD_CHECK_SLOW_TH(oth, dev, expr)  expr
593
594 #endif /* OSD_THANDLE_STATS */
595
596 /*
597  * Concurrency: doesn't access mutable data.
598  */
599 static int osd_param_is_sane(const struct osd_device *dev,
600                              const struct txn_param *param)
601 {
602         return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
603 }
604
605 /*
606  * Concurrency: shouldn't matter.
607  */
608 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
609 {
610         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
611         struct thandle     *th  = &oh->ot_super;
612         struct dt_device   *dev = th->th_dev;
613         struct lu_device   *lud = &dev->dd_lu_dev;
614
615         LASSERT(dev != NULL);
616         LASSERT(oh->ot_handle == NULL);
617
618         if (error) {
619                 CERROR("transaction @0x%p commit error: %d\n", th, error);
620         } else {
621                 struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
622                 /*
623                  * This od_env_for_commit is only for commit usage.  see
624                  * "struct dt_device"
625                  */
626                 lu_context_enter(&env->le_ctx);
627                 dt_txn_hook_commit(env, th);
628                 lu_context_exit(&env->le_ctx);
629         }
630
631         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
632         lu_device_put(lud);
633         th->th_dev = NULL;
634
635         lu_context_exit(&th->th_ctx);
636         lu_context_fini(&th->th_ctx);
637         OBD_FREE_PTR(oh);
638 }
639
640 /*
641  * Concurrency: shouldn't matter.
642  */
643 static struct thandle *osd_trans_start(const struct lu_env *env,
644                                        struct dt_device *d,
645                                        struct txn_param *p)
646 {
647         struct osd_device  *dev = osd_dt_dev(d);
648         handle_t           *jh;
649         struct osd_thandle *oh;
650         struct thandle     *th;
651         int hook_res;
652
653         ENTRY;
654
655         hook_res = dt_txn_hook_start(env, d, p);
656         if (hook_res != 0)
657                 RETURN(ERR_PTR(hook_res));
658
659         if (osd_param_is_sane(dev, p)) {
660                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
661                 if (oh != NULL) {
662                         struct osd_thread_info *oti = osd_oti_get(env);
663
664                         /*
665                          * XXX temporary stuff. Some abstraction layer should
666                          * be used.
667                          */
668                         oti->oti_dev = dev;
669                         osd_th_alloced(oh);
670                         jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
671                         osd_th_started(oh);
672                         if (!IS_ERR(jh)) {
673                                 oh->ot_handle = jh;
674                                 th = &oh->ot_super;
675                                 th->th_dev = d;
676                                 th->th_result = 0;
677                                 jh->h_sync = p->tp_sync;
678                                 lu_device_get(&d->dd_lu_dev);
679                                 oh->ot_dev_link = lu_ref_add
680                                         (&d->dd_lu_dev.ld_reference,
681                                          "osd-tx", th);
682                                 /* add commit callback */
683                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
684                                 lu_context_enter(&th->th_ctx);
685                                 osd_journal_callback_set(jh, osd_trans_commit_cb,
686                                                          (struct journal_callback *)&oh->ot_jcb);
687                                         LASSERT(oti->oti_txns == 0);
688                                         LASSERT(oti->oti_r_locks == 0);
689                                         LASSERT(oti->oti_w_locks == 0);
690                                         oti->oti_txns++;
691                         } else {
692                                 OBD_FREE_PTR(oh);
693                                 th = (void *)jh;
694                         }
695                 } else
696                         th = ERR_PTR(-ENOMEM);
697         } else {
698                 CERROR("Invalid transaction parameters\n");
699                 th = ERR_PTR(-EINVAL);
700         }
701
702         RETURN(th);
703 }
704
705 /*
706  * Concurrency: shouldn't matter.
707  */
708 static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
709 {
710         int result;
711         struct osd_thandle *oh;
712         struct osd_thread_info *oti = osd_oti_get(env);
713
714         ENTRY;
715
716         oh = container_of0(th, struct osd_thandle, ot_super);
717         if (oh->ot_handle != NULL) {
718                 handle_t *hdl = oh->ot_handle;
719
720                 LASSERT(oti->oti_txns == 1);
721                 oti->oti_txns--;
722                 LASSERT(oti->oti_r_locks == 0);
723                 LASSERT(oti->oti_w_locks == 0);
724                 result = dt_txn_hook_stop(env, th);
725                 if (result != 0)
726                         CERROR("Failure in transaction hook: %d\n", result);
727                 oh->ot_handle = NULL;
728                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
729                                   result = ldiskfs_journal_stop(hdl));
730                 if (result != 0)
731                         CERROR("Failure to stop transaction: %d\n", result);
732         }
733         EXIT;
734 }
735
736 /*
737  * Concurrency: no concurrent access is possible that late in object
738  * life-cycle.
739  */
740 static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
741 {
742         const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
743         struct osd_device      *osd = osd_obj2dev(obj);
744         struct osd_thread_info *oti = osd_oti_get(env);
745         struct txn_param       *prm = &oti->oti_txn;
746         struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
747         struct thandle         *th;
748         int result;
749
750         lu_env_init(env_del_obj, LCT_DT_THREAD);
751         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
752                             OSD_TXN_INODE_DELETE_CREDITS);
753         th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
754         if (!IS_ERR(th)) {
755                 result = osd_oi_delete(osd_oti_get(env_del_obj),
756                                        &osd->od_oi, fid, th);
757                 osd_trans_stop(env_del_obj, th);
758         } else
759                 result = PTR_ERR(th);
760
761         lu_env_fini(env_del_obj);
762         return result;
763 }
764
765 /*
766  * Called just before object is freed. Releases all resources except for
767  * object itself (that is released by osd_object_free()).
768  *
769  * Concurrency: no concurrent access is possible that late in object
770  * life-cycle.
771  */
772 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
773 {
774         struct osd_object *obj   = osd_obj(l);
775         struct inode      *inode = obj->oo_inode;
776
777         LINVRNT(osd_invariant(obj));
778
779         /*
780          * If object is unlinked remove fid->ino mapping from object index.
781          */
782
783         osd_index_fini(obj);
784         if (inode != NULL) {
785                 int result;
786
787                 if (osd_inode_unlinked(inode)) {
788                         result = osd_inode_remove(env, obj);
789                         if (result != 0)
790                                 LU_OBJECT_DEBUG(D_ERROR, env, l,
791                                                 "Failed to cleanup: %d\n",
792                                                 result);
793                 }
794
795                 iput(inode);
796                 obj->oo_inode = NULL;
797         }
798 }
799
800 /*
801  * Concurrency: ->loo_object_release() is called under site spin-lock.
802  */
803 static void osd_object_release(const struct lu_env *env,
804                                struct lu_object *l)
805 {
806         struct osd_object *o = osd_obj(l);
807
808         LASSERT(!lu_object_is_dying(l->lo_header));
809         if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
810                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
811 }
812
813 /*
814  * Concurrency: shouldn't matter.
815  */
816 static int osd_object_print(const struct lu_env *env, void *cookie,
817                             lu_printer_t p, const struct lu_object *l)
818 {
819         struct osd_object *o = osd_obj(l);
820         struct iam_descr  *d;
821
822         if (o->oo_dir != NULL)
823                 d = o->oo_dir->od_container.ic_descr;
824         else
825                 d = NULL;
826         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
827                     o, o->oo_inode,
828                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
829                     o->oo_inode ? o->oo_inode->i_generation : 0,
830                     d ? d->id_ops->id_name : "plain");
831 }
832
833 /*
834  * Concurrency: shouldn't matter.
835  */
836 int osd_statfs(const struct lu_env *env, struct dt_device *d,
837                cfs_kstatfs_t *sfs)
838 {
839         struct osd_device *osd = osd_dt_dev(d);
840         struct super_block *sb = osd_sb(osd);
841         int result = 0;
842
843         cfs_spin_lock(&osd->od_osfs_lock);
844         /* cache 1 second */
845         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
846                 result = ll_do_statfs(sb, &osd->od_kstatfs);
847                 if (likely(result == 0)) /* N.B. statfs can't really fail */
848                         osd->od_osfs_age = cfs_time_current_64();
849         }
850
851         if (likely(result == 0))
852                 *sfs = osd->od_kstatfs;
853         cfs_spin_unlock(&osd->od_osfs_lock);
854
855         return result;
856 }
857
858 /*
859  * Concurrency: doesn't access mutable data.
860  */
861 static void osd_conf_get(const struct lu_env *env,
862                          const struct dt_device *dev,
863                          struct dt_device_param *param)
864 {
865         /*
866          * XXX should be taken from not-yet-existing fs abstraction layer.
867          */
868         param->ddp_max_name_len  = LDISKFS_NAME_LEN;
869         param->ddp_max_nlink     = LDISKFS_LINK_MAX;
870         param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
871 }
872
873 /**
874  * Helper function to get and fill the buffer with input values.
875  */
876 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
877 {
878         struct lu_buf *buf;
879
880         buf = &osd_oti_get(env)->oti_buf;
881         buf->lb_buf = area;
882         buf->lb_len = len;
883         return buf;
884 }
885
886 /*
887  * Concurrency: shouldn't matter.
888  */
889 static int osd_sync(const struct lu_env *env, struct dt_device *d)
890 {
891         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
892         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
893 }
894
895 /**
896  * Start commit for OSD device.
897  *
898  * An implementation of dt_commit_async method for OSD device.
899  * Asychronously starts underlayng fs sync and thereby a transaction
900  * commit.
901  *
902  * \param env environment
903  * \param d dt device
904  *
905  * \see dt_device_operations
906  */
907 static int osd_commit_async(const struct lu_env *env,
908                             struct dt_device *d)
909 {
910         struct super_block *s = osd_sb(osd_dt_dev(d));
911         ENTRY;
912
913         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
914         RETURN(s->s_op->sync_fs(s, 0));
915 }
916
917 /*
918  * Concurrency: shouldn't matter.
919  */
920 lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
921
922 static void osd_ro(const struct lu_env *env, struct dt_device *d)
923 {
924         ENTRY;
925
926         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
927
928         __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
929                           fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
930         EXIT;
931 }
932
933
934 /*
935  * Concurrency: serialization provided by callers.
936  */
937 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
938                               int mode, unsigned long timeout, __u32 alg,
939                               struct lustre_capa_key *keys)
940 {
941         struct osd_device *dev = osd_dt_dev(d);
942         ENTRY;
943
944         dev->od_fl_capa = mode;
945         dev->od_capa_timeout = timeout;
946         dev->od_capa_alg = alg;
947         dev->od_capa_keys = keys;
948         RETURN(0);
949 }
950
951 /**
952  * Concurrency: serialization provided by callers.
953  */
954 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
955                                struct dt_quota_ctxt *ctxt, void *data)
956 {
957         struct obd_device *obd = (void *)ctxt;
958         struct vfsmount *mnt = (struct vfsmount *)data;
959         ENTRY;
960
961         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
962         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
963         obd->obd_lvfs_ctxt.pwdmnt = mnt;
964         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
965         obd->obd_lvfs_ctxt.fs = get_ds();
966
967         EXIT;
968 }
969
970 /**
971  * Note: we do not count into QUOTA here.
972  * If we mount with --data_journal we may need more.
973  */
974 static const int osd_dto_credits_noquota[DTO_NR] = {
975         /**
976          * Insert/Delete.
977          * INDEX_EXTRA_TRANS_BLOCKS(8) +
978          * SINGLEDATA_TRANS_BLOCKS(8)
979          * XXX Note: maybe iam need more, since iam have more level than
980          *           EXT3 htree.
981          */
982         [DTO_INDEX_INSERT]  = 16,
983         [DTO_INDEX_DELETE]  = 16,
984         /**
985          * Unused now
986          */
987         [DTO_IDNEX_UPDATE]  = 16,
988         /**
989          * Create a object. The same as create object in EXT3.
990          * DATA_TRANS_BLOCKS(14) +
991          * INDEX_EXTRA_BLOCKS(8) +
992          * 3(inode bits, groups, GDT)
993          */
994         [DTO_OBJECT_CREATE] = 25,
995         /**
996          * Unused now
997          */
998         [DTO_OBJECT_DELETE] = 25,
999         /**
1000          * Attr set credits.
1001          * 3(inode bits, group, GDT)
1002          */
1003         [DTO_ATTR_SET_BASE] = 3,
1004         /**
1005          * Xattr set. The same as xattr of EXT3.
1006          * DATA_TRANS_BLOCKS(14)
1007          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1008          * are also counted in. Do not know why?
1009          */
1010         [DTO_XATTR_SET]     = 14,
1011         [DTO_LOG_REC]       = 14,
1012         /**
1013          * creadits for inode change during write.
1014          */
1015         [DTO_WRITE_BASE]    = 3,
1016         /**
1017          * credits for single block write.
1018          */
1019         [DTO_WRITE_BLOCK]   = 14,
1020         /**
1021          * Attr set credits for chown.
1022          * This is extra credits for setattr, and it is null without quota
1023          */
1024         [DTO_ATTR_SET_CHOWN]= 0
1025 };
1026
1027 /**
1028  * Note: we count into QUOTA here.
1029  * If we mount with --data_journal we may need more.
1030  */
1031 static const int osd_dto_credits_quota[DTO_NR] = {
1032         /**
1033          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1034          * SINGLEDATA_TRANS_BLOCKS(8) +
1035          * 2 * QUOTA_TRANS_BLOCKS(2)
1036          */
1037         [DTO_INDEX_INSERT]  = 20,
1038         /**
1039          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1040          * SINGLEDATA_TRANS_BLOCKS(8) +
1041          * 2 * QUOTA_TRANS_BLOCKS(2)
1042          */
1043         [DTO_INDEX_DELETE]  = 20,
1044         /**
1045          * Unused now.
1046          */
1047         [DTO_IDNEX_UPDATE]  = 16,
1048         /*
1049          * Create a object. Same as create object in EXT3 filesystem.
1050          * DATA_TRANS_BLOCKS(16) +
1051          * INDEX_EXTRA_BLOCKS(8) +
1052          * 3(inode bits, groups, GDT) +
1053          * 2 * QUOTA_INIT_BLOCKS(25)
1054          */
1055         [DTO_OBJECT_CREATE] = 77,
1056         /*
1057          * Unused now.
1058          * DATA_TRANS_BLOCKS(16) +
1059          * INDEX_EXTRA_BLOCKS(8) +
1060          * 3(inode bits, groups, GDT) +
1061          * QUOTA(?)
1062          */
1063         [DTO_OBJECT_DELETE] = 27,
1064         /**
1065          * Attr set credits.
1066          * 3 (inode bit, group, GDT) +
1067          */
1068         [DTO_ATTR_SET_BASE] = 3,
1069         /**
1070          * Xattr set. The same as xattr of EXT3.
1071          * DATA_TRANS_BLOCKS(16)
1072          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
1073          *           also counted in. Do not know why?
1074          */
1075         [DTO_XATTR_SET]     = 16,
1076         [DTO_LOG_REC]       = 16,
1077         /**
1078          * creadits for inode change during write.
1079          */
1080         [DTO_WRITE_BASE]    = 3,
1081         /**
1082          * credits for single block write.
1083          */
1084         [DTO_WRITE_BLOCK]   = 16,
1085         /**
1086          * Attr set credits for chown.
1087          * It is added to already set setattr credits
1088          * 2 * QUOTA_INIT_BLOCKS(25) +
1089          * 2 * QUOTA_DEL_BLOCKS(9)
1090          */
1091         [DTO_ATTR_SET_CHOWN]= 68,
1092 };
1093
1094 static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
1095                           enum dt_txn_op op)
1096 {
1097         LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
1098                 ARRAY_SIZE(osd_dto_credits_quota));
1099         LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
1100 #ifdef HAVE_QUOTA_SUPPORT
1101         if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
1102                 return osd_dto_credits_quota[op];
1103         else
1104 #endif
1105                 return osd_dto_credits_noquota[op];
1106 }
1107
1108 static const struct dt_device_operations osd_dt_ops = {
1109         .dt_root_get       = osd_root_get,
1110         .dt_statfs         = osd_statfs,
1111         .dt_trans_start    = osd_trans_start,
1112         .dt_trans_stop     = osd_trans_stop,
1113         .dt_conf_get       = osd_conf_get,
1114         .dt_sync           = osd_sync,
1115         .dt_ro             = osd_ro,
1116         .dt_commit_async   = osd_commit_async,
1117         .dt_credit_get     = osd_credit_get,
1118         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1119         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1120 };
1121
1122 static void osd_object_read_lock(const struct lu_env *env,
1123                                  struct dt_object *dt, unsigned role)
1124 {
1125         struct osd_object *obj = osd_dt_obj(dt);
1126         struct osd_thread_info *oti = osd_oti_get(env);
1127
1128         LINVRNT(osd_invariant(obj));
1129
1130         LASSERT(obj->oo_owner != env);
1131         cfs_down_read_nested(&obj->oo_sem, role);
1132
1133         LASSERT(obj->oo_owner == NULL);
1134         oti->oti_r_locks++;
1135 }
1136
1137 static void osd_object_write_lock(const struct lu_env *env,
1138                                   struct dt_object *dt, unsigned role)
1139 {
1140         struct osd_object *obj = osd_dt_obj(dt);
1141         struct osd_thread_info *oti = osd_oti_get(env);
1142
1143         LINVRNT(osd_invariant(obj));
1144
1145         LASSERT(obj->oo_owner != env);
1146         cfs_down_write_nested(&obj->oo_sem, role);
1147
1148         LASSERT(obj->oo_owner == NULL);
1149         obj->oo_owner = env;
1150         oti->oti_w_locks++;
1151 }
1152
1153 static void osd_object_read_unlock(const struct lu_env *env,
1154                                    struct dt_object *dt)
1155 {
1156         struct osd_object *obj = osd_dt_obj(dt);
1157         struct osd_thread_info *oti = osd_oti_get(env);
1158
1159         LINVRNT(osd_invariant(obj));
1160
1161         LASSERT(oti->oti_r_locks > 0);
1162         oti->oti_r_locks--;
1163         cfs_up_read(&obj->oo_sem);
1164 }
1165
1166 static void osd_object_write_unlock(const struct lu_env *env,
1167                                     struct dt_object *dt)
1168 {
1169         struct osd_object *obj = osd_dt_obj(dt);
1170         struct osd_thread_info *oti = osd_oti_get(env);
1171
1172         LINVRNT(osd_invariant(obj));
1173
1174         LASSERT(obj->oo_owner == env);
1175         LASSERT(oti->oti_w_locks > 0);
1176         oti->oti_w_locks--;
1177         obj->oo_owner = NULL;
1178         cfs_up_write(&obj->oo_sem);
1179 }
1180
1181 static int osd_object_write_locked(const struct lu_env *env,
1182                                    struct dt_object *dt)
1183 {
1184         struct osd_object *obj = osd_dt_obj(dt);
1185
1186         LINVRNT(osd_invariant(obj));
1187
1188         return obj->oo_owner == env;
1189 }
1190
1191 static int capa_is_sane(const struct lu_env *env,
1192                         struct osd_device *dev,
1193                         struct lustre_capa *capa,
1194                         struct lustre_capa_key *keys)
1195 {
1196         struct osd_thread_info *oti = osd_oti_get(env);
1197         struct lustre_capa *tcapa = &oti->oti_capa;
1198         struct obd_capa *oc;
1199         int i, rc = 0;
1200         ENTRY;
1201
1202         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1203         if (oc) {
1204                 if (capa_is_expired(oc)) {
1205                         DEBUG_CAPA(D_ERROR, capa, "expired");
1206                         rc = -ESTALE;
1207                 }
1208                 capa_put(oc);
1209                 RETURN(rc);
1210         }
1211
1212         if (capa_is_expired_sec(capa)) {
1213                 DEBUG_CAPA(D_ERROR, capa, "expired");
1214                 RETURN(-ESTALE);
1215         }
1216
1217         cfs_spin_lock(&capa_lock);
1218         for (i = 0; i < 2; i++) {
1219                 if (keys[i].lk_keyid == capa->lc_keyid) {
1220                         oti->oti_capa_key = keys[i];
1221                         break;
1222                 }
1223         }
1224         cfs_spin_unlock(&capa_lock);
1225
1226         if (i == 2) {
1227                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1228                 RETURN(-ESTALE);
1229         }
1230
1231         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1232         if (rc)
1233                 RETURN(rc);
1234
1235         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1236                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1237                 RETURN(-EACCES);
1238         }
1239
1240         oc = capa_add(dev->od_capa_hash, capa);
1241         capa_put(oc);
1242
1243         RETURN(0);
1244 }
1245
1246 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1247                            struct lustre_capa *capa, __u64 opc)
1248 {
1249         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1250         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1251         struct md_capainfo *ci;
1252         int rc;
1253
1254         if (!dev->od_fl_capa)
1255                 return 0;
1256
1257         if (capa == BYPASS_CAPA)
1258                 return 0;
1259
1260         ci = md_capainfo(env);
1261         if (unlikely(!ci))
1262                 return 0;
1263
1264         if (ci->mc_auth == LC_ID_NONE)
1265                 return 0;
1266
1267         if (!capa) {
1268                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1269                 return -EACCES;
1270         }
1271
1272         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1273                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1274                            PFID(fid));
1275                 return -EACCES;
1276         }
1277
1278         if (!capa_opc_supported(capa, opc)) {
1279                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1280                 return -EACCES;
1281         }
1282
1283         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1284                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1285                 return -EACCES;
1286         }
1287
1288         return 0;
1289 }
1290
1291 static struct timespec *osd_inode_time(const struct lu_env *env,
1292                                        struct inode *inode, __u64 seconds)
1293 {
1294         struct osd_thread_info *oti = osd_oti_get(env);
1295         struct timespec        *t   = &oti->oti_time;
1296
1297         t->tv_sec  = seconds;
1298         t->tv_nsec = 0;
1299         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1300         return t;
1301 }
1302
1303
1304 static void osd_inode_getattr(const struct lu_env *env,
1305                               struct inode *inode, struct lu_attr *attr)
1306 {
1307         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1308                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1309                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1310
1311         attr->la_atime      = LTIME_S(inode->i_atime);
1312         attr->la_mtime      = LTIME_S(inode->i_mtime);
1313         attr->la_ctime      = LTIME_S(inode->i_ctime);
1314         attr->la_mode       = inode->i_mode;
1315         attr->la_size       = i_size_read(inode);
1316         attr->la_blocks     = inode->i_blocks;
1317         attr->la_uid        = inode->i_uid;
1318         attr->la_gid        = inode->i_gid;
1319         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1320         attr->la_nlink      = inode->i_nlink;
1321         attr->la_rdev       = inode->i_rdev;
1322         attr->la_blksize    = ll_inode_blksize(inode);
1323         attr->la_blkbits    = inode->i_blkbits;
1324 }
1325
1326 static int osd_attr_get(const struct lu_env *env,
1327                         struct dt_object *dt,
1328                         struct lu_attr *attr,
1329                         struct lustre_capa *capa)
1330 {
1331         struct osd_object *obj = osd_dt_obj(dt);
1332
1333         LASSERT(dt_object_exists(dt));
1334         LINVRNT(osd_invariant(obj));
1335
1336         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1337                 return -EACCES;
1338
1339         cfs_spin_lock(&obj->oo_guard);
1340         osd_inode_getattr(env, obj->oo_inode, attr);
1341         cfs_spin_unlock(&obj->oo_guard);
1342         return 0;
1343 }
1344
1345 static int osd_inode_setattr(const struct lu_env *env,
1346                              struct inode *inode, const struct lu_attr *attr)
1347 {
1348         __u64 bits;
1349
1350         bits = attr->la_valid;
1351
1352         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1353
1354 #ifdef HAVE_QUOTA_SUPPORT
1355         if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1356             (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1357                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1358                 struct iattr iattr;
1359                 int rc;
1360
1361                 iattr.ia_valid = 0;
1362                 if (bits & LA_UID)
1363                         iattr.ia_valid |= ATTR_UID;
1364                 if (bits & LA_GID)
1365                         iattr.ia_valid |= ATTR_GID;
1366                 iattr.ia_uid = attr->la_uid;
1367                 iattr.ia_gid = attr->la_gid;
1368                 osd_push_ctxt(env, save);
1369                 rc = DQUOT_TRANSFER(inode, &iattr) ? -EDQUOT : 0;
1370                 osd_pop_ctxt(save);
1371                 if (rc != 0)
1372                         return rc;
1373         }
1374 #endif
1375
1376         if (bits & LA_ATIME)
1377                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1378         if (bits & LA_CTIME)
1379                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1380         if (bits & LA_MTIME)
1381                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1382         if (bits & LA_SIZE) {
1383                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1384                 i_size_write(inode, attr->la_size);
1385         }
1386
1387 #if 0
1388         /* OSD should not change "i_blocks" which is used by quota.
1389          * "i_blocks" should be changed by ldiskfs only. */
1390         if (bits & LA_BLOCKS)
1391                 inode->i_blocks = attr->la_blocks;
1392 #endif
1393         if (bits & LA_MODE)
1394                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1395                         (attr->la_mode & ~S_IFMT);
1396         if (bits & LA_UID)
1397                 inode->i_uid    = attr->la_uid;
1398         if (bits & LA_GID)
1399                 inode->i_gid    = attr->la_gid;
1400         if (bits & LA_NLINK)
1401                 inode->i_nlink  = attr->la_nlink;
1402         if (bits & LA_RDEV)
1403                 inode->i_rdev   = attr->la_rdev;
1404
1405         if (bits & LA_FLAGS)
1406                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags);
1407         return 0;
1408 }
1409
1410 static int osd_attr_set(const struct lu_env *env,
1411                         struct dt_object *dt,
1412                         const struct lu_attr *attr,
1413                         struct thandle *handle,
1414                         struct lustre_capa *capa)
1415 {
1416         struct osd_object *obj = osd_dt_obj(dt);
1417         int rc;
1418
1419         LASSERT(handle != NULL);
1420         LASSERT(dt_object_exists(dt));
1421         LASSERT(osd_invariant(obj));
1422
1423         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1424                 return -EACCES;
1425
1426         cfs_spin_lock(&obj->oo_guard);
1427         rc = osd_inode_setattr(env, obj->oo_inode, attr);
1428         cfs_spin_unlock(&obj->oo_guard);
1429
1430         if (!rc)
1431                 mark_inode_dirty(obj->oo_inode);
1432         return rc;
1433 }
1434
1435 /*
1436  * Object creation.
1437  *
1438  * XXX temporary solution.
1439  */
1440 static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
1441                           struct lu_attr *attr, struct thandle *th)
1442 {
1443         return 0;
1444 }
1445
1446 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1447                            struct lu_attr *attr, struct thandle *th)
1448 {
1449         osd_object_init0(obj);
1450         return 0;
1451 }
1452
1453 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1454                                             struct osd_object *obj,
1455                                             const char *name,
1456                                             const int namelen)
1457 {
1458         struct osd_thread_info *info   = osd_oti_get(env);
1459         struct dentry *child_dentry = &info->oti_child_dentry;
1460         struct dentry *obj_dentry = &info->oti_obj_dentry;
1461
1462         obj_dentry->d_inode = obj->oo_inode;
1463         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1464         obj_dentry->d_name.hash = 0;
1465
1466         child_dentry->d_name.hash = 0;
1467         child_dentry->d_parent = obj_dentry;
1468         child_dentry->d_name.name = name;
1469         child_dentry->d_name.len = namelen;
1470         return child_dentry;
1471 }
1472
1473
1474 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1475                       cfs_umode_t mode,
1476                       struct dt_allocation_hint *hint,
1477                       struct thandle *th)
1478 {
1479         int result;
1480         struct osd_device  *osd = osd_obj2dev(obj);
1481         struct osd_thandle *oth;
1482         struct dt_object   *parent;
1483         struct inode       *inode;
1484 #ifdef HAVE_QUOTA_SUPPORT
1485         struct osd_ctxt    *save = &info->oti_ctxt;
1486 #endif
1487
1488         LINVRNT(osd_invariant(obj));
1489         LASSERT(obj->oo_inode == NULL);
1490
1491         oth = container_of(th, struct osd_thandle, ot_super);
1492         LASSERT(oth->ot_handle->h_transaction != NULL);
1493
1494         if (hint && hint->dah_parent)
1495                 parent = hint->dah_parent;
1496         else
1497                 parent = osd->od_obj_area;
1498
1499         LASSERT(parent != NULL);
1500         LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1501
1502 #ifdef HAVE_QUOTA_SUPPORT
1503         osd_push_ctxt(info->oti_env, save);
1504 #endif
1505         inode = ldiskfs_create_inode(oth->ot_handle,
1506                                      osd_dt_obj(parent)->oo_inode, mode);
1507 #ifdef HAVE_QUOTA_SUPPORT
1508         osd_pop_ctxt(save);
1509 #endif
1510         if (!IS_ERR(inode)) {
1511                 obj->oo_inode = inode;
1512                 result = 0;
1513         } else
1514                 result = PTR_ERR(inode);
1515         LINVRNT(osd_invariant(obj));
1516         return result;
1517 }
1518
1519 enum {
1520         OSD_NAME_LEN = 255
1521 };
1522
1523 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1524                      struct lu_attr *attr,
1525                      struct dt_allocation_hint *hint,
1526                      struct dt_object_format *dof,
1527                      struct thandle *th)
1528 {
1529         int result;
1530         struct osd_thandle *oth;
1531         struct osd_device *osd = osd_obj2dev(obj);
1532         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1533
1534         LASSERT(S_ISDIR(attr->la_mode));
1535
1536         oth = container_of(th, struct osd_thandle, ot_super);
1537         LASSERT(oth->ot_handle->h_transaction != NULL);
1538         result = osd_mkfile(info, obj, mode, hint, th);
1539         if (result == 0 && osd->od_iop_mode == 0) {
1540                 LASSERT(obj->oo_inode != NULL);
1541                 /*
1542                  * XXX uh-oh... call low-level iam function directly.
1543                  */
1544
1545                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1546                                          sizeof (struct osd_fid_pack),
1547                                          oth->ot_handle);
1548         }
1549         return result;
1550 }
1551
1552 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1553                         struct lu_attr *attr,
1554                         struct dt_allocation_hint *hint,
1555                         struct dt_object_format *dof,
1556                         struct thandle *th)
1557 {
1558         int result;
1559         struct osd_thandle *oth;
1560         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1561
1562         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1563
1564         LASSERT(S_ISREG(attr->la_mode));
1565
1566         oth = container_of(th, struct osd_thandle, ot_super);
1567         LASSERT(oth->ot_handle->h_transaction != NULL);
1568
1569         result = osd_mkfile(info, obj, mode, hint, th);
1570         if (result == 0) {
1571                 LASSERT(obj->oo_inode != NULL);
1572                 if (feat->dif_flags & DT_IND_VARKEY)
1573                         result = iam_lvar_create(obj->oo_inode,
1574                                                  feat->dif_keysize_max,
1575                                                  feat->dif_ptrsize,
1576                                                  feat->dif_recsize_max,
1577                                                  oth->ot_handle);
1578                 else
1579                         result = iam_lfix_create(obj->oo_inode,
1580                                                  feat->dif_keysize_max,
1581                                                  feat->dif_ptrsize,
1582                                                  feat->dif_recsize_max,
1583                                                  oth->ot_handle);
1584
1585         }
1586         return result;
1587 }
1588
1589 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1590                      struct lu_attr *attr,
1591                      struct dt_allocation_hint *hint,
1592                      struct dt_object_format *dof,
1593                      struct thandle *th)
1594 {
1595         LASSERT(S_ISREG(attr->la_mode));
1596         return osd_mkfile(info, obj, (attr->la_mode &
1597                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1598 }
1599
1600 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1601                      struct lu_attr *attr,
1602                      struct dt_allocation_hint *hint,
1603                      struct dt_object_format *dof,
1604                      struct thandle *th)
1605 {
1606         LASSERT(S_ISLNK(attr->la_mode));
1607         return osd_mkfile(info, obj, (attr->la_mode &
1608                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1609 }
1610
1611 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1612                      struct lu_attr *attr,
1613                      struct dt_allocation_hint *hint,
1614                      struct dt_object_format *dof,
1615                      struct thandle *th)
1616 {
1617         cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1618         int result;
1619
1620         LINVRNT(osd_invariant(obj));
1621         LASSERT(obj->oo_inode == NULL);
1622         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1623                 S_ISFIFO(mode) || S_ISSOCK(mode));
1624
1625         result = osd_mkfile(info, obj, mode, hint, th);
1626         if (result == 0) {
1627                 LASSERT(obj->oo_inode != NULL);
1628                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1629         }
1630         LINVRNT(osd_invariant(obj));
1631         return result;
1632 }
1633
1634 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1635                               struct lu_attr *,
1636                               struct dt_allocation_hint *hint,
1637                               struct dt_object_format *dof,
1638                               struct thandle *);
1639
1640 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1641 {
1642         osd_obj_type_f result;
1643
1644         switch (type) {
1645         case DFT_DIR:
1646                 result = osd_mkdir;
1647                 break;
1648         case DFT_REGULAR:
1649                 result = osd_mkreg;
1650                 break;
1651         case DFT_SYM:
1652                 result = osd_mksym;
1653                 break;
1654         case DFT_NODE:
1655                 result = osd_mknod;
1656                 break;
1657         case DFT_INDEX:
1658                 result = osd_mk_index;
1659                 break;
1660
1661         default:
1662                 LBUG();
1663                 break;
1664         }
1665         return result;
1666 }
1667
1668
1669 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1670                         struct dt_object *parent, cfs_umode_t child_mode)
1671 {
1672         LASSERT(ah);
1673
1674         memset(ah, 0, sizeof(*ah));
1675         ah->dah_parent = parent;
1676         ah->dah_mode = child_mode;
1677 }
1678
1679 /**
1680  * Helper function for osd_object_create()
1681  *
1682  * \retval 0, on success
1683  */
1684 static int __osd_object_create(struct osd_thread_info *info,
1685                                struct osd_object *obj, struct lu_attr *attr,
1686                                struct dt_allocation_hint *hint,
1687                                struct dt_object_format *dof,
1688                                struct thandle *th)
1689 {
1690
1691         int result;
1692
1693         result = osd_create_pre(info, obj, attr, th);
1694         if (result == 0) {
1695                 result = osd_create_type_f(dof->dof_type)(info, obj,
1696                                            attr, hint, dof, th);
1697                 if (result == 0)
1698                         result = osd_create_post(info, obj, attr, th);
1699         }
1700         return result;
1701 }
1702
1703 /**
1704  * Helper function for osd_object_create()
1705  *
1706  * \retval 0, on success
1707  */
1708 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1709                            const struct lu_fid *fid, struct thandle *th)
1710 {
1711         struct osd_thread_info *info = osd_oti_get(env);
1712         struct osd_inode_id    *id   = &info->oti_id;
1713         struct osd_device      *osd  = osd_obj2dev(obj);
1714         struct md_ucred        *uc   = md_ucred(env);
1715
1716         LASSERT(obj->oo_inode != NULL);
1717         LASSERT(uc != NULL);
1718
1719         id->oii_ino = obj->oo_inode->i_ino;
1720         id->oii_gen = obj->oo_inode->i_generation;
1721
1722         return osd_oi_insert(info, &osd->od_oi, fid, id, th,
1723                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1724 }
1725
1726 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1727                              struct lu_attr *attr,
1728                              struct dt_allocation_hint *hint,
1729                              struct dt_object_format *dof,
1730                              struct thandle *th)
1731 {
1732         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1733         struct osd_object      *obj    = osd_dt_obj(dt);
1734         struct osd_thread_info *info   = osd_oti_get(env);
1735         int result;
1736
1737         ENTRY;
1738
1739         LINVRNT(osd_invariant(obj));
1740         LASSERT(!dt_object_exists(dt));
1741         LASSERT(osd_write_locked(env, obj));
1742         LASSERT(th != NULL);
1743
1744         result = __osd_object_create(info, obj, attr, hint, dof, th);
1745         if (result == 0)
1746                 result = __osd_oi_insert(env, obj, fid, th);
1747
1748         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1749         LASSERT(osd_invariant(obj));
1750         RETURN(result);
1751 }
1752
1753 /**
1754  * Helper function for osd_xattr_set()
1755  */
1756 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1757                            const struct lu_buf *buf, const char *name, int fl)
1758 {
1759         struct osd_object      *obj      = osd_dt_obj(dt);
1760         struct inode           *inode    = obj->oo_inode;
1761         struct osd_thread_info *info     = osd_oti_get(env);
1762         struct dentry          *dentry   = &info->oti_child_dentry;
1763         struct timespec        *t        = &info->oti_time;
1764         int                     fs_flags = 0;
1765         int  rc;
1766
1767         LASSERT(dt_object_exists(dt));
1768         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1769         LASSERT(osd_write_locked(env, obj));
1770
1771         if (fl & LU_XATTR_REPLACE)
1772                 fs_flags |= XATTR_REPLACE;
1773
1774         if (fl & LU_XATTR_CREATE)
1775                 fs_flags |= XATTR_CREATE;
1776
1777         dentry->d_inode = inode;
1778         *t = inode->i_ctime;
1779         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
1780                                    buf->lb_len, fs_flags);
1781         /* ctime should not be updated with server-side time. */
1782         cfs_spin_lock(&obj->oo_guard);
1783         inode->i_ctime = *t;
1784         cfs_spin_unlock(&obj->oo_guard);
1785         mark_inode_dirty(inode);
1786         return rc;
1787 }
1788
1789 /**
1790  * Put the fid into lustre_mdt_attrs, and then place the structure
1791  * inode's ea. This fid should not be altered during the life time
1792  * of the inode.
1793  *
1794  * \retval +ve, on success
1795  * \retval -ve, on error
1796  *
1797  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
1798  */
1799 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
1800                           const struct lu_fid *fid)
1801 {
1802         struct osd_thread_info  *info      = osd_oti_get(env);
1803         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1804
1805         lustre_lma_init(mdt_attrs, fid);
1806         lustre_lma_swab(mdt_attrs);
1807         return __osd_xattr_set(env, dt,
1808                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
1809                                XATTR_NAME_LMA, LU_XATTR_CREATE);
1810
1811 }
1812
1813 /**
1814  * Helper function to form igif
1815  */
1816 static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
1817                                 struct lu_fid *fid)
1818 {
1819         LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
1820 }
1821
1822 /**
1823  * Helper function to pack the fid, ldiskfs stores fid in packed format.
1824  */
1825 void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
1826                   struct lu_fid *befider)
1827 {
1828         fid_cpu_to_be(befider, (struct lu_fid *)fid);
1829         memcpy(pack->fp_area, befider, sizeof(*befider));
1830         pack->fp_len =  sizeof(*befider) + 1;
1831 }
1832
1833 /**
1834  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
1835  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
1836  * To have compatilibility with 1.8 ldiskfs driver we need to have
1837  * magic number at start of fid data.
1838  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
1839  * its inmemory API.
1840  */
1841 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
1842                                   const struct dt_rec *fid)
1843 {
1844         param->edp_magic = LDISKFS_LUFID_MAGIC;
1845         param->edp_len =  sizeof(struct lu_fid) + 1;
1846
1847         fid_cpu_to_be((struct lu_fid *)param->edp_data,
1848                       (struct lu_fid *)fid);
1849 }
1850
1851 int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
1852 {
1853         int result;
1854
1855         result = 0;
1856         switch (pack->fp_len) {
1857         case sizeof *fid + 1:
1858                 memcpy(fid, pack->fp_area, sizeof *fid);
1859                 fid_be_to_cpu(fid, fid);
1860                 break;
1861         default:
1862                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1863                 result = -EIO;
1864         }
1865         return result;
1866 }
1867
1868 /**
1869  * Try to read the fid from inode ea into dt_rec, if return value
1870  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
1871  *
1872  * \param fid object fid.
1873  *
1874  * \retval 0 on success
1875  */
1876 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
1877                           __u32 ino, struct lu_fid *fid)
1878 {
1879         struct osd_thread_info  *info      = osd_oti_get(env);
1880         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1881         struct lu_device        *ldev   = obj->oo_dt.do_lu.lo_dev;
1882         struct dentry           *dentry = &info->oti_child_dentry;
1883         struct osd_inode_id     *id     = &info->oti_id;
1884         struct osd_device       *dev;
1885         struct inode            *inode;
1886         int                      rc;
1887
1888         ENTRY;
1889         dev  = osd_dev(ldev);
1890
1891         id->oii_ino = ino;
1892         id->oii_gen = OSD_OII_NOGEN;
1893
1894         inode = osd_iget(info, dev, id);
1895         if (IS_ERR(inode)) {
1896                 rc = PTR_ERR(inode);
1897                 GOTO(out,rc);
1898         }
1899         dentry->d_inode = inode;
1900
1901         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1902         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
1903                                    sizeof *mdt_attrs);
1904
1905         /* Check LMA compatibility */
1906         if (rc > 0 &&
1907             (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
1908                 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
1909                       inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
1910                       ~LMA_INCOMPAT_SUPP);
1911                 return -ENOSYS;
1912         }
1913
1914         if (rc > 0) {
1915                 lustre_lma_swab(mdt_attrs);
1916                 memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
1917                 rc = 0;
1918         } else if (rc == -ENODATA) {
1919                 osd_igif_get(env, inode, fid);
1920                 rc = 0;
1921         }
1922         iput(inode);
1923 out:
1924         RETURN(rc);
1925 }
1926
1927 /**
1928  * OSD layer object create function for interoperability mode (b11826).
1929  * This is mostly similar to osd_object_create(). Only difference being, fid is
1930  * inserted into inode ea here.
1931  *
1932  * \retval   0, on success
1933  * \retval -ve, on error
1934  */
1935 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
1936                              struct lu_attr *attr,
1937                              struct dt_allocation_hint *hint,
1938                              struct dt_object_format *dof,
1939                              struct thandle *th)
1940 {
1941         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1942         struct osd_object      *obj    = osd_dt_obj(dt);
1943         struct osd_thread_info *info   = osd_oti_get(env);
1944         int result;
1945
1946         ENTRY;
1947
1948         LASSERT(osd_invariant(obj));
1949         LASSERT(!dt_object_exists(dt));
1950         LASSERT(osd_write_locked(env, obj));
1951         LASSERT(th != NULL);
1952
1953         result = __osd_object_create(info, obj, attr, hint, dof, th);
1954
1955         /* objects under osd root shld have igif fid, so dont add fid EA */
1956         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
1957                 result = osd_ea_fid_set(env, dt, fid);
1958
1959         if (result == 0)
1960                 result = __osd_oi_insert(env, obj, fid, th);
1961
1962         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1963         LINVRNT(osd_invariant(obj));
1964         RETURN(result);
1965 }
1966
1967 /*
1968  * Concurrency: @dt is write locked.
1969  */
1970 static void osd_object_ref_add(const struct lu_env *env,
1971                                struct dt_object *dt,
1972                                struct thandle *th)
1973 {
1974         struct osd_object *obj = osd_dt_obj(dt);
1975         struct inode *inode = obj->oo_inode;
1976
1977         LINVRNT(osd_invariant(obj));
1978         LASSERT(dt_object_exists(dt));
1979         LASSERT(osd_write_locked(env, obj));
1980         LASSERT(th != NULL);
1981
1982         cfs_spin_lock(&obj->oo_guard);
1983         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
1984         inode->i_nlink++;
1985         cfs_spin_unlock(&obj->oo_guard);
1986         mark_inode_dirty(inode);
1987         LINVRNT(osd_invariant(obj));
1988 }
1989
1990 /*
1991  * Concurrency: @dt is write locked.
1992  */
1993 static void osd_object_ref_del(const struct lu_env *env,
1994                                struct dt_object *dt,
1995                                struct thandle *th)
1996 {
1997         struct osd_object *obj = osd_dt_obj(dt);
1998         struct inode *inode = obj->oo_inode;
1999
2000         LINVRNT(osd_invariant(obj));
2001         LASSERT(dt_object_exists(dt));
2002         LASSERT(osd_write_locked(env, obj));
2003         LASSERT(th != NULL);
2004
2005         cfs_spin_lock(&obj->oo_guard);
2006         LASSERT(inode->i_nlink > 0);
2007         inode->i_nlink--;
2008         cfs_spin_unlock(&obj->oo_guard);
2009         mark_inode_dirty(inode);
2010         LINVRNT(osd_invariant(obj));
2011 }
2012
2013 /*
2014  * Concurrency: @dt is read locked.
2015  */
2016 static int osd_xattr_get(const struct lu_env *env,
2017                          struct dt_object *dt,
2018                          struct lu_buf *buf,
2019                          const char *name,
2020                          struct lustre_capa *capa)
2021 {
2022         struct osd_object      *obj    = osd_dt_obj(dt);
2023         struct inode           *inode  = obj->oo_inode;
2024         struct osd_thread_info *info   = osd_oti_get(env);
2025         struct dentry          *dentry = &info->oti_obj_dentry;
2026
2027         LASSERT(dt_object_exists(dt));
2028         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2029         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2030
2031         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2032                 return -EACCES;
2033
2034         dentry->d_inode = inode;
2035         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
2036 }
2037
2038 /*
2039  * Concurrency: @dt is write locked.
2040  */
2041 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2042                          const struct lu_buf *buf, const char *name, int fl,
2043                          struct thandle *handle, struct lustre_capa *capa)
2044 {
2045         LASSERT(handle != NULL);
2046
2047         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2048                 return -EACCES;
2049
2050         return __osd_xattr_set(env, dt, buf, name, fl);
2051 }
2052
2053 /*
2054  * Concurrency: @dt is read locked.
2055  */
2056 static int osd_xattr_list(const struct lu_env *env,
2057                           struct dt_object *dt,
2058                           struct lu_buf *buf,
2059                           struct lustre_capa *capa)
2060 {
2061         struct osd_object      *obj    = osd_dt_obj(dt);
2062         struct inode           *inode  = obj->oo_inode;
2063         struct osd_thread_info *info   = osd_oti_get(env);
2064         struct dentry          *dentry = &info->oti_obj_dentry;
2065
2066         LASSERT(dt_object_exists(dt));
2067         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2068         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2069
2070         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2071                 return -EACCES;
2072
2073         dentry->d_inode = inode;
2074         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2075 }
2076
2077 /*
2078  * Concurrency: @dt is write locked.
2079  */
2080 static int osd_xattr_del(const struct lu_env *env,
2081                          struct dt_object *dt,
2082                          const char *name,
2083                          struct thandle *handle,
2084                          struct lustre_capa *capa)
2085 {
2086         struct osd_object      *obj    = osd_dt_obj(dt);
2087         struct inode           *inode  = obj->oo_inode;
2088         struct osd_thread_info *info   = osd_oti_get(env);
2089         struct dentry          *dentry = &info->oti_obj_dentry;
2090         struct timespec        *t      = &info->oti_time;
2091         int                     rc;
2092
2093         LASSERT(dt_object_exists(dt));
2094         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2095         LASSERT(osd_write_locked(env, obj));
2096         LASSERT(handle != NULL);
2097
2098         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2099                 return -EACCES;
2100
2101         dentry->d_inode = inode;
2102         *t = inode->i_ctime;
2103         rc = inode->i_op->removexattr(dentry, name);
2104         /* ctime should not be updated with server-side time. */
2105         cfs_spin_lock(&obj->oo_guard);
2106         inode->i_ctime = *t;
2107         cfs_spin_unlock(&obj->oo_guard);
2108         mark_inode_dirty(inode);
2109         return rc;
2110 }
2111
2112 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2113                                      struct dt_object *dt,
2114                                      struct lustre_capa *old,
2115                                      __u64 opc)
2116 {
2117         struct osd_thread_info *info = osd_oti_get(env);
2118         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2119         struct osd_object *obj = osd_dt_obj(dt);
2120         struct osd_device *dev = osd_obj2dev(obj);
2121         struct lustre_capa_key *key = &info->oti_capa_key;
2122         struct lustre_capa *capa = &info->oti_capa;
2123         struct obd_capa *oc;
2124         struct md_capainfo *ci;
2125         int rc;
2126         ENTRY;
2127
2128         if (!dev->od_fl_capa)
2129                 RETURN(ERR_PTR(-ENOENT));
2130
2131         LASSERT(dt_object_exists(dt));
2132         LINVRNT(osd_invariant(obj));
2133
2134         /* renewal sanity check */
2135         if (old && osd_object_auth(env, dt, old, opc))
2136                 RETURN(ERR_PTR(-EACCES));
2137
2138         ci = md_capainfo(env);
2139         if (unlikely(!ci))
2140                 RETURN(ERR_PTR(-ENOENT));
2141
2142         switch (ci->mc_auth) {
2143         case LC_ID_NONE:
2144                 RETURN(NULL);
2145         case LC_ID_PLAIN:
2146                 capa->lc_uid = obj->oo_inode->i_uid;
2147                 capa->lc_gid = obj->oo_inode->i_gid;
2148                 capa->lc_flags = LC_ID_PLAIN;
2149                 break;
2150         case LC_ID_CONVERT: {
2151                 __u32 d[4], s[4];
2152
2153                 s[0] = obj->oo_inode->i_uid;
2154                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2155                 s[2] = obj->oo_inode->i_gid;
2156                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2157                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2158                 if (unlikely(rc))
2159                         RETURN(ERR_PTR(rc));
2160
2161                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2162                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2163                 capa->lc_flags = LC_ID_CONVERT;
2164                 break;
2165         }
2166         default:
2167                 RETURN(ERR_PTR(-EINVAL));
2168         }
2169
2170         capa->lc_fid = *fid;
2171         capa->lc_opc = opc;
2172         capa->lc_flags |= dev->od_capa_alg << 24;
2173         capa->lc_timeout = dev->od_capa_timeout;
2174         capa->lc_expiry = 0;
2175
2176         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2177         if (oc) {
2178                 LASSERT(!capa_is_expired(oc));
2179                 RETURN(oc);
2180         }
2181
2182         cfs_spin_lock(&capa_lock);
2183         *key = dev->od_capa_keys[1];
2184         cfs_spin_unlock(&capa_lock);
2185
2186         capa->lc_keyid = key->lk_keyid;
2187         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2188
2189         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2190         if (rc) {
2191                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2192                 RETURN(ERR_PTR(rc));
2193         }
2194
2195         oc = capa_add(dev->od_capa_hash, capa);
2196         RETURN(oc);
2197 }
2198
2199 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2200 {
2201         int rc;
2202         struct osd_object      *obj    = osd_dt_obj(dt);
2203         struct inode           *inode  = obj->oo_inode;
2204         struct osd_thread_info *info   = osd_oti_get(env);
2205         struct dentry          *dentry = &info->oti_obj_dentry;
2206         struct file            *file   = &info->oti_file;
2207         ENTRY;
2208
2209         dentry->d_inode = inode;
2210         file->f_dentry = dentry;
2211         file->f_mapping = inode->i_mapping;
2212         file->f_op = inode->i_fop;
2213         LOCK_INODE_MUTEX(inode);
2214         rc = file->f_op->fsync(file, dentry, 0);
2215         UNLOCK_INODE_MUTEX(inode);
2216         RETURN(rc);
2217 }
2218
2219 /*
2220  * Get the 64-bit version for an inode.
2221  */
2222 static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
2223                                                struct dt_object *dt)
2224 {
2225         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2226
2227         CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
2228                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2229         return LDISKFS_I(inode)->i_fs_version;
2230 }
2231
2232 /*
2233  * Set the 64-bit version and return the old version.
2234  */
2235 static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
2236                                    dt_obj_version_t new_version)
2237 {
2238         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2239
2240         CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2241                new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2242         LDISKFS_I(inode)->i_fs_version = new_version;
2243         /** Version is set after all inode operations are finished,
2244          *  so we should mark it dirty here */
2245         inode->i_sb->s_op->dirty_inode(inode);
2246 }
2247
2248 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2249                         void **data)
2250 {
2251         struct osd_object *obj = osd_dt_obj(dt);
2252         ENTRY;
2253
2254         *data = (void *)obj->oo_inode;
2255         RETURN(0);
2256 }
2257
2258 /*
2259  * Index operations.
2260  */
2261
2262 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2263                            const struct dt_index_features *feat)
2264 {
2265         struct iam_descr *descr;
2266
2267         if (osd_object_is_root(o))
2268                 return feat == &dt_directory_features;
2269
2270         LASSERT(o->oo_dir != NULL);
2271
2272         descr = o->oo_dir->od_container.ic_descr;
2273         if (feat == &dt_directory_features) {
2274                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2275                         return 1;
2276                 else
2277                         return 0;
2278         } else {
2279                 return
2280                         feat->dif_keysize_min <= descr->id_key_size &&
2281                         descr->id_key_size <= feat->dif_keysize_max &&
2282                         feat->dif_recsize_min <= descr->id_rec_size &&
2283                         descr->id_rec_size <= feat->dif_recsize_max &&
2284                         !(feat->dif_flags & (DT_IND_VARKEY |
2285                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2286                         ergo(feat->dif_flags & DT_IND_UPDATE,
2287                              1 /* XXX check that object (and file system) is
2288                                 * writable */);
2289         }
2290 }
2291
2292 static int osd_iam_container_init(const struct lu_env *env,
2293                                   struct osd_object *obj,
2294                                   struct osd_directory *dir)
2295 {
2296         int result;
2297         struct iam_container *bag;
2298
2299         bag    = &dir->od_container;
2300         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2301         if (result == 0) {
2302                 result = iam_container_setup(bag);
2303                 if (result == 0)
2304                         obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2305                 else
2306                         iam_container_fini(bag);
2307         }
2308         return result;
2309 }
2310
2311
2312 /*
2313  * Concurrency: no external locking is necessary.
2314  */
2315 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2316                          const struct dt_index_features *feat)
2317 {
2318         int result;
2319         int ea_dir = 0;
2320         struct osd_object *obj = osd_dt_obj(dt);
2321         struct osd_device *osd = osd_obj2dev(obj);
2322
2323         LINVRNT(osd_invariant(obj));
2324         LASSERT(dt_object_exists(dt));
2325
2326         if (osd_object_is_root(obj)) {
2327                 dt->do_index_ops = &osd_index_ea_ops;
2328                 result = 0;
2329         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2330                 dt->do_index_ops = &osd_index_ea_ops;
2331                 if (S_ISDIR(obj->oo_inode->i_mode))
2332                         result = 0;
2333                 else
2334                         result = -ENOTDIR;
2335                 ea_dir = 1;
2336         } else if (!osd_has_index(obj)) {
2337                 struct osd_directory *dir;
2338
2339                 OBD_ALLOC_PTR(dir);
2340                 if (dir != NULL) {
2341
2342                         cfs_spin_lock(&obj->oo_guard);
2343                         if (obj->oo_dir == NULL)
2344                                 obj->oo_dir = dir;
2345                         else
2346                                 /*
2347                                  * Concurrent thread allocated container data.
2348                                  */
2349                                 OBD_FREE_PTR(dir);
2350                         cfs_spin_unlock(&obj->oo_guard);
2351                         /*
2352                          * Now, that we have container data, serialize its
2353                          * initialization.
2354                          */
2355                         cfs_down_write(&obj->oo_ext_idx_sem);
2356                         /*
2357                          * recheck under lock.
2358                          */
2359                         if (!osd_has_index(obj))
2360                                 result = osd_iam_container_init(env, obj, dir);
2361                         else
2362                                 result = 0;
2363                         cfs_up_write(&obj->oo_ext_idx_sem);
2364                 } else
2365                         result = -ENOMEM;
2366         } else
2367                 result = 0;
2368
2369         if (result == 0 && ea_dir == 0) {
2370                 if (!osd_iam_index_probe(env, obj, feat))
2371                         result = -ENOTDIR;
2372         }
2373         LINVRNT(osd_invariant(obj));
2374
2375         return result;
2376 }
2377
2378 static const struct dt_object_operations osd_obj_ops = {
2379         .do_read_lock    = osd_object_read_lock,
2380         .do_write_lock   = osd_object_write_lock,
2381         .do_read_unlock  = osd_object_read_unlock,
2382         .do_write_unlock = osd_object_write_unlock,
2383         .do_write_locked = osd_object_write_locked,
2384         .do_attr_get     = osd_attr_get,
2385         .do_attr_set     = osd_attr_set,
2386         .do_ah_init      = osd_ah_init,
2387         .do_create       = osd_object_create,
2388         .do_index_try    = osd_index_try,
2389         .do_ref_add      = osd_object_ref_add,
2390         .do_ref_del      = osd_object_ref_del,
2391         .do_xattr_get    = osd_xattr_get,
2392         .do_xattr_set    = osd_xattr_set,
2393         .do_xattr_del    = osd_xattr_del,
2394         .do_xattr_list   = osd_xattr_list,
2395         .do_capa_get     = osd_capa_get,
2396         .do_object_sync  = osd_object_sync,
2397         .do_version_get  = osd_object_version_get,
2398         .do_version_set  = osd_object_version_set,
2399         .do_data_get     = osd_data_get,
2400 };
2401
2402 /**
2403  * dt_object_operations for interoperability mode
2404  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2405  */
2406 static const struct dt_object_operations osd_obj_ea_ops = {
2407         .do_read_lock    = osd_object_read_lock,
2408         .do_write_lock   = osd_object_write_lock,
2409         .do_read_unlock  = osd_object_read_unlock,
2410         .do_write_unlock = osd_object_write_unlock,
2411         .do_write_locked = osd_object_write_locked,
2412         .do_attr_get     = osd_attr_get,
2413         .do_attr_set     = osd_attr_set,
2414         .do_ah_init      = osd_ah_init,
2415         .do_create       = osd_object_ea_create,
2416         .do_index_try    = osd_index_try,
2417         .do_ref_add      = osd_object_ref_add,
2418         .do_ref_del      = osd_object_ref_del,
2419         .do_xattr_get    = osd_xattr_get,
2420         .do_xattr_set    = osd_xattr_set,
2421         .do_xattr_del    = osd_xattr_del,
2422         .do_xattr_list   = osd_xattr_list,
2423         .do_capa_get     = osd_capa_get,
2424         .do_object_sync  = osd_object_sync,
2425         .do_version_get  = osd_object_version_get,
2426         .do_version_set  = osd_object_version_set,
2427         .do_data_get     = osd_data_get,
2428 };
2429
2430 /*
2431  * Body operations.
2432  */
2433
2434 /*
2435  * XXX: Another layering violation for now.
2436  *
2437  * We don't want to use ->f_op->read methods, because generic file write
2438  *
2439  *         - serializes on ->i_sem, and
2440  *
2441  *         - does a lot of extra work like balance_dirty_pages(),
2442  *
2443  * which doesn't work for globally shared files like /last-received.
2444  */
2445 static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
2446 {
2447         struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
2448
2449         memcpy(buffer, (char*)ei->i_data, buflen);
2450
2451         return  buflen;
2452 }
2453
2454 static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
2455                             loff_t *offs)
2456 {
2457         struct buffer_head *bh;
2458         unsigned long block;
2459         int osize = size;
2460         int blocksize;
2461         int csize;
2462         int boffs;
2463         int err;
2464
2465         /* prevent reading after eof */
2466         spin_lock(&inode->i_lock);
2467         if (i_size_read(inode) < *offs + size) {
2468                 size = i_size_read(inode) - *offs;
2469                 spin_unlock(&inode->i_lock);
2470                 if (size < 0) {
2471                         CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
2472                                i_size_read(inode), *offs);
2473                         return -EBADR;
2474                 } else if (size == 0) {
2475                         return 0;
2476                 }
2477         } else {
2478                 spin_unlock(&inode->i_lock);
2479         }
2480
2481         blocksize = 1 << inode->i_blkbits;
2482
2483         while (size > 0) {
2484                 block = *offs >> inode->i_blkbits;
2485                 boffs = *offs & (blocksize - 1);
2486                 csize = min(blocksize - boffs, size);
2487                 bh = ldiskfs_bread(NULL, inode, block, 0, &err);
2488                 if (!bh) {
2489                         CERROR("can't read block: %d\n", err);
2490                         return err;
2491                 }
2492
2493                 memcpy(buf, bh->b_data + boffs, csize);
2494                 brelse(bh);
2495
2496                 *offs += csize;
2497                 buf += csize;
2498                 size -= csize;
2499         }
2500         return osize;
2501 }
2502
2503 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
2504                         struct lu_buf *buf, loff_t *pos,
2505                         struct lustre_capa *capa)
2506 {
2507         struct osd_object      *obj    = osd_dt_obj(dt);
2508         struct inode           *inode  = obj->oo_inode;
2509         int rc;
2510
2511         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
2512                 RETURN(-EACCES);
2513
2514         /* Read small symlink from inode body as we need to maintain correct
2515          * on-disk symlinks for ldiskfs.
2516          */
2517         if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2518             (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
2519                 rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
2520         else
2521                 rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
2522
2523         return rc;
2524 }
2525
2526 static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
2527 {
2528
2529         memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
2530                buflen);
2531         LDISKFS_I(inode)->i_disksize = buflen;
2532         i_size_write(inode, buflen);
2533         inode->i_sb->s_op->dirty_inode(inode);
2534
2535         return 0;
2536 }
2537
2538 static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
2539                                     loff_t *offs, handle_t *handle)
2540 {
2541         struct buffer_head *bh = NULL;
2542         loff_t offset = *offs;
2543         loff_t new_size = i_size_read(inode);
2544         unsigned long block;
2545         int blocksize = 1 << inode->i_blkbits;
2546         int err = 0;
2547         int size;
2548         int boffs;
2549         int dirty_inode = 0;
2550
2551         while (bufsize > 0) {
2552                 if (bh != NULL)
2553                         brelse(bh);
2554
2555                 block = offset >> inode->i_blkbits;
2556                 boffs = offset & (blocksize - 1);
2557                 size = min(blocksize - boffs, bufsize);
2558                 bh = ldiskfs_bread(handle, inode, block, 1, &err);
2559                 if (!bh) {
2560                         CERROR("can't read/create block: %d\n", err);
2561                         break;
2562                 }
2563
2564                 err = ldiskfs_journal_get_write_access(handle, bh);
2565                 if (err) {
2566                         CERROR("journal_get_write_access() returned error %d\n",
2567                                err);
2568                         break;
2569                 }
2570                 LASSERTF(boffs + size <= bh->b_size,
2571                          "boffs %d size %d bh->b_size %lu",
2572                          boffs, size, (unsigned long)bh->b_size);
2573                 memcpy(bh->b_data + boffs, buf, size);
2574                 err = ldiskfs_journal_dirty_metadata(handle, bh);
2575                 if (err)
2576                         break;
2577
2578                 if (offset + size > new_size)
2579                         new_size = offset + size;
2580                 offset += size;
2581                 bufsize -= size;
2582                 buf += size;
2583         }
2584         if (bh)
2585                 brelse(bh);
2586
2587         /* correct in-core and on-disk sizes */
2588         if (new_size > i_size_read(inode)) {
2589                 spin_lock(&inode->i_lock);
2590                 if (new_size > i_size_read(inode))
2591                         i_size_write(inode, new_size);
2592                 if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
2593                         LDISKFS_I(inode)->i_disksize = i_size_read(inode);
2594                         dirty_inode = 1;
2595                 }
2596                 spin_unlock(&inode->i_lock);
2597                 if (dirty_inode)
2598                         inode->i_sb->s_op->dirty_inode(inode);
2599         }
2600
2601         if (err == 0)
2602                 *offs = offset;
2603         return err;
2604 }
2605
2606 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
2607                          const struct lu_buf *buf, loff_t *pos,
2608                          struct thandle *handle, struct lustre_capa *capa,
2609                          int ignore_quota)
2610 {
2611         struct osd_object  *obj   = osd_dt_obj(dt);
2612         struct inode       *inode = obj->oo_inode;
2613         struct osd_thandle *oh;
2614         ssize_t            result = 0;
2615 #ifdef HAVE_QUOTA_SUPPORT
2616         cfs_cap_t           save = current->cap_effective;
2617 #endif
2618
2619         LASSERT(handle != NULL);
2620
2621         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
2622                 RETURN(-EACCES);
2623
2624         oh = container_of(handle, struct osd_thandle, ot_super);
2625         LASSERT(oh->ot_handle->h_transaction != NULL);
2626 #ifdef HAVE_QUOTA_SUPPORT
2627         if (ignore_quota)
2628                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2629         else
2630                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2631 #endif
2632         /* Write small symlink to inode body as we need to maintain correct
2633          * on-disk symlinks for ldiskfs.
2634          */
2635         if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
2636            (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
2637                 result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
2638         else
2639                 result = osd_ldiskfs_write_record(inode, buf->lb_buf,
2640                                                   buf->lb_len, pos,
2641                                                   oh->ot_handle);
2642 #ifdef HAVE_QUOTA_SUPPORT
2643         current->cap_effective = save;
2644 #endif
2645         if (result == 0)
2646                 result = buf->lb_len;
2647         return result;
2648 }
2649
2650 static const struct dt_body_operations osd_body_ops = {
2651         .dbo_read  = osd_read,
2652         .dbo_write = osd_write
2653 };
2654
2655
2656 /**
2657  *      delete a (key, value) pair from index \a dt specified by \a key
2658  *
2659  *      \param  dt      osd index object
2660  *      \param  key     key for index
2661  *      \param  rec     record reference
2662  *      \param  handle  transaction handler
2663  *
2664  *      \retval  0  success
2665  *      \retval -ve   failure
2666  */
2667
2668 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2669                                 const struct dt_key *key, struct thandle *handle,
2670                                 struct lustre_capa *capa)
2671 {
2672         struct osd_object     *obj = osd_dt_obj(dt);
2673         struct osd_thandle    *oh;
2674         struct iam_path_descr *ipd;
2675         struct iam_container  *bag = &obj->oo_dir->od_container;
2676         int rc;
2677
2678         ENTRY;
2679
2680         LINVRNT(osd_invariant(obj));
2681         LASSERT(dt_object_exists(dt));
2682         LASSERT(bag->ic_object == obj->oo_inode);
2683         LASSERT(handle != NULL);
2684
2685         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2686                 RETURN(-EACCES);
2687
2688         ipd = osd_idx_ipd_get(env, bag);
2689         if (unlikely(ipd == NULL))
2690                 RETURN(-ENOMEM);
2691
2692         oh = container_of0(handle, struct osd_thandle, ot_super);
2693         LASSERT(oh->ot_handle != NULL);
2694         LASSERT(oh->ot_handle->h_transaction != NULL);
2695
2696         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2697         osd_ipd_put(env, bag, ipd);
2698         LINVRNT(osd_invariant(obj));
2699         RETURN(rc);
2700 }
2701
2702 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
2703                                           struct dt_rec *fid)
2704 {
2705         struct osd_fid_pack *rec;
2706         int rc = -ENODATA;
2707
2708         if (de->file_type & LDISKFS_DIRENT_LUFID) {
2709                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
2710                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
2711         }
2712         RETURN(rc);
2713 }
2714
2715 /**
2716  * Index delete function for interoperability mode (b11826).
2717  * It will remove the directory entry added by osd_index_ea_insert().
2718  * This entry is needed to maintain name->fid mapping.
2719  *
2720  * \param key,  key i.e. file entry to be deleted
2721  *
2722  * \retval   0, on success
2723  * \retval -ve, on error
2724  */
2725 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
2726                                const struct dt_key *key, struct thandle *handle,
2727                                struct lustre_capa *capa)
2728 {
2729         struct osd_object          *obj    = osd_dt_obj(dt);
2730         struct inode               *dir    = obj->oo_inode;
2731         struct dentry              *dentry;
2732         struct osd_thandle         *oh;
2733         struct ldiskfs_dir_entry_2 *de;
2734         struct buffer_head         *bh;
2735
2736         int rc;
2737
2738         ENTRY;
2739
2740         LINVRNT(osd_invariant(obj));
2741         LASSERT(dt_object_exists(dt));
2742         LASSERT(handle != NULL);
2743
2744         oh = container_of(handle, struct osd_thandle, ot_super);
2745         LASSERT(oh->ot_handle != NULL);
2746         LASSERT(oh->ot_handle->h_transaction != NULL);
2747
2748         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2749                 RETURN(-EACCES);
2750
2751         dentry = osd_child_dentry_get(env, obj,
2752                                       (char *)key, strlen((char *)key));
2753
2754         cfs_down_write(&obj->oo_ext_idx_sem);
2755         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
2756         if (bh) {
2757                 struct osd_thread_info *oti = osd_oti_get(env);
2758                 struct timespec *ctime = &oti->oti_time;
2759                 struct timespec *mtime = &oti->oti_time2;
2760
2761                 *ctime = dir->i_ctime;
2762                 *mtime = dir->i_mtime;
2763                 rc = ldiskfs_delete_entry(oh->ot_handle,
2764                                 dir, de, bh);
2765                 /* xtime should not be updated with server-side time. */
2766                 cfs_spin_lock(&obj->oo_guard);
2767                 dir->i_ctime = *ctime;
2768                 dir->i_mtime = *mtime;
2769                 cfs_spin_unlock(&obj->oo_guard);
2770                 mark_inode_dirty(dir);
2771                 brelse(bh);
2772         } else
2773                 rc = -ENOENT;
2774
2775         cfs_up_write(&obj->oo_ext_idx_sem);
2776         LASSERT(osd_invariant(obj));
2777         RETURN(rc);
2778 }
2779
2780 /**
2781  *      Lookup index for \a key and copy record to \a rec.
2782  *
2783  *      \param  dt      osd index object
2784  *      \param  key     key for index
2785  *      \param  rec     record reference
2786  *
2787  *      \retval  +ve  success : exact mach
2788  *      \retval  0    return record with key not greater than \a key
2789  *      \retval -ve   failure
2790  */
2791 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
2792                                 struct dt_rec *rec, const struct dt_key *key,
2793                                 struct lustre_capa *capa)
2794 {
2795         struct osd_object     *obj = osd_dt_obj(dt);
2796         struct iam_path_descr *ipd;
2797         struct iam_container  *bag = &obj->oo_dir->od_container;
2798         struct osd_thread_info *oti = osd_oti_get(env);
2799         struct iam_iterator    *it = &oti->oti_idx_it;
2800         struct iam_rec *iam_rec;
2801         int rc;
2802         ENTRY;
2803
2804         LASSERT(osd_invariant(obj));
2805         LASSERT(dt_object_exists(dt));
2806         LASSERT(bag->ic_object == obj->oo_inode);
2807
2808         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2809                 RETURN(-EACCES);
2810
2811         ipd = osd_idx_ipd_get(env, bag);
2812         if (IS_ERR(ipd))
2813                 RETURN(-ENOMEM);
2814
2815         /* got ipd now we can start iterator. */
2816         iam_it_init(it, bag, 0, ipd);
2817
2818         rc = iam_it_get(it, (struct iam_key *)key);
2819         if (rc >= 0) {
2820                 if (S_ISDIR(obj->oo_inode->i_mode))
2821                         iam_rec = (struct iam_rec *)oti->oti_ldp;
2822                 else
2823                         iam_rec = (struct iam_rec *) rec;
2824
2825                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
2826                 if (S_ISDIR(obj->oo_inode->i_mode))
2827                         osd_fid_unpack((struct lu_fid *) rec,
2828                                        (struct osd_fid_pack *)iam_rec);
2829         }
2830         iam_it_put(it);
2831         iam_it_fini(it);
2832         osd_ipd_put(env, bag, ipd);
2833
2834         LINVRNT(osd_invariant(obj));
2835
2836         RETURN(rc);
2837 }
2838
2839 /**
2840  *      Inserts (key, value) pair in \a dt index object.
2841  *
2842  *      \param  dt      osd index object
2843  *      \param  key     key for index
2844  *      \param  rec     record reference
2845  *      \param  th      transaction handler
2846  *
2847  *      \retval  0  success
2848  *      \retval -ve failure
2849  */
2850 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
2851                                 const struct dt_rec *rec, const struct dt_key *key,
2852                                 struct thandle *th, struct lustre_capa *capa,
2853                                 int ignore_quota)
2854 {
2855         struct osd_object     *obj = osd_dt_obj(dt);
2856         struct iam_path_descr *ipd;
2857         struct osd_thandle    *oh;
2858         struct iam_container  *bag = &obj->oo_dir->od_container;
2859 #ifdef HAVE_QUOTA_SUPPORT
2860         cfs_cap_t              save = current->cap_effective;
2861 #endif
2862         struct osd_thread_info *oti = osd_oti_get(env);
2863         struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
2864         int rc;
2865
2866         ENTRY;
2867
2868         LINVRNT(osd_invariant(obj));
2869         LASSERT(dt_object_exists(dt));
2870         LASSERT(bag->ic_object == obj->oo_inode);
2871         LASSERT(th != NULL);
2872
2873         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2874                 return -EACCES;
2875
2876         ipd = osd_idx_ipd_get(env, bag);
2877         if (unlikely(ipd == NULL))
2878                 RETURN(-ENOMEM);
2879
2880         oh = container_of0(th, struct osd_thandle, ot_super);
2881         LASSERT(oh->ot_handle != NULL);
2882         LASSERT(oh->ot_handle->h_transaction != NULL);
2883 #ifdef HAVE_QUOTA_SUPPORT
2884         if (ignore_quota)
2885                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
2886         else
2887                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
2888 #endif
2889         if (S_ISDIR(obj->oo_inode->i_mode))
2890                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
2891         else
2892                 iam_rec = (struct iam_rec *) rec;
2893         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2894                         iam_rec, ipd);
2895 #ifdef HAVE_QUOTA_SUPPORT
2896         current->cap_effective = save;
2897 #endif
2898         osd_ipd_put(env, bag, ipd);
2899         LINVRNT(osd_invariant(obj));
2900         RETURN(rc);
2901 }
2902
2903 /**
2904  * Calls ldiskfs_add_entry() to add directory entry
2905  * into the directory. This is required for
2906  * interoperability mode (b11826)
2907  *
2908  * \retval   0, on success
2909  * \retval -ve, on error
2910  */
2911 static int __osd_ea_add_rec(struct osd_thread_info *info,
2912                             struct osd_object *pobj,
2913                             struct inode  *cinode,
2914                             const char *name,
2915                             const struct dt_rec *fid,
2916                             struct thandle *th)
2917 {
2918         struct ldiskfs_dentry_param *ldp;
2919         struct dentry      *child;
2920         struct osd_thandle *oth;
2921         int rc;
2922
2923         oth = container_of(th, struct osd_thandle, ot_super);
2924         LASSERT(oth->ot_handle != NULL);
2925         LASSERT(oth->ot_handle->h_transaction != NULL);
2926
2927         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
2928
2929         if (fid_is_igif((struct lu_fid *)fid) ||
2930             fid_seq((struct lu_fid *)fid) >= FID_SEQ_NORMAL) {
2931                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2932                 osd_get_ldiskfs_dirent_param(ldp, fid);
2933                 child->d_fsdata = (void*) ldp;
2934         } else
2935                 child->d_fsdata = NULL;
2936         rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
2937
2938         RETURN(rc);
2939 }
2940
2941 /**
2942  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
2943  * into the directory.Also sets flags into osd object to
2944  * indicate dot and dotdot are created. This is required for
2945  * interoperability mode (b11826)
2946  *
2947  * \param dir   directory for dot and dotdot fixup.
2948  * \param obj   child object for linking
2949  *
2950  * \retval   0, on success
2951  * \retval -ve, on error
2952  */
2953 static int osd_add_dot_dotdot(struct osd_thread_info *info,
2954                               struct osd_object *dir,
2955                               struct inode  *parent_dir, const char *name,
2956                               const struct dt_rec *dot_fid,
2957                               const struct dt_rec *dot_dot_fid,
2958                               struct thandle *th)
2959 {
2960         struct inode            *inode  = dir->oo_inode;
2961         struct ldiskfs_dentry_param *dot_ldp;
2962         struct ldiskfs_dentry_param *dot_dot_ldp;
2963         struct osd_thandle      *oth;
2964         int result = 0;
2965
2966         oth = container_of(th, struct osd_thandle, ot_super);
2967         LASSERT(oth->ot_handle->h_transaction != NULL);
2968         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
2969
2970         if (strcmp(name, dot) == 0) {
2971                 if (dir->oo_compat_dot_created) {
2972                         result = -EEXIST;
2973                 } else {
2974                         LASSERT(inode == parent_dir);
2975                         dir->oo_compat_dot_created = 1;
2976                         result = 0;
2977                 }
2978         } else if(strcmp(name, dotdot) == 0) {
2979                 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2980                 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
2981
2982                 if (!dir->oo_compat_dot_created)
2983                         return -EINVAL;
2984                 if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
2985                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
2986                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
2987                 } else {
2988                         dot_ldp = NULL;
2989                         dot_dot_ldp = NULL;
2990                 }
2991                 /* in case of rename, dotdot is already created */
2992                 if (dir->oo_compat_dotdot_created) {
2993                         return __osd_ea_add_rec(info, dir, parent_dir, name,
2994                                                 dot_dot_fid, th);
2995                 }
2996
2997                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
2998                                                 dot_ldp, dot_dot_ldp);
2999                 if (result == 0)
3000                        dir->oo_compat_dotdot_created = 1;
3001         }
3002
3003         return result;
3004 }
3005
3006
3007 /**
3008  * It will call the appropriate osd_add* function and return the
3009  * value, return by respective functions.
3010  */
3011 static int osd_ea_add_rec(const struct lu_env *env,
3012                           struct osd_object *pobj,
3013                           struct inode *cinode,
3014                           const char *name,
3015                           const struct dt_rec *fid,
3016                           struct thandle *th)
3017 {
3018         struct osd_thread_info    *info   = osd_oti_get(env);
3019         int rc;
3020
3021         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3022                                                    name[2] =='\0')))
3023                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3024                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3025                                         fid, th);
3026         else
3027                 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
3028
3029         return rc;
3030 }
3031
3032 /**
3033  * Calls ->lookup() to find dentry. From dentry get inode and
3034  * read inode's ea to get fid. This is required for  interoperability
3035  * mode (b11826)
3036  *
3037  * \retval   0, on success
3038  * \retval -ve, on error
3039  */
3040 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3041                              struct dt_rec *rec, const struct dt_key *key)
3042 {
3043         struct inode               *dir    = obj->oo_inode;
3044         struct dentry              *dentry;
3045         struct ldiskfs_dir_entry_2 *de;
3046         struct buffer_head         *bh;
3047         struct lu_fid              *fid = (struct lu_fid *) rec;
3048         int ino;
3049         int rc;
3050
3051         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3052
3053         dentry = osd_child_dentry_get(env, obj,
3054                                       (char *)key, strlen((char *)key));
3055
3056         cfs_down_read(&obj->oo_ext_idx_sem);
3057         bh = ll_ldiskfs_find_entry(dir, dentry, &de);
3058         if (bh) {
3059                 ino = le32_to_cpu(de->inode);
3060                 rc = osd_get_fid_from_dentry(de, rec);
3061
3062                 /* done with de, release bh */
3063                 brelse(bh);
3064                 if (rc != 0)
3065                         rc = osd_ea_fid_get(env, obj, ino, fid);
3066         } else
3067                 rc = -ENOENT;
3068
3069         cfs_up_read(&obj->oo_ext_idx_sem);
3070         RETURN (rc);
3071 }
3072
3073 /**
3074  * Find the osd object for given fid.
3075  *
3076  * \param fid need to find the osd object having this fid
3077  *
3078  * \retval osd_object on success
3079  * \retval        -ve on error
3080  */
3081 struct osd_object *osd_object_find(const struct lu_env *env,
3082                                    struct dt_object *dt,
3083                                    const struct lu_fid *fid)
3084 {
3085         struct lu_device         *ludev = dt->do_lu.lo_dev;
3086         struct osd_object        *child = NULL;
3087         struct lu_object         *luch;
3088         struct lu_object         *lo;
3089
3090         luch = lu_object_find(env, ludev, fid, NULL);
3091         if (!IS_ERR(luch)) {
3092                 if (lu_object_exists(luch)) {
3093                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3094                         if (lo != NULL)
3095                                 child = osd_obj(lo);
3096                         else
3097                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3098                                                 "lu_object can't be located"
3099                                                 ""DFID"\n", PFID(fid));
3100
3101                         if (child == NULL) {
3102                                 lu_object_put(env, luch);
3103                                 CERROR("Unable to get osd_object\n");
3104                                 child = ERR_PTR(-ENOENT);
3105                         }
3106                 } else {
3107                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
3108                                         "lu_object does not exists "DFID"\n",
3109                                         PFID(fid));
3110                         child = ERR_PTR(-ENOENT);
3111                 }
3112         } else
3113                 child = (void *)luch;
3114
3115         return child;
3116 }
3117
3118 /**
3119  * Put the osd object once done with it.
3120  *
3121  * \param obj osd object that needs to be put
3122  */
3123 static inline void osd_object_put(const struct lu_env *env,
3124                                   struct osd_object *obj)
3125 {
3126         lu_object_put(env, &obj->oo_dt.do_lu);
3127 }
3128
3129 /**
3130  * Index add function for interoperability mode (b11826).
3131  * It will add the directory entry.This entry is needed to
3132  * maintain name->fid mapping.
3133  *
3134  * \param key it is key i.e. file entry to be inserted
3135  * \param rec it is value of given key i.e. fid
3136  *
3137  * \retval   0, on success
3138  * \retval -ve, on error
3139  */
3140 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3141                                const struct dt_rec *rec,
3142                                const struct dt_key *key, struct thandle *th,
3143                                struct lustre_capa *capa, int ignore_quota)
3144 {
3145         struct osd_object        *obj   = osd_dt_obj(dt);
3146         struct lu_fid            *fid   = (struct lu_fid *) rec;
3147         const char               *name  = (const char *)key;
3148         struct osd_object        *child;
3149 #ifdef HAVE_QUOTA_SUPPORT
3150         cfs_cap_t                 save  = current->cap_effective;
3151 #endif
3152         int rc;
3153
3154         ENTRY;
3155
3156         LASSERT(osd_invariant(obj));
3157         LASSERT(dt_object_exists(dt));
3158         LASSERT(th != NULL);
3159
3160         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3161                 RETURN(-EACCES);
3162
3163         child = osd_object_find(env, dt, fid);
3164         if (!IS_ERR(child)) {
3165                 struct inode *inode = obj->oo_inode;
3166                 struct osd_thread_info *oti = osd_oti_get(env);
3167                 struct timespec *ctime = &oti->oti_time;
3168                 struct timespec *mtime = &oti->oti_time2;
3169
3170                 *ctime = inode->i_ctime;
3171                 *mtime = inode->i_mtime;
3172 #ifdef HAVE_QUOTA_SUPPORT
3173                 if (ignore_quota)
3174                         current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
3175                 else
3176                         current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
3177 #endif
3178                 cfs_down_write(&obj->oo_ext_idx_sem);
3179                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
3180                 cfs_up_write(&obj->oo_ext_idx_sem);
3181 #ifdef HAVE_QUOTA_SUPPORT
3182                 current->cap_effective = save;
3183 #endif
3184                 osd_object_put(env, child);
3185                 /* xtime should not be updated with server-side time. */
3186                 cfs_spin_lock(&obj->oo_guard);
3187                 inode->i_ctime = *ctime;
3188                 inode->i_mtime = *mtime;
3189                 cfs_spin_unlock(&obj->oo_guard);
3190                 mark_inode_dirty(inode);
3191         } else {
3192                 rc = PTR_ERR(child);
3193         }
3194
3195         LASSERT(osd_invariant(obj));
3196         RETURN(rc);
3197 }
3198
3199 /**
3200  *  Initialize osd Iterator for given osd index object.
3201  *
3202  *  \param  dt      osd index object
3203  */
3204
3205 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3206                                  struct dt_object *dt,
3207                                  struct lustre_capa *capa)
3208 {
3209         struct osd_it_iam         *it;
3210         struct osd_thread_info *oti = osd_oti_get(env);
3211         struct osd_object     *obj = osd_dt_obj(dt);
3212         struct lu_object      *lo  = &dt->do_lu;
3213         struct iam_path_descr *ipd;
3214         struct iam_container  *bag = &obj->oo_dir->od_container;
3215
3216         LASSERT(lu_object_exists(lo));
3217
3218         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3219                 return ERR_PTR(-EACCES);
3220
3221         it = &oti->oti_it;
3222         ipd = osd_it_ipd_get(env, bag);
3223         if (likely(ipd != NULL)) {
3224                 it->oi_obj = obj;
3225                 it->oi_ipd = ipd;
3226                 lu_object_get(lo);
3227                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3228                 return (struct dt_it *)it;
3229         }
3230         return ERR_PTR(-ENOMEM);
3231 }
3232
3233 /**
3234  * free given Iterator.
3235  */
3236
3237 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3238 {
3239         struct osd_it_iam     *it = (struct osd_it_iam *)di;
3240         struct osd_object *obj = it->oi_obj;
3241
3242         iam_it_fini(&it->oi_it);
3243         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3244         lu_object_put(env, &obj->oo_dt.do_lu);
3245 }
3246
3247 /**
3248  *  Move Iterator to record specified by \a key
3249  *
3250  *  \param  di      osd iterator
3251  *  \param  key     key for index
3252  *
3253  *  \retval +ve  di points to record with least key not larger than key
3254  *  \retval  0   di points to exact matched key
3255  *  \retval -ve  failure
3256  */
3257
3258 static int osd_it_iam_get(const struct lu_env *env,
3259                       struct dt_it *di, const struct dt_key *key)
3260 {
3261         struct osd_it_iam *it = (struct osd_it_iam *)di;
3262
3263         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
3264 }
3265
3266 /**
3267  *  Release Iterator
3268  *
3269  *  \param  di      osd iterator
3270  */
3271
3272 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
3273 {
3274         struct osd_it_iam *it = (struct osd_it_iam *)di;
3275
3276         iam_it_put(&it->oi_it);
3277 }
3278
3279 /**
3280  *  Move iterator by one record
3281  *
3282  *  \param  di      osd iterator
3283  *
3284  *  \retval +1   end of container reached
3285  *  \retval  0   success
3286  *  \retval -ve  failure
3287  */
3288
3289 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
3290 {
3291         struct osd_it_iam *it = (struct osd_it_iam *)di;
3292
3293         return iam_it_next(&it->oi_it);
3294 }
3295