Whamcloud - gitweb
LU-957 scrub: Ancillary work for LFSCK/OI scrub
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/module.h>
50
51 /* LUSTRE_VERSION_CODE */
52 #include <lustre_ver.h>
53 /* prerequisite for linux/xattr.h */
54 #include <linux/types.h>
55 /* prerequisite for linux/xattr.h */
56 #include <linux/fs.h>
57 /* XATTR_{REPLACE,CREATE} */
58 #include <linux/xattr.h>
59 /* simple_mkdir() */
60 #include <lvfs.h>
61
62 /*
63  * struct OBD_{ALLOC,FREE}*()
64  * OBD_FAIL_CHECK
65  */
66 #include <obd_support.h>
67 /* struct ptlrpc_thread */
68 #include <lustre_net.h>
69
70 /* fid_is_local() */
71 #include <lustre_fid.h>
72
73 #include "osd_internal.h"
74 #include "osd_igif.h"
75
76 /* llo_* api support */
77 #include <md_object.h>
78
79 #ifdef HAVE_LDISKFS_PDO
80 int ldiskfs_pdo = 1;
81 CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
82                 "ldiskfs with parallel directory operations");
83 #else
84 int ldiskfs_pdo = 0;
85 #endif
86
87 static const char dot[] = ".";
88 static const char dotdot[] = "..";
89 static const char remote_obj_dir[] = "REM_OBJ_DIR";
90
91 static const struct lu_object_operations      osd_lu_obj_ops;
92 static const struct dt_object_operations      osd_obj_ops;
93 static const struct dt_object_operations      osd_obj_ea_ops;
94 static const struct dt_index_operations       osd_index_iam_ops;
95 static const struct dt_index_operations       osd_index_ea_ops;
96
97 static int osd_has_index(const struct osd_object *obj)
98 {
99         return obj->oo_dt.do_index_ops != NULL;
100 }
101
/*
 * lu_object flavour of osd_invariant(): converts \a l with osd_obj() and
 * returns whatever osd_invariant() reports for it.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int holds;

        holds = osd_invariant(osd_obj(l));
        return holds;
}
106
107 #ifdef HAVE_QUOTA_SUPPORT
108 static inline void
109 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
110 {
111         struct md_ucred *uc = md_ucred(env);
112         struct cred     *tc;
113
114         LASSERT(uc != NULL);
115
116         save->oc_uid = current_fsuid();
117         save->oc_gid = current_fsgid();
118         save->oc_cap = current_cap();
119         if ((tc = prepare_creds())) {
120                 tc->fsuid         = uc->mu_fsuid;
121                 tc->fsgid         = uc->mu_fsgid;
122                 commit_creds(tc);
123         }
124         /* XXX not suboptimal */
125         cfs_curproc_cap_unpack(uc->mu_cap);
126 }
127
128 static inline void
129 osd_pop_ctxt(struct osd_ctxt *save)
130 {
131         struct cred *tc;
132
133         if ((tc = prepare_creds())) {
134                 tc->fsuid         = save->oc_uid;
135                 tc->fsgid         = save->oc_gid;
136                 tc->cap_effective = save->oc_cap;
137                 commit_creds(tc);
138         }
139 }
140 #endif
141
142 /*
143  * Concurrency: doesn't matter
144  */
145 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
146 {
147         return osd_oti_get(env)->oti_r_locks > 0;
148 }
149
150 /*
151  * Concurrency: doesn't matter
152  */
153 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
154 {
155         struct osd_thread_info *oti = osd_oti_get(env);
156         return oti->oti_w_locks > 0 && o->oo_owner == env;
157 }
158
159 /*
160  * Concurrency: doesn't access mutable data
161  */
162 static int osd_root_get(const struct lu_env *env,
163                         struct dt_device *dev, struct lu_fid *f)
164 {
165         lu_local_obj_fid(f, OSD_FS_ROOT_OID);
166         return 0;
167 }
168
169 static inline int osd_qid_type(struct osd_thandle *oh, int i)
170 {
171         return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA;
172 }
173
174 static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type)
175 {
176         oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
177 }
178
179 void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
180                      int type, uid_t id, struct inode *inode)
181 {
182 #ifdef CONFIG_QUOTA
183         int i, allocated = 0;
184         struct osd_object *obj;
185
186         LASSERT(dt != NULL);
187         LASSERT(oh != NULL);
188         LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%u",
189                  oh->ot_id_cnt);
190
191         /* id entry is allocated in the quota file */
192         if (inode && inode->i_dquot[type] && inode->i_dquot[type]->dq_off)
193                 allocated = 1;
194
195         for (i = 0; i < oh->ot_id_cnt; i++) {
196                 if (oh->ot_id_array[i] == id && osd_qid_type(oh, i) == type)
197                         return;
198         }
199
200         if (unlikely(i >= OSD_MAX_UGID_CNT)) {
201                 CERROR("more than %d uid/gids for a transaction?\n", i);
202                 return;
203         }
204
205         oh->ot_id_array[i] = id;
206         osd_qid_set_type(oh, i, type);
207         oh->ot_id_cnt++;
208         obj = osd_dt_obj(dt);
209         oh->ot_credits += (allocated || id == 0) ?
210                 1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(osd_obj2dev(obj)));
211 #endif
212 }
213
214 /*
215  * OSD object methods.
216  */
217
218 /*
219  * Concurrency: no concurrent access is possible that early in object
220  * life-cycle.
221  */
222 static struct lu_object *osd_object_alloc(const struct lu_env *env,
223                                           const struct lu_object_header *hdr,
224                                           struct lu_device *d)
225 {
226         struct osd_object *mo;
227
228         OBD_ALLOC_PTR(mo);
229         if (mo != NULL) {
230                 struct lu_object *l;
231
232                 l = &mo->oo_dt.do_lu;
233                 dt_object_init(&mo->oo_dt, NULL, d);
234                 if (osd_dev(d)->od_iop_mode)
235                         mo->oo_dt.do_ops = &osd_obj_ea_ops;
236                 else
237                         mo->oo_dt.do_ops = &osd_obj_ops;
238
239                 l->lo_ops = &osd_lu_obj_ops;
240                 cfs_init_rwsem(&mo->oo_sem);
241                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
242                 cfs_spin_lock_init(&mo->oo_guard);
243                 return l;
244         } else {
245                 return NULL;
246         }
247 }
248
249 /*
250  * retrieve object from backend ext fs.
251  **/
252 struct inode *osd_iget(struct osd_thread_info *info,
253                        struct osd_device *dev,
254                        const struct osd_inode_id *id)
255 {
256         struct inode *inode = NULL;
257
258         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
259         if (IS_ERR(inode)) {
260                 CERROR("Cannot get inode, rc = %li\n", PTR_ERR(inode));
261         } else if (id->oii_gen != OSD_OII_NOGEN &&
262                    inode->i_generation != id->oii_gen) {
263                 iput(inode);
264                 inode = ERR_PTR(-ESTALE);
265         } else if (inode->i_nlink == 0) {
266                 /* due to parallel readdir and unlink,
267                 * we can have dead inode here. */
268                 CWARN("stale inode\n");
269                 make_bad_inode(inode);
270                 iput(inode);
271                 inode = ERR_PTR(-ESTALE);
272         } else if (is_bad_inode(inode)) {
273                 CERROR("bad inode %lx\n",inode->i_ino);
274                 iput(inode);
275                 inode = ERR_PTR(-ENOENT);
276         } else {
277                 /* Do not update file c/mtime in ldiskfs.
278                  * NB: we don't have any lock to protect this because we don't
279                  * have reference on osd_object now, but contention with
280                  * another lookup + attr_set can't happen in the tiny window
281                  * between if (...) and set S_NOCMTIME. */
282                 if (!(inode->i_flags & S_NOCMTIME))
283                         inode->i_flags |= S_NOCMTIME;
284         }
285         return inode;
286 }
287
288 static int osd_fid_lookup(const struct lu_env *env,
289                           struct osd_object *obj, const struct lu_fid *fid)
290 {
291         struct osd_thread_info *info;
292         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
293         struct osd_device      *dev;
294         struct osd_inode_id    *id;
295         struct inode           *inode;
296         int                     result;
297
298         LINVRNT(osd_invariant(obj));
299         LASSERT(obj->oo_inode == NULL);
300         LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
301         /*
302          * This assertion checks that osd layer sees only local
303          * fids. Unfortunately it is somewhat expensive (does a
304          * cache-lookup). Disabling it for production/acceptance-testing.
305          */
306         LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
307
308         ENTRY;
309
310         info = osd_oti_get(env);
311         LASSERT(info);
312         dev  = osd_dev(ldev);
313         id   = &info->oti_id;
314
315         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
316                 RETURN(-ENOENT);
317
318         result = osd_oi_lookup(info, dev, fid, id);
319         if (result != 0) {
320                 if (result == -ENOENT)
321                         result = 0;
322                 GOTO(out, result);
323         }
324
325         inode = osd_iget(info, dev, id);
326         if (IS_ERR(inode)) {
327                 /*
328                  * If fid wasn't found in oi, inode-less object is
329                  * created, for which lu_object_exists() returns
330                  * false. This is used in a (frequent) case when
331                  * objects are created as locking anchors or
332                  * place holders for objects yet to be created.
333                  */
334                 result = PTR_ERR(inode);
335                 GOTO(out, result);
336         }
337
338         obj->oo_inode = inode;
339         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
340         if (dev->od_iop_mode) {
341                 obj->oo_compat_dot_created = 1;
342                 obj->oo_compat_dotdot_created = 1;
343         }
344
345         if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
346                 goto out;
347
348         LASSERT(obj->oo_hl_head == NULL);
349         obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
350         if (obj->oo_hl_head == NULL) {
351                 obj->oo_inode = NULL;
352                 iput(inode);
353                 result = -ENOMEM;
354         }
355 out:
356         LINVRNT(osd_invariant(obj));
357         RETURN(result);
358 }
359
360 /*
361  * Concurrency: shouldn't matter.
362  */
363 static void osd_object_init0(struct osd_object *obj)
364 {
365         LASSERT(obj->oo_inode != NULL);
366         obj->oo_dt.do_body_ops = &osd_body_ops;
367         obj->oo_dt.do_lu.lo_header->loh_attr |=
368                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
369 }
370
371 /*
372  * Concurrency: no concurrent access is possible that early in object
373  * life-cycle.
374  */
375 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
376                            const struct lu_object_conf *unused)
377 {
378         struct osd_object *obj = osd_obj(l);
379         int result;
380
381         LINVRNT(osd_invariant(obj));
382
383         result = osd_fid_lookup(env, obj, lu_object_fid(l));
384         obj->oo_dt.do_body_ops = &osd_body_ops_new;
385         if (result == 0) {
386                 if (obj->oo_inode != NULL)
387                         osd_object_init0(obj);
388         }
389         LINVRNT(osd_invariant(obj));
390         return result;
391 }
392
393 /*
394  * Concurrency: no concurrent access is possible that late in object
395  * life-cycle.
396  */
397 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
398 {
399         struct osd_object *obj = osd_obj(l);
400
401         LINVRNT(osd_invariant(obj));
402
403         dt_object_fini(&obj->oo_dt);
404         if (obj->oo_hl_head != NULL)
405                 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
406         OBD_FREE_PTR(obj);
407 }
408
409 /*
410  * Concurrency: no concurrent access is possible that late in object
411  * life-cycle.
412  */
413 static void osd_index_fini(struct osd_object *o)
414 {
415         struct iam_container *bag;
416
417         if (o->oo_dir != NULL) {
418                 bag = &o->oo_dir->od_container;
419                 if (o->oo_inode != NULL) {
420                         if (bag->ic_object == o->oo_inode)
421                                 iam_container_fini(bag);
422                 }
423                 OBD_FREE_PTR(o->oo_dir);
424                 o->oo_dir = NULL;
425         }
426 }
427
428 /*
429  * Concurrency: no concurrent access is possible that late in object
430  * life-cycle (for all existing callers, that is. New callers have to provide
431  * their own locking.)
432  */
433 static int osd_inode_unlinked(const struct inode *inode)
434 {
435         return inode->i_nlink == 0;
436 }
437
/* fixed credit amounts; both are currently 20 */
enum {
        /* credits for an object index deletion */
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        /* credits for an inode deletion */
        OSD_TXN_INODE_DELETE_CREDITS = 20,
};
442
443 /*
444  * Journal
445  */
446
447 #if OSD_THANDLE_STATS
448 /**
449  * Set time when the handle is allocated
450  */
451 static void osd_th_alloced(struct osd_thandle *oth)
452 {
453         oth->oth_alloced = cfs_time_current();
454 }
455
456 /**
457  * Set time when the handle started
458  */
459 static void osd_th_started(struct osd_thandle *oth)
460 {
461         oth->oth_started = cfs_time_current();
462 }
463
464 /**
465  * Helper function to convert time interval to microseconds packed in
466  * long int (default time units for the counter in "stats" initialized
467  * by lu_time_init() )
468  */
469 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
470 {
471         struct timeval val;
472
473         cfs_duration_usec(cfs_time_sub(end, start), &val);
474         return val.tv_sec * 1000000 + val.tv_usec;
475 }
476
477 /**
478  * Check whether the we deal with this handle for too long.
479  */
480 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
481                                 cfs_time_t alloced, cfs_time_t started,
482                                 cfs_time_t closed)
483 {
484         cfs_time_t now = cfs_time_current();
485
486         LASSERT(dev != NULL);
487
488         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
489                             interval_to_usec(alloced, started));
490         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
491                             interval_to_usec(started, closed));
492         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
493                             interval_to_usec(closed, now));
494
495         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
496                 CWARN("transaction handle %p was open for too long: "
497                       "now "CFS_TIME_T" ,"
498                       "alloced "CFS_TIME_T" ,"
499                       "started "CFS_TIME_T" ,"
500                       "closed "CFS_TIME_T"\n",
501                       oth, now, alloced, started, closed);
502                 libcfs_debug_dumpstack(NULL);
503         }
504 }
505
/**
 * Evaluate \a expr (the statement that closes the handle) and feed the
 * handle timings to __osd_th_check_slow().
 *
 * The allocation/start timestamps are copied out of \a oth before \a expr
 * runs, and afterwards \a oth is only passed on as an address, so no field
 * of the handle is read once \a expr has been evaluated (presumably
 * because \a expr may release the handle).
 *
 * Wrapped in do { } while (0) so that the macro behaves as a single
 * statement, e.g. in an unbraced if/else; arguments are parenthesized so
 * that expressions such as &handle can be passed.
 */
#define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
do {                                                                    \
        cfs_time_t __closed = cfs_time_current();                       \
        cfs_time_t __alloced = (oth)->oth_alloced;                      \
        cfs_time_t __started = (oth)->oth_started;                      \
                                                                        \
        expr;                                                           \
        __osd_th_check_slow((oth), (dev), __alloced, __started,         \
                            __closed);                                  \
} while (0)
515
516 #else /* OSD_THANDLE_STATS */
517
/*
 * Handle timing is compiled out: the time-stamp hooks expand to empty
 * statements and the slow-handle check reduces to evaluating \a expr.
 */
#define osd_th_alloced(h)                  do { } while (0)
#define osd_th_started(h)                  do { } while (0)
#define OSD_CHECK_SLOW_TH(oth, dev, expr)  expr
521
522 #endif /* OSD_THANDLE_STATS */
523
524 /*
525  * Concurrency: doesn't access mutable data.
526  */
527 static int osd_param_is_sane(const struct osd_device *dev,
528                              const struct thandle *th)
529 {
530         struct osd_thandle *oh;
531         oh = container_of0(th, struct osd_thandle, ot_super);
532         return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers;
533 }
534
535 /*
536  * Concurrency: shouldn't matter.
537  */
538 #ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
539 static void osd_trans_commit_cb(struct super_block *sb,
540                                 struct journal_callback *jcb, int error)
541 #else
542 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
543 #endif
544 {
545         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
546         struct thandle     *th  = &oh->ot_super;
547         struct lu_device   *lud = &th->th_dev->dd_lu_dev;
548         struct dt_txn_commit_cb *dcb, *tmp;
549
550         LASSERT(oh->ot_handle == NULL);
551
552         if (error)
553                 CERROR("transaction @0x%p commit error: %d\n", th, error);
554
555         dt_txn_hook_commit(th);
556
557         /* call per-transaction callbacks if any */
558         cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
559                 cfs_list_del_init(&dcb->dcb_linkage);
560                 dcb->dcb_func(NULL, th, dcb, error);
561         }
562
563         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
564         lu_device_put(lud);
565         th->th_dev = NULL;
566
567         lu_context_exit(&th->th_ctx);
568         lu_context_fini(&th->th_ctx);
569         OBD_FREE_PTR(oh);
570 }
571
572 static struct thandle *osd_trans_create(const struct lu_env *env,
573                                         struct dt_device *d)
574 {
575         struct osd_thread_info *oti = osd_oti_get(env);
576         struct osd_iobuf       *iobuf = &oti->oti_iobuf;
577         struct osd_thandle     *oh;
578         struct thandle         *th;
579         ENTRY;
580
581         /* on pending IO in this thread should left from prev. request */
582         LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
583
584         th = ERR_PTR(-ENOMEM);
585         OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
586         if (oh != NULL) {
587                 th = &oh->ot_super;
588                 th->th_dev = d;
589                 th->th_result = 0;
590                 th->th_tags = LCT_TX_HANDLE;
591                 oh->ot_credits = 0;
592                 oti->oti_dev = osd_dt_dev(d);
593                 CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
594                 osd_th_alloced(oh);
595         }
596         RETURN(th);
597 }
598
599 /*
600  * Concurrency: shouldn't matter.
601  */
602 int osd_trans_start(const struct lu_env *env, struct dt_device *d,
603                     struct thandle *th)
604 {
605         struct osd_thread_info *oti = osd_oti_get(env);
606         struct osd_device  *dev = osd_dt_dev(d);
607         handle_t           *jh;
608         struct osd_thandle *oh;
609         int rc;
610
611         ENTRY;
612
613         LASSERT(current->journal_info == NULL);
614
615         oh = container_of0(th, struct osd_thandle, ot_super);
616         LASSERT(oh != NULL);
617         LASSERT(oh->ot_handle == NULL);
618
619         rc = dt_txn_hook_start(env, d, th);
620         if (rc != 0)
621                 GOTO(out, rc);
622
623         if (!osd_param_is_sane(dev, th)) {
624                 CWARN("%s: too many transaction credits (%d > %d)\n",
625                       d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits,
626                       osd_journal(dev)->j_max_transaction_buffers);
627                 /* XXX Limit the credits to 'max_transaction_buffers', and
628                  *     let the underlying filesystem to catch the error if
629                  *     we really need so many credits.
630                  *
631                  *     This should be removed when we can calculate the
632                  *     credits precisely. */
633                 oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
634 #ifdef OSD_TRACK_DECLARES
635                 CERROR("  attr_set: %d, punch: %d, xattr_set: %d,\n",
636                        oh->ot_declare_attr_set, oh->ot_declare_punch,
637                        oh->ot_declare_xattr_set);
638                 CERROR("  create: %d, ref_add: %d, ref_del: %d, write: %d\n",
639                        oh->ot_declare_create, oh->ot_declare_ref_add,
640                        oh->ot_declare_ref_del, oh->ot_declare_write);
641                 CERROR("  insert: %d, delete: %d, destroy: %d\n",
642                        oh->ot_declare_insert, oh->ot_declare_delete,
643                        oh->ot_declare_destroy);
644 #endif
645         }
646
647         /*
648          * XXX temporary stuff. Some abstraction layer should
649          * be used.
650          */
651         jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
652         osd_th_started(oh);
653         if (!IS_ERR(jh)) {
654                 oh->ot_handle = jh;
655                 LASSERT(oti->oti_txns == 0);
656                 lu_context_init(&th->th_ctx, th->th_tags);
657                 lu_context_enter(&th->th_ctx);
658
659                 lu_device_get(&d->dd_lu_dev);
660                 oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
661                                              "osd-tx", th);
662
663                 /*
664                  * XXX: current rule is that we first start tx,
665                  *      then lock object(s), but we can't use
666                  *      this rule for data (due to locking specifics
667                  *      in ldiskfs). also in long-term we'd like to
668                  *      use usually-used (locks;tx) ordering. so,
669                  *      UGLY thing is that we'll use one ordering for
670                  *      data (ofd) and reverse ordering for metadata
671                  *      (mdd). then at some point we'll fix the latter
672                  */
673                 if (lu_device_is_md(&d->dd_lu_dev)) {
674                         LASSERT(oti->oti_r_locks == 0);
675                         LASSERT(oti->oti_w_locks == 0);
676                 }
677
678                 oti->oti_txns++;
679                 rc = 0;
680         } else {
681                 rc = PTR_ERR(jh);
682         }
683 out:
684         RETURN(rc);
685 }
686
687 /*
688  * Concurrency: shouldn't matter.
689  */
690 static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
691 {
692         int                     rc = 0;
693         struct osd_thandle     *oh;
694         struct osd_thread_info *oti = osd_oti_get(env);
695         struct osd_iobuf       *iobuf = &oti->oti_iobuf;
696
697         ENTRY;
698
699         oh = container_of0(th, struct osd_thandle, ot_super);
700
701         if (oh->ot_handle != NULL) {
702                 handle_t *hdl = oh->ot_handle;
703
704                 hdl->h_sync = th->th_sync;
705
706                 /*
707                  * add commit callback
708                  * notice we don't do this in osd_trans_start()
709                  * as underlying transaction can change during truncate
710                  */
711                 osd_journal_callback_set(hdl, osd_trans_commit_cb,
712                                          &oh->ot_jcb);
713
714                 LASSERT(oti->oti_txns == 1);
715                 oti->oti_txns--;
716                 /*
717                  * XXX: current rule is that we first start tx,
718                  *      then lock object(s), but we can't use
719                  *      this rule for data (due to locking specifics
720                  *      in ldiskfs). also in long-term we'd like to
721                  *      use usually-used (locks;tx) ordering. so,
722                  *      UGLY thing is that we'll use one ordering for
723                  *      data (ofd) and reverse ordering for metadata
724                  *      (mdd). then at some point we'll fix the latter
725                  */
726                 if (lu_device_is_md(&th->th_dev->dd_lu_dev)) {
727                         LASSERT(oti->oti_r_locks == 0);
728                         LASSERT(oti->oti_w_locks == 0);
729                 }
730                 rc = dt_txn_hook_stop(env, th);
731                 if (rc != 0)
732                         CERROR("Failure in transaction hook: %d\n", rc);
733                 oh->ot_handle = NULL;
734                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
735                                   rc = ldiskfs_journal_stop(hdl));
736                 if (rc != 0)
737                         CERROR("Failure to stop transaction: %d\n", rc);
738         } else {
739                 OBD_FREE_PTR(oh);
740         }
741
742         /* as we want IO to journal and data IO be concurrent, we don't block
743          * awaiting data IO completion in osd_do_bio(), instead we wait here
744          * once transaction is submitted to the journal. all reqular requests
745          * don't do direct IO (except read/write), thus this wait_event becomes
746          * no-op for them.
747          *
748          * IMPORTANT: we have to wait till any IO submited by the thread is
749          * completed otherwise iobuf may be corrupted by different request
750          */
751         cfs_wait_event(iobuf->dr_wait,
752                        cfs_atomic_read(&iobuf->dr_numreqs) == 0);
753         if (!rc)
754                 rc = iobuf->dr_error;
755
756         RETURN(rc);
757 }
758
759 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
760 {
761         struct osd_thandle *oh = container_of0(th, struct osd_thandle,
762                                                ot_super);
763
764         cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
765
766         return 0;
767 }
768
769 /*
770  * Called just before object is freed. Releases all resources except for
771  * object itself (that is released by osd_object_free()).
772  *
773  * Concurrency: no concurrent access is possible that late in object
774  * life-cycle.
775  */
776 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
777 {
778         struct osd_object *obj   = osd_obj(l);
779         struct inode      *inode = obj->oo_inode;
780
781         LINVRNT(osd_invariant(obj));
782
783         /*
784          * If object is unlinked remove fid->ino mapping from object index.
785          */
786
787         osd_index_fini(obj);
788         if (inode != NULL) {
789                 iput(inode);
790                 obj->oo_inode = NULL;
791         }
792 }
793
/*
 * Release method of the object; nothing is done at this layer.
 *
 * Concurrency: ->loo_object_release() is called under site spin-lock.
 */
static void osd_object_release(const struct lu_env *env,
                               struct lu_object *l)
{
        /* intentionally empty */
}
801
802 /*
803  * Concurrency: shouldn't matter.
804  */
805 static int osd_object_print(const struct lu_env *env, void *cookie,
806                             lu_printer_t p, const struct lu_object *l)
807 {
808         struct osd_object *o = osd_obj(l);
809         struct iam_descr  *d;
810
811         if (o->oo_dir != NULL)
812                 d = o->oo_dir->od_container.ic_descr;
813         else
814                 d = NULL;
815         return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
816                     o, o->oo_inode,
817                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
818                     o->oo_inode ? o->oo_inode->i_generation : 0,
819                     d ? d->id_ops->id_name : "plain");
820 }
821
822 /*
823  * Concurrency: shouldn't matter.
824  */
825 int osd_statfs(const struct lu_env *env, struct dt_device *d,
826                struct obd_statfs *sfs)
827 {
828         struct osd_device  *osd = osd_dt_dev(d);
829         struct super_block *sb = osd_sb(osd);
830         struct kstatfs     *ksfs;
831         int result = 0;
832
833         /* osd_lproc.c call this without env, allocate ksfs for that case */
834         if (unlikely(env == NULL)) {
835                 OBD_ALLOC_PTR(ksfs);
836                 if (ksfs == NULL)
837                         return -ENOMEM;
838         } else {
839                 ksfs = &osd_oti_get(env)->oti_ksfs;
840         }
841
842         cfs_spin_lock(&osd->od_osfs_lock);
843         /* cache 1 second */
844         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
845                 result = ll_do_statfs(sb, ksfs);
846                 if (likely(result == 0)) { /* N.B. statfs can't really fail */
847                         osd->od_osfs_age = cfs_time_current_64();
848                         statfs_pack(&osd->od_statfs, ksfs);
849                 }
850         }
851
852         if (likely(result == 0))
853                 *sfs = osd->od_statfs;
854         cfs_spin_unlock(&osd->od_osfs_lock);
855
856         if (unlikely(env == NULL))
857                 OBD_FREE_PTR(ksfs);
858
859         return result;
860 }
861
862 /*
863  * Concurrency: doesn't access mutable data.
864  */
865 static void osd_conf_get(const struct lu_env *env,
866                          const struct dt_device *dev,
867                          struct dt_device_param *param)
868 {
869         struct super_block *sb = osd_sb(osd_dt_dev(dev));
870
871         /*
872          * XXX should be taken from not-yet-existing fs abstraction layer.
873          */
874         param->ddp_max_name_len = LDISKFS_NAME_LEN;
875         param->ddp_max_nlink    = LDISKFS_LINK_MAX;
876         param->ddp_block_shift  = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
877         param->ddp_mntopts      = 0;
878         if (test_opt(sb, XATTR_USER))
879                 param->ddp_mntopts |= MNTOPT_USERXATTR;
880         if (test_opt(sb, POSIX_ACL))
881                 param->ddp_mntopts |= MNTOPT_ACL;
882
883 #if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
884         if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
885                 param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
886         else
887 #endif
888                 param->ddp_max_ea_size = sb->s_blocksize;
889
890 }
891
892 /**
893  * Helper function to get and fill the buffer with input values.
894  */
895 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
896 {
897         struct lu_buf *buf;
898
899         buf = &osd_oti_get(env)->oti_buf;
900         buf->lb_buf = area;
901         buf->lb_len = len;
902         return buf;
903 }
904
905 /*
906  * Concurrency: shouldn't matter.
907  */
908 static int osd_sync(const struct lu_env *env, struct dt_device *d)
909 {
910         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
911         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
912 }
913
914 /**
915  * Start commit for OSD device.
916  *
917  * An implementation of dt_commit_async method for OSD device.
918  * Asychronously starts underlayng fs sync and thereby a transaction
919  * commit.
920  *
921  * \param env environment
922  * \param d dt device
923  *
924  * \see dt_device_operations
925  */
926 static int osd_commit_async(const struct lu_env *env,
927                             struct dt_device *d)
928 {
929         struct super_block *s = osd_sb(osd_dt_dev(d));
930         ENTRY;
931
932         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
933         RETURN(s->s_op->sync_fs(s, 0));
934 }
935
936 /*
937  * Concurrency: shouldn't matter.
938  */
939
940 static int osd_ro(const struct lu_env *env, struct dt_device *d)
941 {
942         struct super_block *sb = osd_sb(osd_dt_dev(d));
943         int rc;
944         ENTRY;
945
946         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
947
948         rc = __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
949         RETURN(rc);
950 }
951
952 /*
953  * Concurrency: serialization provided by callers.
954  */
955 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
956                               int mode, unsigned long timeout, __u32 alg,
957                               struct lustre_capa_key *keys)
958 {
959         struct osd_device *dev = osd_dt_dev(d);
960         ENTRY;
961
962         dev->od_fl_capa = mode;
963         dev->od_capa_timeout = timeout;
964         dev->od_capa_alg = alg;
965         dev->od_capa_keys = keys;
966         RETURN(0);
967 }
968
969 /**
970  * Concurrency: serialization provided by callers.
971  */
972 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
973                                struct dt_quota_ctxt *ctxt, void *data)
974 {
975         struct obd_device *obd = (void *)ctxt;
976         struct vfsmount *mnt = (struct vfsmount *)data;
977         ENTRY;
978
979         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
980         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
981         obd->obd_lvfs_ctxt.pwdmnt = mnt;
982         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
983         obd->obd_lvfs_ctxt.fs = get_ds();
984
985         EXIT;
986 }
987
988 /**
989  * Note: we do not count into QUOTA here.
990  * If we mount with --data_journal we may need more.
991  */
992 const int osd_dto_credits_noquota[DTO_NR] = {
993         /**
994          * Insert/Delete.
995          * INDEX_EXTRA_TRANS_BLOCKS(8) +
996          * SINGLEDATA_TRANS_BLOCKS(8)
997          * XXX Note: maybe iam need more, since iam have more level than
998          *           EXT3 htree.
999          */
1000         [DTO_INDEX_INSERT]  = 16,
1001         [DTO_INDEX_DELETE]  = 16,
1002         /**
1003          * Unused now
1004          */
1005         [DTO_INDEX_UPDATE]  = 16,
1006         /**
1007          * Create a object. The same as create object in EXT3.
1008          * DATA_TRANS_BLOCKS(14) +
1009          * INDEX_EXTRA_BLOCKS(8) +
1010          * 3(inode bits, groups, GDT)
1011          */
1012         [DTO_OBJECT_CREATE] = 25,
1013         /**
1014          * XXX: real credits to be fixed
1015          */
1016         [DTO_OBJECT_DELETE] = 25,
1017         /**
1018          * Attr set credits (inode)
1019          */
1020         [DTO_ATTR_SET_BASE] = 1,
1021         /**
1022          * Xattr set. The same as xattr of EXT3.
1023          * DATA_TRANS_BLOCKS(14)
1024          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1025          * are also counted in. Do not know why?
1026          */
1027         [DTO_XATTR_SET]     = 14,
1028         [DTO_LOG_REC]       = 14,
1029         /**
1030          * credits for inode change during write.
1031          */
1032         [DTO_WRITE_BASE]    = 3,
1033         /**
1034          * credits for single block write.
1035          */
1036         [DTO_WRITE_BLOCK]   = 14,
1037         /**
1038          * Attr set credits for chown.
1039          * This is extra credits for setattr, and it is null without quota
1040          */
1041         [DTO_ATTR_SET_CHOWN]= 0
1042 };
1043
1044 static const struct dt_device_operations osd_dt_ops = {
1045         .dt_root_get       = osd_root_get,
1046         .dt_statfs         = osd_statfs,
1047         .dt_trans_create   = osd_trans_create,
1048         .dt_trans_start    = osd_trans_start,
1049         .dt_trans_stop     = osd_trans_stop,
1050         .dt_trans_cb_add   = osd_trans_cb_add,
1051         .dt_conf_get       = osd_conf_get,
1052         .dt_sync           = osd_sync,
1053         .dt_ro             = osd_ro,
1054         .dt_commit_async   = osd_commit_async,
1055         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1056         .dt_init_quota_ctxt= osd_init_quota_ctxt,
1057 };
1058
1059 static void osd_object_read_lock(const struct lu_env *env,
1060                                  struct dt_object *dt, unsigned role)
1061 {
1062         struct osd_object *obj = osd_dt_obj(dt);
1063         struct osd_thread_info *oti = osd_oti_get(env);
1064
1065         LINVRNT(osd_invariant(obj));
1066
1067         LASSERT(obj->oo_owner != env);
1068         cfs_down_read_nested(&obj->oo_sem, role);
1069
1070         LASSERT(obj->oo_owner == NULL);
1071         oti->oti_r_locks++;
1072 }
1073
1074 static void osd_object_write_lock(const struct lu_env *env,
1075                                   struct dt_object *dt, unsigned role)
1076 {
1077         struct osd_object *obj = osd_dt_obj(dt);
1078         struct osd_thread_info *oti = osd_oti_get(env);
1079
1080         LINVRNT(osd_invariant(obj));
1081
1082         LASSERT(obj->oo_owner != env);
1083         cfs_down_write_nested(&obj->oo_sem, role);
1084
1085         LASSERT(obj->oo_owner == NULL);
1086         obj->oo_owner = env;
1087         oti->oti_w_locks++;
1088 }
1089
1090 static void osd_object_read_unlock(const struct lu_env *env,
1091                                    struct dt_object *dt)
1092 {
1093         struct osd_object *obj = osd_dt_obj(dt);
1094         struct osd_thread_info *oti = osd_oti_get(env);
1095
1096         LINVRNT(osd_invariant(obj));
1097
1098         LASSERT(oti->oti_r_locks > 0);
1099         oti->oti_r_locks--;
1100         cfs_up_read(&obj->oo_sem);
1101 }
1102
1103 static void osd_object_write_unlock(const struct lu_env *env,
1104                                     struct dt_object *dt)
1105 {
1106         struct osd_object *obj = osd_dt_obj(dt);
1107         struct osd_thread_info *oti = osd_oti_get(env);
1108
1109         LINVRNT(osd_invariant(obj));
1110
1111         LASSERT(obj->oo_owner == env);
1112         LASSERT(oti->oti_w_locks > 0);
1113         oti->oti_w_locks--;
1114         obj->oo_owner = NULL;
1115         cfs_up_write(&obj->oo_sem);
1116 }
1117
1118 static int osd_object_write_locked(const struct lu_env *env,
1119                                    struct dt_object *dt)
1120 {
1121         struct osd_object *obj = osd_dt_obj(dt);
1122
1123         LINVRNT(osd_invariant(obj));
1124
1125         return obj->oo_owner == env;
1126 }
1127
1128 static int capa_is_sane(const struct lu_env *env,
1129                         struct osd_device *dev,
1130                         struct lustre_capa *capa,
1131                         struct lustre_capa_key *keys)
1132 {
1133         struct osd_thread_info *oti = osd_oti_get(env);
1134         struct lustre_capa *tcapa = &oti->oti_capa;
1135         struct obd_capa *oc;
1136         int i, rc = 0;
1137         ENTRY;
1138
1139         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1140         if (oc) {
1141                 if (capa_is_expired(oc)) {
1142                         DEBUG_CAPA(D_ERROR, capa, "expired");
1143                         rc = -ESTALE;
1144                 }
1145                 capa_put(oc);
1146                 RETURN(rc);
1147         }
1148
1149         if (capa_is_expired_sec(capa)) {
1150                 DEBUG_CAPA(D_ERROR, capa, "expired");
1151                 RETURN(-ESTALE);
1152         }
1153
1154         cfs_spin_lock(&capa_lock);
1155         for (i = 0; i < 2; i++) {
1156                 if (keys[i].lk_keyid == capa->lc_keyid) {
1157                         oti->oti_capa_key = keys[i];
1158                         break;
1159                 }
1160         }
1161         cfs_spin_unlock(&capa_lock);
1162
1163         if (i == 2) {
1164                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1165                 RETURN(-ESTALE);
1166         }
1167
1168         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1169         if (rc)
1170                 RETURN(rc);
1171
1172         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1173                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1174                 RETURN(-EACCES);
1175         }
1176
1177         oc = capa_add(dev->od_capa_hash, capa);
1178         capa_put(oc);
1179
1180         RETURN(0);
1181 }
1182
1183 int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1184                     struct lustre_capa *capa, __u64 opc)
1185 {
1186         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1187         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1188         struct md_capainfo *ci;
1189         int rc;
1190
1191         if (!dev->od_fl_capa)
1192                 return 0;
1193
1194         if (capa == BYPASS_CAPA)
1195                 return 0;
1196
1197         ci = md_capainfo(env);
1198         if (unlikely(!ci))
1199                 return 0;
1200
1201         if (ci->mc_auth == LC_ID_NONE)
1202                 return 0;
1203
1204         if (!capa) {
1205                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1206                 return -EACCES;
1207         }
1208
1209         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1210                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1211                            PFID(fid));
1212                 return -EACCES;
1213         }
1214
1215         if (!capa_opc_supported(capa, opc)) {
1216                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1217                 return -EACCES;
1218         }
1219
1220         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1221                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1222                 return -EACCES;
1223         }
1224
1225         return 0;
1226 }
1227
1228 static struct timespec *osd_inode_time(const struct lu_env *env,
1229                                        struct inode *inode, __u64 seconds)
1230 {
1231         struct osd_thread_info *oti = osd_oti_get(env);
1232         struct timespec        *t   = &oti->oti_time;
1233
1234         t->tv_sec  = seconds;
1235         t->tv_nsec = 0;
1236         *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
1237         return t;
1238 }
1239
1240
1241 static void osd_inode_getattr(const struct lu_env *env,
1242                               struct inode *inode, struct lu_attr *attr)
1243 {
1244         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1245                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1246                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1247
1248         attr->la_atime      = LTIME_S(inode->i_atime);
1249         attr->la_mtime      = LTIME_S(inode->i_mtime);
1250         attr->la_ctime      = LTIME_S(inode->i_ctime);
1251         attr->la_mode       = inode->i_mode;
1252         attr->la_size       = i_size_read(inode);
1253         attr->la_blocks     = inode->i_blocks;
1254         attr->la_uid        = inode->i_uid;
1255         attr->la_gid        = inode->i_gid;
1256         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1257         attr->la_nlink      = inode->i_nlink;
1258         attr->la_rdev       = inode->i_rdev;
1259         attr->la_blksize    = ll_inode_blksize(inode);
1260         attr->la_blkbits    = inode->i_blkbits;
1261 }
1262
1263 static int osd_attr_get(const struct lu_env *env,
1264                         struct dt_object *dt,
1265                         struct lu_attr *attr,
1266                         struct lustre_capa *capa)
1267 {
1268         struct osd_object *obj = osd_dt_obj(dt);
1269
1270         LASSERT(dt_object_exists(dt));
1271         LINVRNT(osd_invariant(obj));
1272
1273         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1274                 return -EACCES;
1275
1276         cfs_spin_lock(&obj->oo_guard);
1277         osd_inode_getattr(env, obj->oo_inode, attr);
1278         cfs_spin_unlock(&obj->oo_guard);
1279         return 0;
1280 }
1281
1282 static int osd_declare_attr_set(const struct lu_env *env,
1283                                 struct dt_object *dt,
1284                                 const struct lu_attr *attr,
1285                                 struct thandle *handle)
1286 {
1287         struct osd_thandle *oh;
1288         struct osd_object *obj;
1289
1290         LASSERT(dt != NULL);
1291         LASSERT(handle != NULL);
1292
1293         obj = osd_dt_obj(dt);
1294         LASSERT(osd_invariant(obj));
1295
1296         oh = container_of0(handle, struct osd_thandle, ot_super);
1297         LASSERT(oh->ot_handle == NULL);
1298
1299         OSD_DECLARE_OP(oh, attr_set);
1300         oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
1301
1302         if (attr && attr->la_valid & LA_UID) {
1303                 if (obj->oo_inode)
1304                         osd_declare_qid(dt, oh, USRQUOTA, obj->oo_inode->i_uid,
1305                                         obj->oo_inode);
1306                 osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
1307         }
1308         if (attr && attr->la_valid & LA_GID) {
1309                 if (obj->oo_inode)
1310                         osd_declare_qid(dt, oh, GRPQUOTA, obj->oo_inode->i_gid,
1311                                         obj->oo_inode);
1312                 osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
1313         }
1314
1315         return 0;
1316 }
1317
1318 static int osd_inode_setattr(const struct lu_env *env,
1319                              struct inode *inode, const struct lu_attr *attr)
1320 {
1321         __u64 bits;
1322
1323         bits = attr->la_valid;
1324
1325         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1326
1327         if (bits & LA_ATIME)
1328                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1329         if (bits & LA_CTIME)
1330                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1331         if (bits & LA_MTIME)
1332                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1333         if (bits & LA_SIZE) {
1334                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1335                 i_size_write(inode, attr->la_size);
1336         }
1337
1338 #if 0
1339         /* OSD should not change "i_blocks" which is used by quota.
1340          * "i_blocks" should be changed by ldiskfs only. */
1341         if (bits & LA_BLOCKS)
1342                 inode->i_blocks = attr->la_blocks;
1343 #endif
1344         if (bits & LA_MODE)
1345                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1346                         (attr->la_mode & ~S_IFMT);
1347         if (bits & LA_UID)
1348                 inode->i_uid    = attr->la_uid;
1349         if (bits & LA_GID)
1350                 inode->i_gid    = attr->la_gid;
1351         if (bits & LA_NLINK)
1352                 inode->i_nlink  = attr->la_nlink;
1353         if (bits & LA_RDEV)
1354                 inode->i_rdev   = attr->la_rdev;
1355
1356         if (bits & LA_FLAGS) {
1357                 /* always keep S_NOCMTIME */
1358                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
1359                                  S_NOCMTIME;
1360         }
1361         return 0;
1362 }
1363
1364 static int osd_attr_set(const struct lu_env *env,
1365                         struct dt_object *dt,
1366                         const struct lu_attr *attr,
1367                         struct thandle *handle,
1368                         struct lustre_capa *capa)
1369 {
1370         struct osd_object *obj = osd_dt_obj(dt);
1371         struct inode      *inode;
1372         int rc;
1373
1374         LASSERT(handle != NULL);
1375         LASSERT(dt_object_exists(dt));
1376         LASSERT(osd_invariant(obj));
1377
1378         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1379                 return -EACCES;
1380
1381         OSD_EXEC_OP(handle, attr_set);
1382
1383         inode = obj->oo_inode;
1384 #ifdef HAVE_QUOTA_SUPPORT
1385         if ((attr->la_valid & LA_UID && attr->la_uid != inode->i_uid) ||
1386             (attr->la_valid & LA_GID && attr->la_gid != inode->i_gid)) {
1387                 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1388                 struct iattr iattr;
1389                 int rc;
1390
1391                 iattr.ia_valid = 0;
1392                 if (attr->la_valid & LA_UID)
1393                         iattr.ia_valid |= ATTR_UID;
1394                 if (attr->la_valid & LA_GID)
1395                         iattr.ia_valid |= ATTR_GID;
1396                 iattr.ia_uid = attr->la_uid;
1397                 iattr.ia_gid = attr->la_gid;
1398                 osd_push_ctxt(env, save);
1399                 rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
1400                 osd_pop_ctxt(save);
1401                 if (rc != 0)
1402                         return rc;
1403         }
1404 #endif
1405
1406         cfs_spin_lock(&obj->oo_guard);
1407         rc = osd_inode_setattr(env, inode, attr);
1408         cfs_spin_unlock(&obj->oo_guard);
1409
1410         if (!rc)
1411                 inode->i_sb->s_op->dirty_inode(inode);
1412         return rc;
1413 }
1414
/*
 * Object creation.
 *
 * XXX temporary solution.
 */

/**
 * Hook run before the type-specific creation method; currently it does
 * nothing.
 *
 * \retval 0 always
 */
static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
                          struct lu_attr *attr, struct thandle *th)
{
        /* Nothing to prepare for now. */
        return 0;
}
1425
1426 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1427                            struct lu_attr *attr, struct thandle *th)
1428 {
1429         osd_object_init0(obj);
1430         if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1431                 unlock_new_inode(obj->oo_inode);
1432         return 0;
1433 }
1434
1435 struct dentry *osd_child_dentry_get(const struct lu_env *env,
1436                                     struct osd_object *obj,
1437                                     const char *name, const int namelen)
1438 {
1439         return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen);
1440 }
1441
1442 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1443                       cfs_umode_t mode,
1444                       struct dt_allocation_hint *hint,
1445                       struct thandle *th)
1446 {
1447         int result;
1448         struct osd_device  *osd = osd_obj2dev(obj);
1449         struct osd_thandle *oth;
1450         struct dt_object   *parent = NULL;
1451         struct inode       *inode;
1452 #ifdef HAVE_QUOTA_SUPPORT
1453         struct osd_ctxt    *save = &info->oti_ctxt;
1454 #endif
1455
1456         LINVRNT(osd_invariant(obj));
1457         LASSERT(obj->oo_inode == NULL);
1458         LASSERT(obj->oo_hl_head == NULL);
1459
1460         if (S_ISDIR(mode) && ldiskfs_pdo) {
1461                 obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
1462                 if (obj->oo_hl_head == NULL)
1463                         return -ENOMEM;
1464         }
1465
1466         oth = container_of(th, struct osd_thandle, ot_super);
1467         LASSERT(oth->ot_handle->h_transaction != NULL);
1468
1469         if (hint && hint->dah_parent)
1470                 parent = hint->dah_parent;
1471
1472 #ifdef HAVE_QUOTA_SUPPORT
1473         osd_push_ctxt(info->oti_env, save);
1474 #endif
1475         inode = ldiskfs_create_inode(oth->ot_handle,
1476                                      parent ? osd_dt_obj(parent)->oo_inode :
1477                                               osd_sb(osd)->s_root->d_inode,
1478                                      mode);
1479 #ifdef HAVE_QUOTA_SUPPORT
1480         osd_pop_ctxt(save);
1481 #endif
1482         if (!IS_ERR(inode)) {
1483                 /* Do not update file c/mtime in ldiskfs.
1484                  * NB: don't need any lock because no contention at this
1485                  * early stage */
1486                 inode->i_flags |= S_NOCMTIME;
1487                 obj->oo_inode = inode;
1488                 result = 0;
1489         } else {
1490                 if (obj->oo_hl_head != NULL) {
1491                         ldiskfs_htree_lock_head_free(obj->oo_hl_head);
1492                         obj->oo_hl_head = NULL;
1493                 }
1494                 result = PTR_ERR(inode);
1495         }
1496         LINVRNT(osd_invariant(obj));
1497         return result;
1498 }
1499
/* Key size handed to iam_lvar_create() when a directory index is created. */
enum { OSD_NAME_LEN = 255 };
1503
1504 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1505                      struct lu_attr *attr,
1506                      struct dt_allocation_hint *hint,
1507                      struct dt_object_format *dof,
1508                      struct thandle *th)
1509 {
1510         int result;
1511         struct osd_thandle *oth;
1512         struct osd_device *osd = osd_obj2dev(obj);
1513         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1514
1515         LASSERT(S_ISDIR(attr->la_mode));
1516
1517         oth = container_of(th, struct osd_thandle, ot_super);
1518         LASSERT(oth->ot_handle->h_transaction != NULL);
1519         result = osd_mkfile(info, obj, mode, hint, th);
1520         if (result == 0 && osd->od_iop_mode == 0) {
1521                 LASSERT(obj->oo_inode != NULL);
1522                 /*
1523                  * XXX uh-oh... call low-level iam function directly.
1524                  */
1525
1526                 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1527                                          sizeof (struct osd_fid_pack),
1528                                          oth->ot_handle);
1529         }
1530         return result;
1531 }
1532
1533 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1534                         struct lu_attr *attr,
1535                         struct dt_allocation_hint *hint,
1536                         struct dt_object_format *dof,
1537                         struct thandle *th)
1538 {
1539         int result;
1540         struct osd_thandle *oth;
1541         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1542
1543         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1544
1545         LASSERT(S_ISREG(attr->la_mode));
1546
1547         oth = container_of(th, struct osd_thandle, ot_super);
1548         LASSERT(oth->ot_handle->h_transaction != NULL);
1549
1550         result = osd_mkfile(info, obj, mode, hint, th);
1551         if (result == 0) {
1552                 LASSERT(obj->oo_inode != NULL);
1553                 if (feat->dif_flags & DT_IND_VARKEY)
1554                         result = iam_lvar_create(obj->oo_inode,
1555                                                  feat->dif_keysize_max,
1556                                                  feat->dif_ptrsize,
1557                                                  feat->dif_recsize_max,
1558                                                  oth->ot_handle);
1559                 else
1560                         result = iam_lfix_create(obj->oo_inode,
1561                                                  feat->dif_keysize_max,
1562                                                  feat->dif_ptrsize,
1563                                                  feat->dif_recsize_max,
1564                                                  oth->ot_handle);
1565
1566         }
1567         return result;
1568 }
1569
1570 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1571                      struct lu_attr *attr,
1572                      struct dt_allocation_hint *hint,
1573                      struct dt_object_format *dof,
1574                      struct thandle *th)
1575 {
1576         LASSERT(S_ISREG(attr->la_mode));
1577         return osd_mkfile(info, obj, (attr->la_mode &
1578                                (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1579 }
1580
1581 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1582                      struct lu_attr *attr,
1583                      struct dt_allocation_hint *hint,
1584                      struct dt_object_format *dof,
1585                      struct thandle *th)
1586 {
1587         LASSERT(S_ISLNK(attr->la_mode));
1588         return osd_mkfile(info, obj, (attr->la_mode &
1589                               (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
1590 }
1591
1592 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1593                      struct lu_attr *attr,
1594                      struct dt_allocation_hint *hint,
1595                      struct dt_object_format *dof,
1596                      struct thandle *th)
1597 {
1598         cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1599         int result;
1600
1601         LINVRNT(osd_invariant(obj));
1602         LASSERT(obj->oo_inode == NULL);
1603         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1604                 S_ISFIFO(mode) || S_ISSOCK(mode));
1605
1606         result = osd_mkfile(info, obj, mode, hint, th);
1607         if (result == 0) {
1608                 LASSERT(obj->oo_inode != NULL);
1609                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1610         }
1611         LINVRNT(osd_invariant(obj));
1612         return result;
1613 }
1614
/**
 * Signature shared by the type-specific creation methods (osd_mkdir(),
 * osd_mkreg(), osd_mksym(), osd_mknod(), osd_mk_index()).
 */
typedef int (*osd_obj_type_f)(struct osd_thread_info *info,
                              struct osd_object *obj,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
                              struct dt_object_format *dof,
                              struct thandle *th);
1620
1621 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1622 {
1623         osd_obj_type_f result;
1624
1625         switch (type) {
1626         case DFT_DIR:
1627                 result = osd_mkdir;
1628                 break;
1629         case DFT_REGULAR:
1630                 result = osd_mkreg;
1631                 break;
1632         case DFT_SYM:
1633                 result = osd_mksym;
1634                 break;
1635         case DFT_NODE:
1636                 result = osd_mknod;
1637                 break;
1638         case DFT_INDEX:
1639                 result = osd_mk_index;
1640                 break;
1641
1642         default:
1643                 LBUG();
1644                 break;
1645         }
1646         return result;
1647 }
1648
1649
1650 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1651                         struct dt_object *parent, cfs_umode_t child_mode)
1652 {
1653         LASSERT(ah);
1654
1655         memset(ah, 0, sizeof(*ah));
1656         ah->dah_parent = parent;
1657         ah->dah_mode = child_mode;
1658 }
1659
1660 /**
1661  * Helper function for osd_object_create()
1662  *
1663  * \retval 0, on success
1664  */
1665 static int __osd_object_create(struct osd_thread_info *info,
1666                                struct osd_object *obj, struct lu_attr *attr,
1667                                struct dt_allocation_hint *hint,
1668                                struct dt_object_format *dof,
1669                                struct thandle *th)
1670 {
1671
1672         int result;
1673
1674         result = osd_create_pre(info, obj, attr, th);
1675         if (result == 0) {
1676                 result = osd_create_type_f(dof->dof_type)(info, obj,
1677                                            attr, hint, dof, th);
1678                 if (result == 0)
1679                         result = osd_create_post(info, obj, attr, th);
1680         }
1681         return result;
1682 }
1683
1684 /**
1685  * Helper function for osd_object_create()
1686  *
1687  * \retval 0, on success
1688  */
1689 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1690                            const struct lu_fid *fid, struct thandle *th)
1691 {
1692         struct osd_thread_info *info = osd_oti_get(env);
1693         struct osd_inode_id    *id   = &info->oti_id;
1694         struct osd_device      *osd  = osd_obj2dev(obj);
1695         struct md_ucred        *uc   = md_ucred(env);
1696
1697         LASSERT(obj->oo_inode != NULL);
1698         LASSERT(uc != NULL);
1699
1700         id->oii_ino = obj->oo_inode->i_ino;
1701         id->oii_gen = obj->oo_inode->i_generation;
1702
1703         return osd_oi_insert(info, osd, fid, id, th,
1704                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
1705 }
1706
1707 static int osd_declare_object_create(const struct lu_env *env,
1708                                      struct dt_object *dt,
1709                                      struct lu_attr *attr,
1710                                      struct dt_allocation_hint *hint,
1711                                      struct dt_object_format *dof,
1712                                      struct thandle *handle)
1713 {
1714         struct osd_thandle *oh;
1715
1716         LASSERT(handle != NULL);
1717
1718         oh = container_of0(handle, struct osd_thandle, ot_super);
1719         LASSERT(oh->ot_handle == NULL);
1720
1721         OSD_DECLARE_OP(oh, create);
1722         oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
1723         /* XXX: So far, only normal fid needs be inserted into the oi,
1724          *      things could be changed later. Revise following code then. */
1725         if (fid_is_norm(lu_object_fid(&dt->do_lu))) {
1726                 OSD_DECLARE_OP(oh, insert);
1727                 oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
1728         }
1729         /* If this is directory, then we expect . and .. to be inserted as
1730          * well. The one directory block always needs to be created for the
1731          * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap,
1732          * block), there is no danger of needing a tree for the first block.
1733          */
1734         if (attr && S_ISDIR(attr->la_mode)) {
1735                 OSD_DECLARE_OP(oh, insert);
1736                 OSD_DECLARE_OP(oh, insert);
1737                 oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BASE];
1738         }
1739
1740         if (attr) {
1741                 osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
1742                 osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
1743         }
1744         return 0;
1745 }
1746
1747 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1748                              struct lu_attr *attr,
1749                              struct dt_allocation_hint *hint,
1750                              struct dt_object_format *dof,
1751                              struct thandle *th)
1752 {
1753         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
1754         struct osd_object      *obj    = osd_dt_obj(dt);
1755         struct osd_thread_info *info   = osd_oti_get(env);
1756         int result;
1757
1758         ENTRY;
1759
1760         LINVRNT(osd_invariant(obj));
1761         LASSERT(!dt_object_exists(dt));
1762         LASSERT(osd_write_locked(env, obj));
1763         LASSERT(th != NULL);
1764
1765         OSD_EXEC_OP(th, create);
1766
1767         result = __osd_object_create(info, obj, attr, hint, dof, th);
1768         if (result == 0)
1769                 result = __osd_oi_insert(env, obj, fid, th);
1770
1771         LASSERT(ergo(result == 0, dt_object_exists(dt)));
1772         LASSERT(osd_invariant(obj));
1773         RETURN(result);
1774 }
1775
1776 /**
1777  * Called to destroy on-disk representation of the object
1778  *
1779  * Concurrency: must be locked
1780  */
1781 static int osd_declare_object_destroy(const struct lu_env *env,
1782                                       struct dt_object *dt,
1783                                       struct thandle *th)
1784 {
1785         struct osd_object  *obj = osd_dt_obj(dt);
1786         struct inode       *inode = obj->oo_inode;
1787         struct osd_thandle *oh;
1788
1789         ENTRY;
1790
1791         oh = container_of0(th, struct osd_thandle, ot_super);
1792         LASSERT(oh->ot_handle == NULL);
1793         LASSERT(inode);
1794
1795         OSD_DECLARE_OP(oh, destroy);
1796         OSD_DECLARE_OP(oh, delete);
1797         oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE];
1798         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
1799
1800         osd_declare_qid(dt, oh, USRQUOTA, inode->i_uid, inode);
1801         osd_declare_qid(dt, oh, GRPQUOTA, inode->i_gid, inode);
1802
1803         RETURN(0);
1804 }
1805
1806 static int osd_object_destroy(const struct lu_env *env,
1807                               struct dt_object *dt,
1808                               struct thandle *th)
1809 {
1810         const struct lu_fid    *fid = lu_object_fid(&dt->do_lu);
1811         struct osd_object      *obj = osd_dt_obj(dt);
1812         struct inode           *inode = obj->oo_inode;
1813         struct osd_device      *osd = osd_obj2dev(obj);
1814         struct osd_thandle     *oh;
1815         int                     result;
1816         ENTRY;
1817
1818         oh = container_of0(th, struct osd_thandle, ot_super);
1819         LASSERT(oh->ot_handle);
1820         LASSERT(inode);
1821         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
1822
1823         if (S_ISDIR(inode->i_mode)) {
1824                 LASSERT(osd_inode_unlinked(inode) ||
1825                         inode->i_nlink == 1);
1826                 cfs_spin_lock(&obj->oo_guard);
1827                 inode->i_nlink = 0;
1828                 cfs_spin_unlock(&obj->oo_guard);
1829                 inode->i_sb->s_op->dirty_inode(inode);
1830         } else {
1831                 LASSERT(osd_inode_unlinked(inode));
1832         }
1833
1834         OSD_EXEC_OP(th, destroy);
1835
1836         result = osd_oi_delete(osd_oti_get(env), osd, fid, th);
1837
1838         /* XXX: add to ext3 orphan list */
1839         /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
1840
1841         /* not needed in the cache anymore */
1842         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
1843
1844         RETURN(0);
1845 }
1846
1847 /**
1848  * Helper function for osd_xattr_set()
1849  */
1850 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
1851                            const struct lu_buf *buf, const char *name, int fl)
1852 {
1853         struct osd_object      *obj      = osd_dt_obj(dt);
1854         struct inode           *inode    = obj->oo_inode;
1855         struct osd_thread_info *info     = osd_oti_get(env);
1856         struct dentry          *dentry   = &info->oti_child_dentry;
1857         int                     fs_flags = 0;
1858         int                     rc;
1859
1860         LASSERT(dt_object_exists(dt));
1861         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1862         LASSERT(osd_write_locked(env, obj));
1863
1864         if (fl & LU_XATTR_REPLACE)
1865                 fs_flags |= XATTR_REPLACE;
1866
1867         if (fl & LU_XATTR_CREATE)
1868                 fs_flags |= XATTR_CREATE;
1869
1870         dentry->d_inode = inode;
1871         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
1872                                    buf->lb_len, fs_flags);
1873         return rc;
1874 }
1875
1876 /**
1877  * Put the fid into lustre_mdt_attrs, and then place the structure
1878  * inode's ea. This fid should not be altered during the life time
1879  * of the inode.
1880  *
1881  * \retval +ve, on success
1882  * \retval -ve, on error
1883  *
1884  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
1885  */
1886 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
1887                           const struct lu_fid *fid)
1888 {
1889         struct osd_thread_info  *info      = osd_oti_get(env);
1890         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1891
1892         lustre_lma_init(mdt_attrs, fid);
1893         lustre_lma_swab(mdt_attrs);
1894         return __osd_xattr_set(env, dt,
1895                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
1896                                XATTR_NAME_LMA, LU_XATTR_CREATE);
1897
1898 }
1899
1900 /**
1901  * Helper function to form igif
1902  */
1903 static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
1904                                 struct lu_fid *fid)
1905 {
1906         LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
1907 }
1908
1909 /**
1910  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
1911  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
1912  * To have compatilibility with 1.8 ldiskfs driver we need to have
1913  * magic number at start of fid data.
1914  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
1915  * its inmemory API.
1916  */
1917 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
1918                                   const struct dt_rec *fid)
1919 {
1920         param->edp_magic = LDISKFS_LUFID_MAGIC;
1921         param->edp_len =  sizeof(struct lu_fid) + 1;
1922
1923         fid_cpu_to_be((struct lu_fid *)param->edp_data,
1924                       (struct lu_fid *)fid);
1925 }
1926
1927 /**
1928  * Try to read the fid from inode ea into dt_rec, if return value
1929  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
1930  *
1931  * \param fid object fid.
1932  *
1933  * \retval 0 on success
1934  */
1935 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
1936                           __u32 ino, struct lu_fid *fid)
1937 {
1938         struct osd_thread_info  *info      = osd_oti_get(env);
1939         struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
1940         struct lu_device        *ldev   = obj->oo_dt.do_lu.lo_dev;
1941         struct dentry           *dentry = &info->oti_child_dentry;
1942         struct osd_inode_id     *id     = &info->oti_id;
1943         struct osd_device       *dev;
1944         struct inode            *inode;
1945         int                      rc;
1946
1947         ENTRY;
1948         dev  = osd_dev(ldev);
1949
1950         id->oii_ino = ino;
1951         id->oii_gen = OSD_OII_NOGEN;
1952
1953         inode = osd_iget(info, dev, id);
1954         if (IS_ERR(inode)) {
1955                 rc = PTR_ERR(inode);
1956                 GOTO(out,rc);
1957         }
1958         dentry->d_inode = inode;
1959
1960         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1961         rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
1962                                    sizeof *mdt_attrs);
1963
1964         /* Check LMA compatibility */
1965         if (rc > 0 &&
1966             (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
1967                 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
1968                       inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
1969                       ~LMA_INCOMPAT_SUPP);
1970                 return -ENOSYS;
1971         }
1972
1973         if (rc > 0) {
1974                 lustre_lma_swab(mdt_attrs);
1975                 memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
1976                 rc = 0;
1977         } else if (rc == -ENODATA) {
1978                 osd_igif_get(env, inode, fid);
1979                 rc = 0;
1980         }
1981         iput(inode);
1982 out:
1983         RETURN(rc);
1984 }
1985
1986 /**
1987  * OSD layer object create function for interoperability mode (b11826).
1988  * This is mostly similar to osd_object_create(). Only difference being, fid is
1989  * inserted into inode ea here.
1990  *
1991  * \retval   0, on success
1992  * \retval -ve, on error
1993  */
1994 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
1995                                 struct lu_attr *attr,
1996                                 struct dt_allocation_hint *hint,
1997                                 struct dt_object_format *dof,
1998                                 struct thandle *th)
1999 {
2000         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
2001         struct osd_object      *obj    = osd_dt_obj(dt);
2002         struct osd_thread_info *info   = osd_oti_get(env);
2003         int                     result;
2004
2005         ENTRY;
2006
2007         LASSERT(osd_invariant(obj));
2008         LASSERT(!dt_object_exists(dt));
2009         LASSERT(osd_write_locked(env, obj));
2010         LASSERT(th != NULL);
2011
2012         OSD_EXEC_OP(th, create);
2013
2014         result = __osd_object_create(info, obj, attr, hint, dof, th);
2015         /* objects under osd root shld have igif fid, so dont add fid EA */
2016         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
2017                 result = osd_ea_fid_set(env, dt, fid);
2018
2019         if (result == 0)
2020                 result = __osd_oi_insert(env, obj, fid, th);
2021
2022         LASSERT(ergo(result == 0, dt_object_exists(dt)));
2023         LINVRNT(osd_invariant(obj));
2024         RETURN(result);
2025 }
2026
2027 static int osd_declare_object_ref_add(const struct lu_env *env,
2028                                       struct dt_object *dt,
2029                                       struct thandle *handle)
2030 {
2031         struct osd_thandle *oh;
2032
2033         /* it's possible that object doesn't exist yet */
2034         LASSERT(handle != NULL);
2035
2036         oh = container_of0(handle, struct osd_thandle, ot_super);
2037         LASSERT(oh->ot_handle == NULL);
2038
2039         OSD_DECLARE_OP(oh, ref_add);
2040         oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
2041
2042         return 0;
2043 }
2044
2045 /*
2046  * Concurrency: @dt is write locked.
2047  */
2048 static int osd_object_ref_add(const struct lu_env *env,
2049                               struct dt_object *dt, struct thandle *th)
2050 {
2051         struct osd_object *obj = osd_dt_obj(dt);
2052         struct inode      *inode = obj->oo_inode;
2053
2054         LINVRNT(osd_invariant(obj));
2055         LASSERT(dt_object_exists(dt));
2056         LASSERT(osd_write_locked(env, obj));
2057         LASSERT(th != NULL);
2058
2059         OSD_EXEC_OP(th, ref_add);
2060
2061         /*
2062          * DIR_NLINK feature is set for compatibility reasons if:
2063          * 1) nlinks > LDISKFS_LINK_MAX, or
2064          * 2) nlinks == 2, since this indicates i_nlink was previously 1.
2065          *
2066          * It is easier to always set this flag (rather than check and set),
2067          * since it has less overhead, and the superblock will be dirtied
2068          * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
2069          * do not actually care whether this flag is set or not.
2070          */
2071         cfs_spin_lock(&obj->oo_guard);
2072         inode->i_nlink++;
2073         if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
2074                 if (inode->i_nlink >= LDISKFS_LINK_MAX ||
2075                     inode->i_nlink == 2)
2076                         inode->i_nlink = 1;
2077         }
2078         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
2079         cfs_spin_unlock(&obj->oo_guard);
2080         inode->i_sb->s_op->dirty_inode(inode);
2081         LINVRNT(osd_invariant(obj));
2082
2083         return 0;
2084 }
2085
2086 static int osd_declare_object_ref_del(const struct lu_env *env,
2087                                       struct dt_object *dt,
2088                                       struct thandle *handle)
2089 {
2090         struct osd_thandle *oh;
2091
2092         LASSERT(dt_object_exists(dt));
2093         LASSERT(handle != NULL);
2094
2095         oh = container_of0(handle, struct osd_thandle, ot_super);
2096         LASSERT(oh->ot_handle == NULL);
2097
2098         OSD_DECLARE_OP(oh, ref_del);
2099         oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
2100
2101         return 0;
2102 }
2103
2104 /*
2105  * Concurrency: @dt is write locked.
2106  */
2107 static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
2108                               struct thandle *th)
2109 {
2110         struct osd_object *obj = osd_dt_obj(dt);
2111         struct inode      *inode = obj->oo_inode;
2112
2113         LINVRNT(osd_invariant(obj));
2114         LASSERT(dt_object_exists(dt));
2115         LASSERT(osd_write_locked(env, obj));
2116         LASSERT(th != NULL);
2117
2118         OSD_EXEC_OP(th, ref_del);
2119
2120         cfs_spin_lock(&obj->oo_guard);
2121         LASSERT(inode->i_nlink > 0);
2122         inode->i_nlink--;
2123         /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
2124          * then the nlink count is 1. Don't let it be set to 0 or the directory
2125          * inode will be deleted incorrectly. */
2126         if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
2127                 inode->i_nlink++;
2128         cfs_spin_unlock(&obj->oo_guard);
2129         inode->i_sb->s_op->dirty_inode(inode);
2130         LINVRNT(osd_invariant(obj));
2131
2132         return 0;
2133 }
2134
2135 /*
2136  * Get the 64-bit version for an inode.
2137  */
2138 static int osd_object_version_get(const struct lu_env *env,
2139                                   struct dt_object *dt, dt_obj_version_t *ver)
2140 {
2141         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2142
2143         CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
2144                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2145         *ver = LDISKFS_I(inode)->i_fs_version;
2146         return 0;
2147 }
2148
2149 /*
2150  * Concurrency: @dt is read locked.
2151  */
2152 static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
2153                          struct lu_buf *buf, const char *name,
2154                          struct lustre_capa *capa)
2155 {
2156         struct osd_object      *obj    = osd_dt_obj(dt);
2157         struct inode           *inode  = obj->oo_inode;
2158         struct osd_thread_info *info   = osd_oti_get(env);
2159         struct dentry          *dentry = &info->oti_obj_dentry;
2160
2161         /* version get is not real XATTR but uses xattr API */
2162         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2163                 /* for version we are just using xattr API but change inode
2164                  * field instead */
2165                 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2166                 osd_object_version_get(env, dt, buf->lb_buf);
2167                 return sizeof(dt_obj_version_t);
2168         }
2169
2170         LASSERT(dt_object_exists(dt));
2171         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2172         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2173
2174         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2175                 return -EACCES;
2176
2177         dentry->d_inode = inode;
2178         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
2179 }
2180
2181
2182 static int osd_declare_xattr_set(const struct lu_env *env,
2183                                  struct dt_object *dt,
2184                                  const struct lu_buf *buf, const char *name,
2185                                  int fl, struct thandle *handle)
2186 {
2187         struct osd_thandle *oh;
2188
2189         LASSERT(handle != NULL);
2190
2191         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2192                 /* no credits for version */
2193                 return 0;
2194         }
2195
2196         oh = container_of0(handle, struct osd_thandle, ot_super);
2197         LASSERT(oh->ot_handle == NULL);
2198
2199         OSD_DECLARE_OP(oh, xattr_set);
2200         oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
2201
2202         return 0;
2203 }
2204
2205 /*
2206  * Set the 64-bit version for object
2207  */
2208 static void osd_object_version_set(const struct lu_env *env,
2209                                    struct dt_object *dt,
2210                                    dt_obj_version_t *new_version)
2211 {
2212         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2213
2214         CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2215                *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2216
2217         LDISKFS_I(inode)->i_fs_version = *new_version;
2218         /** Version is set after all inode operations are finished,
2219          *  so we should mark it dirty here */
2220         inode->i_sb->s_op->dirty_inode(inode);
2221 }
2222
2223 /*
2224  * Concurrency: @dt is write locked.
2225  */
2226 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2227                          const struct lu_buf *buf, const char *name, int fl,
2228                          struct thandle *handle, struct lustre_capa *capa)
2229 {
2230         LASSERT(handle != NULL);
2231
2232         /* version set is not real XATTR */
2233         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2234                 /* for version we are just using xattr API but change inode
2235                  * field instead */
2236                 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2237                 osd_object_version_set(env, dt, buf->lb_buf);
2238                 return sizeof(dt_obj_version_t);
2239         }
2240
2241         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2242                 return -EACCES;
2243
2244         OSD_EXEC_OP(handle, xattr_set);
2245         return __osd_xattr_set(env, dt, buf, name, fl);
2246 }
2247
2248 /*
2249  * Concurrency: @dt is read locked.
2250  */
2251 static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
2252                           struct lu_buf *buf, struct lustre_capa *capa)
2253 {
2254         struct osd_object      *obj    = osd_dt_obj(dt);
2255         struct inode           *inode  = obj->oo_inode;
2256         struct osd_thread_info *info   = osd_oti_get(env);
2257         struct dentry          *dentry = &info->oti_obj_dentry;
2258
2259         LASSERT(dt_object_exists(dt));
2260         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2261         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2262
2263         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2264                 return -EACCES;
2265
2266         dentry->d_inode = inode;
2267         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2268 }
2269
2270 static int osd_declare_xattr_del(const struct lu_env *env,
2271                                  struct dt_object *dt, const char *name,
2272                                  struct thandle *handle)
2273 {
2274         struct osd_thandle *oh;
2275
2276         LASSERT(dt_object_exists(dt));
2277         LASSERT(handle != NULL);
2278
2279         oh = container_of0(handle, struct osd_thandle, ot_super);
2280         LASSERT(oh->ot_handle == NULL);
2281
2282         OSD_DECLARE_OP(oh, xattr_set);
2283         oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
2284
2285         return 0;
2286 }
2287
2288 /*
2289  * Concurrency: @dt is write locked.
2290  */
2291 static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
2292                          const char *name, struct thandle *handle,
2293                          struct lustre_capa *capa)
2294 {
2295         struct osd_object      *obj    = osd_dt_obj(dt);
2296         struct inode           *inode  = obj->oo_inode;
2297         struct osd_thread_info *info   = osd_oti_get(env);
2298         struct dentry          *dentry = &info->oti_obj_dentry;
2299         int                     rc;
2300
2301         LASSERT(dt_object_exists(dt));
2302         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2303         LASSERT(osd_write_locked(env, obj));
2304         LASSERT(handle != NULL);
2305
2306         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2307                 return -EACCES;
2308
2309         OSD_EXEC_OP(handle, xattr_set);
2310
2311         dentry->d_inode = inode;
2312         rc = inode->i_op->removexattr(dentry, name);
2313         return rc;
2314 }
2315
2316 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2317                                      struct dt_object *dt,
2318                                      struct lustre_capa *old,
2319                                      __u64 opc)
2320 {
2321         struct osd_thread_info *info = osd_oti_get(env);
2322         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2323         struct osd_object *obj = osd_dt_obj(dt);
2324         struct osd_device *dev = osd_obj2dev(obj);
2325         struct lustre_capa_key *key = &info->oti_capa_key;
2326         struct lustre_capa *capa = &info->oti_capa;
2327         struct obd_capa *oc;
2328         struct md_capainfo *ci;
2329         int rc;
2330         ENTRY;
2331
2332         if (!dev->od_fl_capa)
2333                 RETURN(ERR_PTR(-ENOENT));
2334
2335         LASSERT(dt_object_exists(dt));
2336         LINVRNT(osd_invariant(obj));
2337
2338         /* renewal sanity check */
2339         if (old && osd_object_auth(env, dt, old, opc))
2340                 RETURN(ERR_PTR(-EACCES));
2341
2342         ci = md_capainfo(env);
2343         if (unlikely(!ci))
2344                 RETURN(ERR_PTR(-ENOENT));
2345
2346         switch (ci->mc_auth) {
2347         case LC_ID_NONE:
2348                 RETURN(NULL);
2349         case LC_ID_PLAIN:
2350                 capa->lc_uid = obj->oo_inode->i_uid;
2351                 capa->lc_gid = obj->oo_inode->i_gid;
2352                 capa->lc_flags = LC_ID_PLAIN;
2353                 break;
2354         case LC_ID_CONVERT: {
2355                 __u32 d[4], s[4];
2356
2357                 s[0] = obj->oo_inode->i_uid;
2358                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2359                 s[2] = obj->oo_inode->i_gid;
2360                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2361                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2362                 if (unlikely(rc))
2363                         RETURN(ERR_PTR(rc));
2364
2365                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2366                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2367                 capa->lc_flags = LC_ID_CONVERT;
2368                 break;
2369         }
2370         default:
2371                 RETURN(ERR_PTR(-EINVAL));
2372         }
2373
2374         capa->lc_fid = *fid;
2375         capa->lc_opc = opc;
2376         capa->lc_flags |= dev->od_capa_alg << 24;
2377         capa->lc_timeout = dev->od_capa_timeout;
2378         capa->lc_expiry = 0;
2379
2380         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2381         if (oc) {
2382                 LASSERT(!capa_is_expired(oc));
2383                 RETURN(oc);
2384         }
2385
2386         cfs_spin_lock(&capa_lock);
2387         *key = dev->od_capa_keys[1];
2388         cfs_spin_unlock(&capa_lock);
2389
2390         capa->lc_keyid = key->lk_keyid;
2391         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2392
2393         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2394         if (rc) {
2395                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2396                 RETURN(ERR_PTR(rc));
2397         }
2398
2399         oc = capa_add(dev->od_capa_hash, capa);
2400         RETURN(oc);
2401 }
2402
2403 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2404 {
2405         struct osd_object      *obj    = osd_dt_obj(dt);
2406         struct inode           *inode  = obj->oo_inode;
2407         struct osd_thread_info *info   = osd_oti_get(env);
2408         struct dentry          *dentry = &info->oti_obj_dentry;
2409         struct file            *file   = &info->oti_file;
2410         int                     rc;
2411
2412         ENTRY;
2413
2414         dentry->d_inode = inode;
2415         file->f_dentry = dentry;
2416         file->f_mapping = inode->i_mapping;
2417         file->f_op = inode->i_fop;
2418         LOCK_INODE_MUTEX(inode);
2419         rc = file->f_op->fsync(file, dentry, 0);
2420         UNLOCK_INODE_MUTEX(inode);
2421         RETURN(rc);
2422 }
2423
2424 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2425                         void **data)
2426 {
2427         struct osd_object *obj = osd_dt_obj(dt);
2428         ENTRY;
2429
2430         *data = (void *)obj->oo_inode;
2431         RETURN(0);
2432 }
2433
2434 /*
2435  * Index operations.
2436  */
2437
2438 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2439                            const struct dt_index_features *feat)
2440 {
2441         struct iam_descr *descr;
2442
2443         if (osd_object_is_root(o))
2444                 return feat == &dt_directory_features;
2445
2446         LASSERT(o->oo_dir != NULL);
2447
2448         descr = o->oo_dir->od_container.ic_descr;
2449         if (feat == &dt_directory_features) {
2450                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2451                         return 1;
2452                 else
2453                         return 0;
2454         } else {
2455                 return
2456                         feat->dif_keysize_min <= descr->id_key_size &&
2457                         descr->id_key_size <= feat->dif_keysize_max &&
2458                         feat->dif_recsize_min <= descr->id_rec_size &&
2459                         descr->id_rec_size <= feat->dif_recsize_max &&
2460                         !(feat->dif_flags & (DT_IND_VARKEY |
2461                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2462                         ergo(feat->dif_flags & DT_IND_UPDATE,
2463                              1 /* XXX check that object (and file system) is
2464                                 * writable */);
2465         }
2466 }
2467
2468 static int osd_iam_container_init(const struct lu_env *env,
2469                                   struct osd_object *obj,
2470                                   struct osd_directory *dir)
2471 {
2472         struct iam_container *bag = &dir->od_container;
2473         int result;
2474
2475         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2476         if (result != 0)
2477                 return result;
2478
2479         result = iam_container_setup(bag);
2480         if (result != 0)
2481                 goto out;
2482
2483         if (osd_obj2dev(obj)->od_iop_mode) {
2484                 u32 ptr = bag->ic_descr->id_ops->id_root_ptr(bag);
2485
2486                 bag->ic_root_bh = ldiskfs_bread(NULL, obj->oo_inode,
2487                                                 ptr, 0, &result);
2488         }
2489
2490  out:
2491         if (result == 0)
2492                 obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2493         else
2494                 iam_container_fini(bag);
2495
2496         return result;
2497 }
2498
2499
2500 /*
2501  * Concurrency: no external locking is necessary.
2502  */
2503 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2504                          const struct dt_index_features *feat)
2505 {
2506         int result;
2507         int ea_dir = 0;
2508         struct osd_object *obj = osd_dt_obj(dt);
2509         struct osd_device *osd = osd_obj2dev(obj);
2510
2511         LINVRNT(osd_invariant(obj));
2512         LASSERT(dt_object_exists(dt));
2513
2514         if (osd_object_is_root(obj)) {
2515                 dt->do_index_ops = &osd_index_ea_ops;
2516                 result = 0;
2517         } else if (feat == &dt_directory_features && osd->od_iop_mode) {
2518                 dt->do_index_ops = &osd_index_ea_ops;
2519                 if (S_ISDIR(obj->oo_inode->i_mode))
2520                         result = 0;
2521                 else
2522                         result = -ENOTDIR;
2523                 ea_dir = 1;
2524         } else if (!osd_has_index(obj)) {
2525                 struct osd_directory *dir;
2526
2527                 OBD_ALLOC_PTR(dir);
2528                 if (dir != NULL) {
2529
2530                         cfs_spin_lock(&obj->oo_guard);
2531                         if (obj->oo_dir == NULL)
2532                                 obj->oo_dir = dir;
2533                         else
2534                                 /*
2535                                  * Concurrent thread allocated container data.
2536                                  */
2537                                 OBD_FREE_PTR(dir);
2538                         cfs_spin_unlock(&obj->oo_guard);
2539                         /*
2540                          * Now, that we have container data, serialize its
2541                          * initialization.
2542                          */
2543                         cfs_down_write(&obj->oo_ext_idx_sem);
2544                         /*
2545                          * recheck under lock.
2546                          */
2547                         if (!osd_has_index(obj))
2548                                 result = osd_iam_container_init(env, obj, dir);
2549                         else
2550                                 result = 0;
2551                         cfs_up_write(&obj->oo_ext_idx_sem);
2552                 } else {
2553                         result = -ENOMEM;
2554                 }
2555         } else {
2556                 result = 0;
2557         }
2558
2559         if (result == 0 && ea_dir == 0) {
2560                 if (!osd_iam_index_probe(env, obj, feat))
2561                         result = -ENOTDIR;
2562         }
2563         LINVRNT(osd_invariant(obj));
2564
2565         return result;
2566 }
2567
2568 static const struct dt_object_operations osd_obj_ops = {
2569         .do_read_lock         = osd_object_read_lock,
2570         .do_write_lock        = osd_object_write_lock,
2571         .do_read_unlock       = osd_object_read_unlock,
2572         .do_write_unlock      = osd_object_write_unlock,
2573         .do_write_locked      = osd_object_write_locked,
2574         .do_attr_get          = osd_attr_get,
2575         .do_declare_attr_set  = osd_declare_attr_set,
2576         .do_attr_set          = osd_attr_set,
2577         .do_ah_init           = osd_ah_init,
2578         .do_declare_create    = osd_declare_object_create,
2579         .do_create            = osd_object_create,
2580         .do_declare_destroy   = osd_declare_object_destroy,
2581         .do_destroy           = osd_object_destroy,
2582         .do_index_try         = osd_index_try,
2583         .do_declare_ref_add   = osd_declare_object_ref_add,
2584         .do_ref_add           = osd_object_ref_add,
2585         .do_declare_ref_del   = osd_declare_object_ref_del,
2586         .do_ref_del           = osd_object_ref_del,
2587         .do_xattr_get         = osd_xattr_get,
2588         .do_declare_xattr_set = osd_declare_xattr_set,
2589         .do_xattr_set         = osd_xattr_set,
2590         .do_declare_xattr_del = osd_declare_xattr_del,
2591         .do_xattr_del         = osd_xattr_del,
2592         .do_xattr_list        = osd_xattr_list,
2593         .do_capa_get          = osd_capa_get,
2594         .do_object_sync       = osd_object_sync,
2595         .do_data_get          = osd_data_get,
2596 };
2597
2598 /**
2599  * dt_object_operations for interoperability mode
2600  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2601  */
2602 static const struct dt_object_operations osd_obj_ea_ops = {
2603         .do_read_lock         = osd_object_read_lock,
2604         .do_write_lock        = osd_object_write_lock,
2605         .do_read_unlock       = osd_object_read_unlock,
2606         .do_write_unlock      = osd_object_write_unlock,
2607         .do_write_locked      = osd_object_write_locked,
2608         .do_attr_get          = osd_attr_get,
2609         .do_declare_attr_set  = osd_declare_attr_set,
2610         .do_attr_set          = osd_attr_set,
2611         .do_ah_init           = osd_ah_init,
2612         .do_declare_create    = osd_declare_object_create,
2613         .do_create            = osd_object_ea_create,
2614         .do_declare_destroy   = osd_declare_object_destroy,
2615         .do_destroy           = osd_object_destroy,
2616         .do_index_try         = osd_index_try,
2617         .do_declare_ref_add   = osd_declare_object_ref_add,
2618         .do_ref_add           = osd_object_ref_add,
2619         .do_declare_ref_del   = osd_declare_object_ref_del,
2620         .do_ref_del           = osd_object_ref_del,
2621         .do_xattr_get         = osd_xattr_get,
2622         .do_declare_xattr_set = osd_declare_xattr_set,
2623         .do_xattr_set         = osd_xattr_set,
2624         .do_declare_xattr_del = osd_declare_xattr_del,
2625         .do_xattr_del         = osd_xattr_del,
2626         .do_xattr_list        = osd_xattr_list,
2627         .do_capa_get          = osd_capa_get,
2628         .do_object_sync       = osd_object_sync,
2629         .do_data_get          = osd_data_get,
2630 };
2631
2632 static int osd_index_declare_iam_delete(const struct lu_env *env,
2633                                         struct dt_object *dt,
2634                                         const struct dt_key *key,
2635                                         struct thandle *handle)
2636 {
2637         struct osd_thandle    *oh;
2638
2639         oh = container_of0(handle, struct osd_thandle, ot_super);
2640         LASSERT(oh->ot_handle == NULL);
2641
2642         OSD_DECLARE_OP(oh, delete);
2643         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
2644
2645         return 0;
2646 }
2647
2648 /**
2649  *      delete a (key, value) pair from index \a dt specified by \a key
2650  *
2651  *      \param  dt      osd index object
2652  *      \param  key     key for index
2653  *      \param  rec     record reference
2654  *      \param  handle  transaction handler
2655  *
2656  *      \retval  0  success
2657  *      \retval -ve   failure
2658  */
2659
2660 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2661                                 const struct dt_key *key,
2662                                 struct thandle *handle,
2663                                 struct lustre_capa *capa)
2664 {
2665         struct osd_object     *obj = osd_dt_obj(dt);
2666         struct osd_thandle    *oh;
2667         struct iam_path_descr *ipd;
2668         struct iam_container  *bag = &obj->oo_dir->od_container;
2669         int                    rc;
2670
2671         ENTRY;
2672
2673         LINVRNT(osd_invariant(obj));
2674         LASSERT(dt_object_exists(dt));
2675         LASSERT(bag->ic_object == obj->oo_inode);
2676         LASSERT(handle != NULL);
2677
2678         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2679                 RETURN(-EACCES);
2680
2681         OSD_EXEC_OP(handle, delete);
2682
2683         ipd = osd_idx_ipd_get(env, bag);
2684         if (unlikely(ipd == NULL))
2685                 RETURN(-ENOMEM);
2686
2687         oh = container_of0(handle, struct osd_thandle, ot_super);
2688         LASSERT(oh->ot_handle != NULL);
2689         LASSERT(oh->ot_handle->h_transaction != NULL);
2690
2691         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2692         osd_ipd_put(env, bag, ipd);
2693         LINVRNT(osd_invariant(obj));
2694         RETURN(rc);
2695 }
2696
2697 static int osd_index_declare_ea_delete(const struct lu_env *env,
2698                                        struct dt_object *dt,
2699                                        const struct dt_key *key,
2700                                        struct thandle *handle)
2701 {
2702         struct osd_thandle *oh;
2703
2704         LASSERT(dt_object_exists(dt));
2705         LASSERT(handle != NULL);
2706
2707         oh = container_of0(handle, struct osd_thandle, ot_super);
2708         LASSERT(oh->ot_handle == NULL);
2709
2710         OSD_DECLARE_OP(oh, delete);
2711         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
2712
2713         LASSERT(osd_dt_obj(dt)->oo_inode);
2714         osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
2715                         osd_dt_obj(dt)->oo_inode);
2716         osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
2717                         osd_dt_obj(dt)->oo_inode);
2718
2719         return 0;
2720 }
2721
2722 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
2723                                           struct dt_rec *fid)
2724 {
2725         struct osd_fid_pack *rec;
2726         int                  rc = -ENODATA;
2727
2728         if (de->file_type & LDISKFS_DIRENT_LUFID) {
2729                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
2730                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
2731         }
2732         RETURN(rc);
2733 }
2734
2735 /**
2736  * Index delete function for interoperability mode (b11826).
2737  * It will remove the directory entry added by osd_index_ea_insert().
2738  * This entry is needed to maintain name->fid mapping.
2739  *
2740  * \param key,  key i.e. file entry to be deleted
2741  *
2742  * \retval   0, on success
2743  * \retval -ve, on error
2744  */
2745 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
2746                                const struct dt_key *key,
2747                                struct thandle *handle,
2748                                struct lustre_capa *capa)
2749 {
2750         struct osd_object          *obj    = osd_dt_obj(dt);
2751         struct inode               *dir    = obj->oo_inode;
2752         struct dentry              *dentry;
2753         struct osd_thandle         *oh;
2754         struct ldiskfs_dir_entry_2 *de;
2755         struct buffer_head         *bh;
2756         struct htree_lock          *hlock = NULL;
2757         int                         rc;
2758
2759         ENTRY;
2760
2761         LINVRNT(osd_invariant(obj));
2762         LASSERT(dt_object_exists(dt));
2763         LASSERT(handle != NULL);
2764
2765         OSD_EXEC_OP(handle, delete);
2766
2767         oh = container_of(handle, struct osd_thandle, ot_super);
2768         LASSERT(oh->ot_handle != NULL);
2769         LASSERT(oh->ot_handle->h_transaction != NULL);
2770
2771         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2772                 RETURN(-EACCES);
2773
2774         dentry = osd_child_dentry_get(env, obj,
2775                                       (char *)key, strlen((char *)key));
2776
2777         if (obj->oo_hl_head != NULL) {
2778                 hlock = osd_oti_get(env)->oti_hlock;
2779                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
2780                                    dir, LDISKFS_HLOCK_DEL);
2781         } else {
2782                 cfs_down_write(&obj->oo_ext_idx_sem);
2783         }
2784
2785         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
2786         if (bh) {
2787                 rc = ldiskfs_delete_entry(oh->ot_handle,
2788                                           dir, de, bh);
2789                 brelse(bh);
2790         } else {
2791                 rc = -ENOENT;
2792         }
2793         if (hlock != NULL)
2794                 ldiskfs_htree_unlock(hlock);
2795         else
2796                 cfs_up_write(&obj->oo_ext_idx_sem);
2797
2798         LASSERT(osd_invariant(obj));
2799         RETURN(rc);
2800 }
2801
2802 /**
2803  *      Lookup index for \a key and copy record to \a rec.
2804  *
2805  *      \param  dt      osd index object
2806  *      \param  key     key for index
2807  *      \param  rec     record reference
2808  *
2809  *      \retval  +ve  success : exact mach
2810  *      \retval  0    return record with key not greater than \a key
2811  *      \retval -ve   failure
2812  */
2813 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
2814                                 struct dt_rec *rec, const struct dt_key *key,
2815                                 struct lustre_capa *capa)
2816 {
2817         struct osd_object      *obj = osd_dt_obj(dt);
2818         struct iam_path_descr  *ipd;
2819         struct iam_container   *bag = &obj->oo_dir->od_container;
2820         struct osd_thread_info *oti = osd_oti_get(env);
2821         struct iam_iterator    *it = &oti->oti_idx_it;
2822         struct iam_rec         *iam_rec;
2823         int                     rc;
2824
2825         ENTRY;
2826
2827         LASSERT(osd_invariant(obj));
2828         LASSERT(dt_object_exists(dt));
2829         LASSERT(bag->ic_object == obj->oo_inode);
2830
2831         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
2832                 RETURN(-EACCES);
2833
2834         ipd = osd_idx_ipd_get(env, bag);
2835         if (IS_ERR(ipd))
2836                 RETURN(-ENOMEM);
2837
2838         /* got ipd now we can start iterator. */
2839         iam_it_init(it, bag, 0, ipd);
2840
2841         rc = iam_it_get(it, (struct iam_key *)key);
2842         if (rc >= 0) {
2843                 if (S_ISDIR(obj->oo_inode->i_mode))
2844                         iam_rec = (struct iam_rec *)oti->oti_ldp;
2845                 else
2846                         iam_rec = (struct iam_rec *) rec;
2847
2848                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
2849                 if (S_ISDIR(obj->oo_inode->i_mode))
2850                         osd_fid_unpack((struct lu_fid *) rec,
2851                                        (struct osd_fid_pack *)iam_rec);
2852         }
2853         iam_it_put(it);
2854         iam_it_fini(it);
2855         osd_ipd_put(env, bag, ipd);
2856
2857         LINVRNT(osd_invariant(obj));
2858
2859         RETURN(rc);
2860 }
2861
2862 static int osd_index_declare_iam_insert(const struct lu_env *env,
2863                                         struct dt_object *dt,
2864                                         const struct dt_rec *rec,
2865                                         const struct dt_key *key,
2866                                         struct thandle *handle)
2867 {
2868         struct osd_thandle *oh;
2869
2870         LASSERT(dt_object_exists(dt));
2871         LASSERT(handle != NULL);
2872
2873         oh = container_of0(handle, struct osd_thandle, ot_super);
2874         LASSERT(oh->ot_handle == NULL);
2875
2876         OSD_DECLARE_OP(oh, insert);
2877         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
2878
2879         return 0;
2880 }
2881
2882 /**
2883  *      Inserts (key, value) pair in \a dt index object.
2884  *
2885  *      \param  dt      osd index object
2886  *      \param  key     key for index
2887  *      \param  rec     record reference
2888  *      \param  th      transaction handler
2889  *
2890  *      \retval  0  success
2891  *      \retval -ve failure
2892  */
2893 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
2894                                 const struct dt_rec *rec,
2895                                 const struct dt_key *key, struct thandle *th,
2896                                 struct lustre_capa *capa, int ignore_quota)
2897 {
2898         struct osd_object     *obj = osd_dt_obj(dt);
2899         struct iam_path_descr *ipd;
2900         struct osd_thandle    *oh;
2901         struct iam_container  *bag = &obj->oo_dir->od_container;
2902 #ifdef HAVE_QUOTA_SUPPORT
2903         cfs_cap_t              save = cfs_curproc_cap_pack();
2904 #endif
2905         struct osd_thread_info *oti = osd_oti_get(env);
2906         struct iam_rec         *iam_rec = (struct iam_rec *)oti->oti_ldp;
2907         int                     rc;
2908
2909         ENTRY;
2910
2911         LINVRNT(osd_invariant(obj));
2912         LASSERT(dt_object_exists(dt));
2913         LASSERT(bag->ic_object == obj->oo_inode);
2914         LASSERT(th != NULL);
2915
2916         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
2917                 return -EACCES;
2918
2919         OSD_EXEC_OP(th, insert);
2920
2921         ipd = osd_idx_ipd_get(env, bag);
2922         if (unlikely(ipd == NULL))
2923                 RETURN(-ENOMEM);
2924
2925         oh = container_of0(th, struct osd_thandle, ot_super);
2926         LASSERT(oh->ot_handle != NULL);
2927         LASSERT(oh->ot_handle->h_transaction != NULL);
2928 #ifdef HAVE_QUOTA_SUPPORT
2929         if (ignore_quota)
2930                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
2931         else
2932                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
2933 #endif
2934         if (S_ISDIR(obj->oo_inode->i_mode))
2935                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
2936         else
2937                 iam_rec = (struct iam_rec *) rec;
2938         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
2939                         iam_rec, ipd);
2940 #ifdef HAVE_QUOTA_SUPPORT
2941         cfs_curproc_cap_unpack(save);
2942 #endif
2943         osd_ipd_put(env, bag, ipd);
2944         LINVRNT(osd_invariant(obj));
2945         RETURN(rc);
2946 }
2947
2948 /**
2949  * Calls ldiskfs_add_entry() to add directory entry
2950  * into the directory. This is required for
2951  * interoperability mode (b11826)
2952  *
2953  * \retval   0, on success
2954  * \retval -ve, on error
2955  */
2956 static int __osd_ea_add_rec(struct osd_thread_info *info,
2957                             struct osd_object *pobj, struct inode  *cinode,
2958                             const char *name, const struct dt_rec *fid,
2959                             struct htree_lock *hlock, struct thandle *th)
2960 {
2961         struct ldiskfs_dentry_param *ldp;
2962         struct dentry               *child;
2963         struct osd_thandle          *oth;
2964         int                          rc;
2965
2966         oth = container_of(th, struct osd_thandle, ot_super);
2967         LASSERT(oth->ot_handle != NULL);
2968         LASSERT(oth->ot_handle->h_transaction != NULL);
2969
2970         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
2971
2972         /* XXX: remove fid_is_igif() check here.
2973          * IGIF check is just to handle insertion of .. when it is 'ROOT',
2974          * it is IGIF now but needs FID in dir entry as well for readdir
2975          * to work.
2976          * LU-838 should fix that and remove fid_is_igif() check */
2977         if (fid_is_igif((struct lu_fid *)fid) ||
2978             fid_is_norm((struct lu_fid *)fid)) {
2979                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2980                 osd_get_ldiskfs_dirent_param(ldp, fid);
2981                 child->d_fsdata = (void *)ldp;
2982         } else {
2983                 child->d_fsdata = NULL;
2984         }
2985         rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
2986
2987         RETURN(rc);
2988 }
2989
2990 /**
2991  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
2992  * into the directory.Also sets flags into osd object to
2993  * indicate dot and dotdot are created. This is required for
2994  * interoperability mode (b11826)
2995  *
2996  * \param dir   directory for dot and dotdot fixup.
2997  * \param obj   child object for linking
2998  *
2999  * \retval   0, on success
3000  * \retval -ve, on error
3001  */
3002 static int osd_add_dot_dotdot(struct osd_thread_info *info,
3003                               struct osd_object *dir,
3004                               struct inode  *parent_dir, const char *name,
3005                               const struct dt_rec *dot_fid,
3006                               const struct dt_rec *dot_dot_fid,
3007                               struct thandle *th)
3008 {
3009         struct inode                *inode = dir->oo_inode;
3010         struct ldiskfs_dentry_param *dot_ldp;
3011         struct ldiskfs_dentry_param *dot_dot_ldp;
3012         struct osd_thandle          *oth;
3013         int result = 0;
3014
3015         oth = container_of(th, struct osd_thandle, ot_super);
3016         LASSERT(oth->ot_handle->h_transaction != NULL);
3017         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
3018
3019         if (strcmp(name, dot) == 0) {
3020                 if (dir->oo_compat_dot_created) {
3021                         result = -EEXIST;
3022                 } else {
3023                         LASSERT(inode == parent_dir);
3024                         dir->oo_compat_dot_created = 1;
3025                         result = 0;
3026                 }
3027         } else if(strcmp(name, dotdot) == 0) {
3028                 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
3029                 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
3030
3031                 if (!dir->oo_compat_dot_created)
3032                         return -EINVAL;
3033                 if (!fid_is_igif((struct lu_fid *)dot_fid)) {
3034                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
3035                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
3036                 } else {
3037                         dot_ldp = NULL;
3038                         dot_dot_ldp = NULL;
3039                 }
3040                 /* in case of rename, dotdot is already created */
3041                 if (dir->oo_compat_dotdot_created) {
3042                         return __osd_ea_add_rec(info, dir, parent_dir, name,
3043                                                 dot_dot_fid, NULL, th);
3044                 }
3045
3046                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
3047                                                 inode, dot_ldp, dot_dot_ldp);
3048                 if (result == 0)
3049                        dir->oo_compat_dotdot_created = 1;
3050         }
3051
3052         return result;
3053 }
3054
3055
3056 /**
3057  * It will call the appropriate osd_add* function and return the
3058  * value, return by respective functions.
3059  */
3060 static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
3061                           struct inode *cinode, const char *name,
3062                           const struct dt_rec *fid, struct thandle *th)
3063 {
3064         struct osd_thread_info *info   = osd_oti_get(env);
3065         struct htree_lock      *hlock;
3066         int                     rc;
3067
3068         hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
3069
3070         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3071                                                    name[2] =='\0'))) {
3072                 if (hlock != NULL) {
3073                         ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3074                                            pobj->oo_inode, 0);
3075                 } else {
3076                         cfs_down_write(&pobj->oo_ext_idx_sem);
3077                 }
3078                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3079                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3080                                         fid, th);
3081         } else {
3082                 if (hlock != NULL) {
3083                         ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3084                                            pobj->oo_inode, LDISKFS_HLOCK_ADD);
3085                 } else {
3086                         cfs_down_write(&pobj->oo_ext_idx_sem);
3087                 }
3088
3089                 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
3090                                       hlock, th);
3091         }
3092         if (hlock != NULL)
3093                 ldiskfs_htree_unlock(hlock);
3094         else
3095                 cfs_up_write(&pobj->oo_ext_idx_sem);
3096
3097         return rc;
3098 }
3099
3100 /**
3101  * Calls ->lookup() to find dentry. From dentry get inode and
3102  * read inode's ea to get fid. This is required for  interoperability
3103  * mode (b11826)
3104  *
3105  * \retval   0, on success
3106  * \retval -ve, on error
3107  */
3108 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3109                              struct dt_rec *rec, const struct dt_key *key)
3110 {
3111         struct inode               *dir    = obj->oo_inode;
3112         struct dentry              *dentry;
3113         struct ldiskfs_dir_entry_2 *de;
3114         struct buffer_head         *bh;
3115         struct lu_fid              *fid = (struct lu_fid *) rec;
3116         struct htree_lock          *hlock = NULL;
3117         int                         ino;
3118         int                         rc;
3119
3120         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3121
3122         dentry = osd_child_dentry_get(env, obj,
3123                                       (char *)key, strlen((char *)key));
3124
3125         if (obj->oo_hl_head != NULL) {
3126                 hlock = osd_oti_get(env)->oti_hlock;
3127                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3128                                    dir, LDISKFS_HLOCK_LOOKUP);
3129         } else {
3130                 cfs_down_read(&obj->oo_ext_idx_sem);
3131         }
3132
3133         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3134         if (bh) {
3135                 ino = le32_to_cpu(de->inode);
3136                 rc = osd_get_fid_from_dentry(de, rec);
3137
3138                 /* done with de, release bh */
3139                 brelse(bh);
3140                 if (rc != 0)
3141                         rc = osd_ea_fid_get(env, obj, ino, fid);
3142         } else {
3143                 rc = -ENOENT;
3144         }
3145
3146         if (hlock != NULL)
3147                 ldiskfs_htree_unlock(hlock);
3148         else
3149                 cfs_up_read(&obj->oo_ext_idx_sem);
3150         RETURN (rc);
3151 }
3152
3153 /**
3154  * Find the osd object for given fid.
3155  *
3156  * \param fid need to find the osd object having this fid
3157  *
3158  * \retval osd_object on success
3159  * \retval        -ve on error
3160  */
3161 struct osd_object *osd_object_find(const struct lu_env *env,
3162                                    struct dt_object *dt,
3163                                    const struct lu_fid *fid)
3164 {
3165         struct lu_device  *ludev = dt->do_lu.lo_dev;
3166         struct osd_object *child = NULL;
3167         struct lu_object  *luch;
3168         struct lu_object  *lo;
3169
3170         luch = lu_object_find(env, ludev, fid, NULL);
3171         if (!IS_ERR(luch)) {
3172                 if (lu_object_exists(luch)) {
3173                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3174                         if (lo != NULL)
3175                                 child = osd_obj(lo);
3176                         else
3177                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3178                                                 "lu_object can't be located"
3179                                                 ""DFID"\n", PFID(fid));
3180
3181                         if (child == NULL) {
3182                                 lu_object_put(env, luch);
3183                                 CERROR("Unable to get osd_object\n");
3184                                 child = ERR_PTR(-ENOENT);
3185                         }
3186                 } else {
3187                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
3188                                         "lu_object does not exists "DFID"\n",
3189                                         PFID(fid));
3190                         child = ERR_PTR(-ENOENT);
3191                 }
3192         } else
3193                 child = (void *)luch;
3194
3195         return child;
3196 }
3197
3198 /**
3199  * Put the osd object once done with it.
3200  *
3201  * \param obj osd object that needs to be put
3202  */
3203 static inline void osd_object_put(const struct lu_env *env,
3204                                   struct osd_object *obj)
3205 {
3206         lu_object_put(env, &obj->oo_dt.do_lu);
3207 }
3208
3209 static int osd_index_declare_ea_insert(const struct lu_env *env,
3210                                        struct dt_object *dt,
3211                                        const struct dt_rec *rec,
3212                                        const struct dt_key *key,
3213                                        struct thandle *handle)
3214 {
3215         struct osd_thandle *oh;
3216
3217         LASSERT(dt_object_exists(dt));
3218         LASSERT(handle != NULL);
3219
3220         oh = container_of0(handle, struct osd_thandle, ot_super);
3221         LASSERT(oh->ot_handle == NULL);
3222
3223         OSD_DECLARE_OP(oh, insert);
3224         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
3225
3226         LASSERT(osd_dt_obj(dt)->oo_inode);
3227         osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
3228                         osd_dt_obj(dt)->oo_inode);
3229         osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
3230                         osd_dt_obj(dt)->oo_inode);
3231
3232         return 0;
3233 }
3234
3235 /**
3236  * Index add function for interoperability mode (b11826).
3237  * It will add the directory entry.This entry is needed to
3238  * maintain name->fid mapping.
3239  *
3240  * \param key it is key i.e. file entry to be inserted
3241  * \param rec it is value of given key i.e. fid
3242  *
3243  * \retval   0, on success
3244  * \retval -ve, on error
3245  */
3246 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3247                                const struct dt_rec *rec,
3248                                const struct dt_key *key, struct thandle *th,
3249                                struct lustre_capa *capa, int ignore_quota)
3250 {
3251         struct osd_object *obj   = osd_dt_obj(dt);
3252         struct lu_fid     *fid   = (struct lu_fid *) rec;
3253         const char        *name  = (const char *)key;
3254         struct osd_object *child;
3255 #ifdef HAVE_QUOTA_SUPPORT
3256         cfs_cap_t          save  = cfs_curproc_cap_pack();
3257 #endif
3258         int                rc;
3259
3260         ENTRY;
3261
3262         LASSERT(osd_invariant(obj));
3263         LASSERT(dt_object_exists(dt));
3264         LASSERT(th != NULL);
3265
3266         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3267                 RETURN(-EACCES);
3268
3269         child = osd_object_find(env, dt, fid);
3270         if (!IS_ERR(child)) {
3271 #ifdef HAVE_QUOTA_SUPPORT
3272                 if (ignore_quota)
3273                         cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
3274                 else
3275                         cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
3276 #endif
3277                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
3278 #ifdef HAVE_QUOTA_SUPPORT
3279                 cfs_curproc_cap_unpack(save);
3280 #endif
3281                 osd_object_put(env, child);
3282         } else {
3283                 rc = PTR_ERR(child);
3284         }
3285
3286         LASSERT(osd_invariant(obj));
3287         RETURN(rc);
3288 }
3289
3290 /**
3291  *  Initialize osd Iterator for given osd index object.
3292  *
3293  *  \param  dt      osd index object
3294  */
3295
3296 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3297                                      struct dt_object *dt,
3298                                      __u32 unused,
3299                                      struct lustre_capa *capa)
3300 {
3301         struct osd_it_iam      *it;
3302         struct osd_thread_info *oti = osd_oti_get(env);
3303         struct osd_object      *obj = osd_dt_obj(dt);
3304         struct lu_object       *lo  = &dt->do_lu;
3305         struct iam_path_descr  *ipd;
3306         struct iam_container   *bag = &obj->oo_dir->od_container;
3307
3308         LASSERT(lu_object_exists(lo));
3309
3310         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3311                 return ERR_PTR(-EACCES);
3312
3313         it = &oti->oti_it;
3314         ipd = osd_it_ipd_get(env, bag);
3315         if (likely(ipd != NULL)) {
3316                 it->oi_obj = obj;
3317                 it->oi_ipd = ipd;
3318                 lu_object_get(lo);
3319                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3320                 return (struct dt_it *)it;
3321         }
3322         return ERR_PTR(-ENOMEM);
3323 }
3324
3325 /**
3326  * free given Iterator.
3327  */
3328
3329 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3330 {
3331         struct osd_it_iam *it = (struct osd_it_iam *)di;
3332         struct osd_object *obj = it->oi_obj;
3333
3334         iam_it_fini(&it->oi_it);
3335         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3336         lu_object_put(env, &obj->oo_dt.do_lu);
3337 }
3338
3339 /**
3340  *  Move Iterator to record specified by \a key
3341  *
3342  *  \param  di      osd iterator
3343  *  \param  key     key for index
3344  *
3345  *  \retval +ve  di points to record with least key not larger than key
3346  *  \retval  0   di points to exact matched key
3347  *  \retval -ve  failure
3348  */
3349
3350 static int osd_it_iam_get(const struct lu_env *env,
3351                           struct dt_it *di, const struct dt_key *key)
3352 {
3353         struct osd_it_iam *it = (struct osd_it_iam *)di;
3354
3355         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
3356 }
3357
3358 /**
3359  *  Release Iterator
3360  *
3361  *  \param  di      osd iterator
3362  */
3363
3364 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
3365 {
3366         struct osd_it_iam *it = (struct osd_it_iam *)di;
3367
3368         iam_it_put(&it->oi_it);
3369 }
3370
3371 /**
3372  *  Move iterator by one record
3373  *
3374  *  \param  di      osd iterator
3375  *
3376  *  \retval +1   end of container reached
3377  *  \retval  0   success
3378  *  \retval -ve  failure
3379  */
3380
3381 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
3382 {
3383         struct osd_it_iam *it = (struct osd_it_iam *)di;
3384
3385         return iam_it_next(&it->oi_it);
3386 }
3387
3388 /**
3389  * Return pointer to the key under iterator.
3390  */
3391
3392 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
3393                                  const struct dt_it *di)
3394 {
3395         struct osd_it_iam *it = (struct osd_it_iam *)di;
3396
3397         return (struct dt_key *)iam_it_key_get(&it->oi_it);
3398 }
3399
3400 /**
3401  * Return size of key under iterator (in bytes)
3402  */
3403
3404 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
3405 {
3406         struct osd_it_iam *it = (struct osd_it_iam *)di;
3407
3408         return iam_it_key_size(&it->oi_it);
3409 }
3410
3411 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
3412                                        int len, __u16 type)
3413 {
3414         struct luda_type *lt;
3415         const unsigned    align = sizeof(struct luda_type) - 1;
3416
3417         /* check if file type is required */
3418         if (attr & LUDA_TYPE) {
3419                         len = (len + align) & ~align;
3420
3421                         lt = (void *) ent->lde_name + len;
3422                         lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
3423                         ent->lde_attrs |= LUDA_TYPE;
3424         }
3425
3426         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
3427 }
3428
3429 /**
3430  * build lu direct from backend fs dirent.
3431  */
3432
3433 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
3434                                       struct lu_fid *fid, __u64 offset,
3435                                       char *name, __u16 namelen,
3436                                       __u16 type, __u32 attr)
3437 {
3438         fid_cpu_to_le(&ent->lde_fid, fid);
3439         ent->lde_attrs = LUDA_FID;
3440
3441         ent->lde_hash = cpu_to_le64(offset);
3442         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
3443
3444         strncpy(ent->lde_name, name, namelen);
3445         ent->lde_namelen = cpu_to_le16(namelen);
3446
3447         /* append lustre attributes */
3448         osd_it_append_attrs(ent, attr, namelen, type);
3449 }
3450
3451 /**
3452  * Return pointer to the record under iterator.
3453  */
3454 static int osd_it_iam_rec(const struct lu_env *env,
3455                           const struct dt_it *di,
3456                           struct dt_rec *dtrec, __u32 attr)
3457 {
3458         struct osd_it_iam *it        = (struct osd_it_iam *)di;
3459         struct osd_thread_info *info = osd_oti_get(env);
3460         struct lu_fid     *fid       = &info->oti_fid;
3461         const struct osd_fid_pack *rec;
3462         struct lu_dirent *lde = (struct lu_dirent *)dtrec;
3463         char *name;
3464         int namelen;
3465         __u64 hash;
3466         int rc;
3467
3468         name = (char *)iam_it_key_get(&it->oi_it);
3469         if (IS_ERR(name))
3470                 RETURN(PTR_ERR(name));
3471
3472         namelen = iam_it_key_size(&it->oi_it);
3473
3474         rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it);
3475         if (IS_ERR(rec))
3476                 RETURN(PTR_ERR(rec));
3477
3478         rc = osd_fid_unpack(fid, rec);
3479         if (rc)
3480                 RETURN(rc);
3481
3482         hash = iam_it_store(&it->oi_it);
3483
3484         /* IAM does not store object type in IAM index (dir) */
3485         osd_it_pack_dirent(lde, fid, hash, name, namelen,
3486                            0, LUDA_FID);
3487
3488         return 0;
3489 }
3490
3491 /**
3492  * Returns cookie for current Iterator position.
3493  */
3494 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
3495 {
3496         struct osd_it_iam *it = (struct osd_it_iam *)di;
3497
3498         return iam_it_store(&it->oi_it);
3499 }
3500
3501 /**
3502  * Restore iterator from cookie.
3503  *
3504  * \param  di      osd iterator
3505  * \param  hash    Iterator location cookie
3506  *
3507  * \retval +ve  di points to record with least key not larger than key.
3508  * \retval  0   di points to exact matched key
3509  * \retval -ve  failure
3510  */
3511
3512 static int osd_it_iam_load(const struct lu_env *env,
3513                            const struct dt_it *di, __u64 hash)
3514 {
3515         struct osd_it_iam *it = (struct osd_it_iam *)di;
3516
3517         return iam_it_load(&it->oi_it, hash);
3518 }
3519
3520 static const struct dt_index_operations osd_index_iam_ops = {
3521         .dio_lookup         = osd_index_iam_lookup,
3522         .dio_declare_insert = osd_index_declare_iam_insert,
3523         .dio_insert         = osd_index_iam_insert,
3524         .dio_declare_delete = osd_index_declare_iam_delete,
3525         .dio_delete         = osd_index_iam_delete,
3526         .dio_it     = {
3527                 .init     = osd_it_iam_init,
3528                 .fini     = osd_it_iam_fini,
3529                 .get      = osd_it_iam_get,
3530                 .put      = osd_it_iam_put,
3531                 .next     = osd_it_iam_next,
3532                 .key      = osd_it_iam_key,
3533                 .key_size = osd_it_iam_key_size,
3534                 .rec      = osd_it_iam_rec,
3535                 .store    = osd_it_iam_store,
3536                 .load     = osd_it_iam_load
3537         }
3538 };
3539
3540 /**
3541  * Creates or initializes iterator context.
3542  *
3543  * \retval struct osd_it_ea, iterator structure on success
3544  *
3545  */
3546 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
3547                                     struct dt_object *dt,
3548                                     __u32 attr,
3549                                     struct lustre_capa *capa)
3550 {
3551         struct osd_object       *obj  = osd_dt_obj(dt);
3552         struct osd_thread_info  *info = osd_oti_get(env);
3553         struct osd_it_ea        *it   = &info->oti_it_ea;
3554         struct lu_object        *lo   = &dt->do_lu;
3555         struct dentry           *obj_dentry = &info->oti_it_dentry;
3556         ENTRY;
3557         LASSERT(lu_object_exists(lo));
3558
3559         obj_dentry->d_inode = obj->oo_inode;
3560         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3561         obj_dentry->d_name.hash = 0;
3562
3563         it->oie_rd_dirent       = 0;
3564         it->oie_it_dirent       = 0;
3565         it->oie_dirent          = NULL;
3566         it->oie_buf             = info->oti_it_ea_buf;
3567         it->oie_obj             = obj;
3568         it->oie_file.f_pos      = 0;
3569         it->oie_file.f_dentry   = obj_dentry;
3570         if (attr & LUDA_64BITHASH)
3571                 it->oie_file.f_flags = O_64BITHASH;
3572         else
3573                 it->oie_file.f_flags = O_32BITHASH;
3574         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
3575         it->oie_file.f_op         = obj->oo_inode->i_fop;
3576         it->oie_file.private_data = NULL;
3577         lu_object_get(lo);
3578         RETURN((struct dt_it *) it);
3579 }
3580
3581 /**
3582  * Destroy or finishes iterator context.
3583  *
3584  * \param di iterator structure to be destroyed
3585  */
3586 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
3587 {
3588         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3589         struct osd_object    *obj  = it->oie_obj;
3590         struct inode       *inode  = obj->oo_inode;
3591
3592         ENTRY;
3593         it->oie_file.f_op->release(inode, &it->oie_file);
3594         lu_object_put(env, &obj->oo_dt.do_lu);
3595         EXIT;
3596 }
3597
3598 /**
3599  * It position the iterator at given key, so that next lookup continues from
3600  * that key Or it is similar to dio_it->load() but based on a key,
3601  * rather than file position.
3602  *
3603  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
3604  * to the beginning.
3605  *
3606  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
3607  */
3608 static int osd_it_ea_get(const struct lu_env *env,
3609                          struct dt_it *di, const struct dt_key *key)
3610 {
3611         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
3612
3613         ENTRY;
3614         LASSERT(((const char *)key)[0] == '\0');
3615         it->oie_file.f_pos      = 0;
3616         it->oie_rd_dirent       = 0;
3617         it->oie_it_dirent       = 0;
3618         it->oie_dirent          = NULL;
3619
3620         RETURN(+1);
3621 }
3622
/**
 * Iterator ->put() method; intentionally a no-op.
 */
static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
{
        /* nothing to release */
}
3629
3630 /**
3631  * It is called internally by ->readdir(). It fills the
3632  * iterator's in-memory data structure with required
3633  * information i.e. name, namelen, rec_size etc.
3634  *
3635  * \param buf in which information to be filled in.
3636  * \param name name of the file in given dir
3637  *
3638  * \retval 0 on success
3639  * \retval 1 on buffer full
3640  */
3641 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
3642                                loff_t offset, __u64 ino,
3643                                unsigned d_type)
3644 {
3645         struct osd_it_ea        *it   = (struct osd_it_ea *)buf;
3646         struct osd_it_ea_dirent *ent  = it->oie_dirent;
3647         struct lu_fid           *fid  = &ent->oied_fid;
3648         struct osd_fid_pack     *rec;
3649         ENTRY;
3650
3651         /* this should never happen */
3652         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
3653                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
3654                 RETURN(-EIO);
3655         }
3656
3657         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
3658             OSD_IT_EA_BUFSIZE)
3659                 RETURN(1);
3660
3661         if (d_type & LDISKFS_DIRENT_LUFID) {
3662                 rec = (struct osd_fid_pack*) (name + namelen + 1);
3663
3664                 if (osd_fid_unpack(fid, rec) != 0)
3665                         fid_zero(fid);
3666
3667                 d_type &= ~LDISKFS_DIRENT_LUFID;
3668         } else {
3669                 fid_zero(fid);
3670         }
3671
3672         ent->oied_ino     = ino;
3673         ent->oied_off     = offset;
3674         ent->oied_namelen = namelen;
3675         ent->oied_type    = d_type;
3676
3677         memcpy(ent->oied_name, name, namelen);
3678
3679         it->oie_rd_dirent++;
3680         it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
3681         RETURN(0);
3682 }
3683
3684 /**
3685  * Calls ->readdir() to load a directory entry at a time
3686  * and stored it in iterator's in-memory data structure.
3687  *
3688  * \param di iterator's in memory structure
3689  *
3690  * \retval   0 on success
3691  * \retval -ve on error
3692  */
3693 static int osd_ldiskfs_it_fill(const struct lu_env *env,
3694                                const struct dt_it *di)
3695 {
3696         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
3697         struct osd_object  *obj   = it->oie_obj;
3698         struct inode       *inode = obj->oo_inode;
3699         struct htree_lock  *hlock = NULL;
3700         int                 result = 0;
3701
3702         ENTRY;
3703         it->oie_dirent = it->oie_buf;
3704         it->oie_rd_dirent = 0;
3705
3706         if (obj->oo_hl_head != NULL) {
3707                 hlock = osd_oti_get(env)->oti_hlock;
3708                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3709                                    inode, LDISKFS_HLOCK_READDIR);
3710         } else {
3711                 cfs_down_read(&obj->oo_ext_idx_sem);
3712         }
3713
3714         result = inode->i_fop->readdir(&it->oie_file, it,
3715                                        (filldir_t) osd_ldiskfs_filldir);
3716
3717         if (hlock != NULL)
3718                 ldiskfs_htree_unlock(hlock);
3719         else
3720                 cfs_up_read(&obj->oo_ext_idx_sem);
3721
3722         if (it->oie_rd_dirent == 0) {
3723                 result = -EIO;
3724         } else {
3725                 it->oie_dirent = it->oie_buf;
3726                 it->oie_it_dirent = 1;
3727         }
3728
3729         RETURN(result);
3730 }
3731
3732 /**
3733  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3734  * to load a directory entry at a time and stored it in
3735  * iterator's in-memory data structure.
3736  *
3737  * \param di iterator's in memory structure
3738  *
3739  * \retval +ve iterator reached to end
3740  * \retval   0 iterator not reached to end
3741  * \retval -ve on error
3742  */
3743 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
3744 {
3745         struct osd_it_ea *it = (struct osd_it_ea *)di;
3746         int rc;
3747
3748         ENTRY;
3749
3750         if (it->oie_it_dirent < it->oie_rd_dirent) {
3751                 it->oie_dirent =
3752                         (void *) it->oie_dirent +
3753                         cfs_size_round(sizeof(struct osd_it_ea_dirent) +
3754                                        it->oie_dirent->oied_namelen);
3755                 it->oie_it_dirent++;
3756                 RETURN(0);
3757         } else {
3758                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
3759                         rc = +1;
3760                 else
3761                         rc = osd_ldiskfs_it_fill(env, di);
3762         }
3763
3764         RETURN(rc);
3765 }
3766
3767 /**
3768  * Returns the key at current position from iterator's in memory structure.
3769  *
3770  * \param di iterator's in memory structure
3771  *
3772  * \retval key i.e. struct dt_key on success
3773  */
3774 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
3775                                     const struct dt_it *di)
3776 {
3777         struct osd_it_ea *it = (struct osd_it_ea *)di;
3778
3779         return (struct dt_key *)it->oie_dirent->oied_name;
3780 }
3781
3782 /**
3783  * Returns the key's size at current position from iterator's in memory structure.
3784  *
3785  * \param di iterator's in memory structure
3786  *
3787  * \retval key_size i.e. struct dt_key on success
3788  */
3789 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
3790 {
3791         struct osd_it_ea *it = (struct osd_it_ea *)di;
3792
3793         return it->oie_dirent->oied_namelen;
3794 }
3795
3796
3797 /**
3798  * Returns the value (i.e. fid/igif) at current position from iterator's
3799  * in memory structure.
3800  *
3801  * \param di struct osd_it_ea, iterator's in memory structure
3802  * \param attr attr requested for dirent.
3803  * \param lde lustre dirent
3804  *
3805  * \retval   0 no error and \param lde has correct lustre dirent.
3806  * \retval -ve on error
3807  */
3808 static inline int osd_it_ea_rec(const struct lu_env *env,
3809                                 const struct dt_it *di,
3810                                 struct dt_rec *dtrec, __u32 attr)
3811 {
3812         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
3813         struct osd_object       *obj    = it->oie_obj;
3814         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
3815         struct lu_dirent        *lde    = (struct lu_dirent *)dtrec;
3816         int    rc = 0;
3817
3818         ENTRY;
3819
3820         if (!fid_is_sane(fid))
3821                 rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
3822
3823         if (rc == 0)
3824                 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
3825                                    it->oie_dirent->oied_name,
3826                                    it->oie_dirent->oied_namelen,
3827                                    it->oie_dirent->oied_type,
3828                                    attr);
3829         RETURN(rc);
3830 }
3831
3832 /**
3833  * Returns a cookie for current position of the iterator head, so that
3834  * user can use this cookie to load/start the iterator next time.
3835  *
3836  * \param di iterator's in memory structure
3837  *
3838  * \retval cookie for current position, on success
3839  */
3840 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
3841 {
3842         struct osd_it_ea *it = (struct osd_it_ea *)di;
3843
3844         return it->oie_dirent->oied_off;
3845 }
3846
3847 /**
3848  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
3849  * to load a directory entry at a time and stored it i inn,
3850  * in iterator's in-memory data structure.
3851  *
3852  * \param di struct osd_it_ea, iterator's in memory structure
3853  *
3854  * \retval +ve on success
3855  * \retval -ve on error
3856  */
3857 static int osd_it_ea_load(const struct lu_env *env,
3858                           const struct dt_it *di, __u64 hash)
3859 {
3860         struct osd_it_ea *it = (struct osd_it_ea *)di;
3861         int rc;
3862
3863         ENTRY;
3864         it->oie_file.f_pos = hash;
3865
3866         rc =  osd_ldiskfs_it_fill(env, di);
3867         if (rc == 0)
3868                 rc = +1;
3869
3870         RETURN(rc);
3871 }
3872
3873 /**
3874  * Index lookup function for interoperability mode (b11826).
3875  *
3876  * \param key,  key i.e. file name to be searched
3877  *
3878  * \retval +ve, on success
3879  * \retval -ve, on error
3880  */
3881 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
3882                                struct dt_rec *rec, const struct dt_key *key,
3883                                struct lustre_capa *capa)
3884 {
3885         struct osd_object *obj = osd_dt_obj(dt);
3886         int rc = 0;
3887
3888         ENTRY;
3889
3890         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
3891         LINVRNT(osd_invariant(obj));
3892
3893         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3894                 return -EACCES;
3895
3896         rc = osd_ea_lookup_rec(env, obj, rec, key);
3897
3898         if (rc == 0)
3899                 rc = +1;
3900         RETURN(rc);
3901 }
3902
3903 /**
3904  * Index and Iterator operations for interoperability
3905  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
3906  */
3907 static const struct dt_index_operations osd_index_ea_ops = {
3908         .dio_lookup         = osd_index_ea_lookup,
3909         .dio_declare_insert = osd_index_declare_ea_insert,
3910         .dio_insert         = osd_index_ea_insert,
3911         .dio_declare_delete = osd_index_declare_ea_delete,
3912         .dio_delete         = osd_index_ea_delete,
3913         .dio_it     = {
3914                 .init     = osd_it_ea_init,
3915                 .fini     = osd_it_ea_fini,
3916                 .get      = osd_it_ea_get,
3917                 .put      = osd_it_ea_put,
3918                 .next     = osd_it_ea_next,
3919                 .key      = osd_it_ea_key,
3920                 .key_size = osd_it_ea_key_size,
3921                 .rec      = osd_it_ea_rec,
3922                 .store    = osd_it_ea_store,
3923                 .load     = osd_it_ea_load
3924         }
3925 };
3926
3927 static void *osd_key_init(const struct lu_context *ctx,
3928                           struct lu_context_key *key)
3929 {
3930         struct osd_thread_info *info;
3931
3932         OBD_ALLOC_PTR(info);
3933         if (info == NULL)
3934                 return ERR_PTR(-ENOMEM);
3935
3936         OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3937         if (info->oti_it_ea_buf == NULL)
3938                 goto out_free_info;
3939
3940         info->oti_env = container_of(ctx, struct lu_env, le_ctx);
3941
3942         info->oti_hlock = ldiskfs_htree_lock_alloc();
3943         if (info->oti_hlock == NULL)
3944                 goto out_free_ea;
3945
3946         return info;
3947
3948  out_free_ea:
3949         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3950  out_free_info:
3951         OBD_FREE_PTR(info);
3952         return ERR_PTR(-ENOMEM);
3953 }
3954
3955 static void osd_key_fini(const struct lu_context *ctx,
3956                          struct lu_context_key *key, void* data)
3957 {
3958         struct osd_thread_info *info = data;
3959
3960         if (info->oti_hlock != NULL)
3961                 ldiskfs_htree_lock_free(info->oti_hlock);
3962         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
3963         OBD_FREE_PTR(info);
3964 }
3965
3966 static void osd_key_exit(const struct lu_context *ctx,
3967                          struct lu_context_key *key, void *data)
3968 {
3969         struct osd_thread_info *info = data;
3970
3971         LASSERT(info->oti_r_locks == 0);
3972         LASSERT(info->oti_w_locks == 0);
3973         LASSERT(info->oti_txns    == 0);
3974 }
3975
3976 /* type constructor/destructor: osd_type_init, osd_type_fini */
3977 LU_TYPE_INIT_FINI(osd, &osd_key);
3978
3979 struct lu_context_key osd_key = {
3980         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL,
3981         .lct_init = osd_key_init,
3982         .lct_fini = osd_key_fini,
3983         .lct_exit = osd_key_exit
3984 };
3985
3986
/**
 * Device-type ->ldto_device_init() method: set up procfs for the device.
 *
 * \param  env     execution environment (unused here)
 * \param  d       device being initialized
 * \param  name    name passed on to osd_procfs_init()
 * \param  next    unused here
 *
 * \retval result of osd_procfs_init()
 */
static int osd_device_init(const struct lu_env *env, struct lu_device *d,
                           const char *name, struct lu_device *next)
{
        struct osd_device *osd = osd_dev(d);

        return osd_procfs_init(osd, name);
}
3992
3993 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
3994 {
3995         struct osd_thread_info *info = osd_oti_get(env);
3996
3997         ENTRY;
3998
3999         if (o->od_oi_table != NULL)
4000                 osd_oi_fini(info, o);
4001
4002         if (o->od_fsops) {
4003                 fsfilt_put_ops(o->od_fsops);
4004                 o->od_fsops = NULL;
4005         }
4006
4007         RETURN(0);
4008 }
4009
4010 static int osd_mount(const struct lu_env *env,
4011                      struct osd_device *o, struct lustre_cfg *cfg)
4012 {
4013         struct lustre_mount_info *lmi;
4014         const char               *dev  = lustre_cfg_string(cfg, 0);
4015         struct lustre_disk_data  *ldd;
4016         struct lustre_sb_info    *lsi;
4017         int                       rc = 0;
4018
4019         ENTRY;
4020
4021         o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
4022         if (o->od_fsops == NULL) {
4023                 CERROR("Can't find fsfilt_ldiskfs\n");
4024                 RETURN(-ENOTSUPP);
4025         }
4026
4027         if (o->od_mount != NULL) {
4028                 CERROR("Already mounted (%s)\n", dev);
4029                 RETURN(-EEXIST);
4030         }
4031
4032         /* get mount */
4033         lmi = server_get_mount(dev);
4034         if (lmi == NULL) {
4035                 CERROR("Cannot get mount info for %s!\n", dev);
4036                 RETURN(-EFAULT);
4037         }
4038
4039         LASSERT(lmi != NULL);
4040         /* save lustre_mount_info in dt_device */
4041         o->od_mount = lmi;
4042         o->od_mnt = lmi->lmi_mnt;
4043
4044         lsi = s2lsi(lmi->lmi_sb);
4045         ldd = lsi->lsi_ldd;
4046
4047         if (ldd->ldd_flags & LDD_F_IAM_DIR) {
4048                 o->od_iop_mode = 0;
4049                 LCONSOLE_WARN("%s: OSD: IAM mode enabled\n", dev);
4050         } else
4051                 o->od_iop_mode = 1;
4052
4053         if (ldd->ldd_flags & LDD_F_SV_TYPE_OST) {
4054                 rc = osd_compat_init(o);
4055                 if (rc)
4056                         CERROR("%s: can't initialize compats: %d\n", dev, rc);
4057         }
4058
4059         RETURN(rc);
4060 }
4061
4062 static struct lu_device *osd_device_fini(const struct lu_env *env,
4063                                          struct lu_device *d)
4064 {
4065         int rc;
4066         ENTRY;
4067
4068         osd_compat_fini(osd_dev(d));
4069
4070         shrink_dcache_sb(osd_sb(osd_dev(d)));
4071         osd_sync(env, lu2dt_dev(d));
4072
4073         rc = osd_procfs_fini(osd_dev(d));
4074         if (rc) {
4075                 CERROR("proc fini error %d \n", rc);
4076                 RETURN (ERR_PTR(rc));
4077         }
4078
4079         if (osd_dev(d)->od_mount)
4080                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
4081                                  osd_dev(d)->od_mount->lmi_mnt);
4082         osd_dev(d)->od_mount = NULL;
4083
4084         RETURN(NULL);
4085 }
4086
4087 static struct lu_device *osd_device_alloc(const struct lu_env *env,
4088                                           struct lu_device_type *t,
4089                                           struct lustre_cfg *cfg)
4090 {
4091         struct lu_device  *l;
4092         struct osd_device *o;
4093
4094         OBD_ALLOC_PTR(o);
4095         if (o != NULL) {
4096                 int result;
4097
4098                 result = dt_device_init(&o->od_dt_dev, t);
4099                 if (result == 0) {
4100                         l = osd2lu_dev(o);
4101                         l->ld_ops = &osd_lu_ops;
4102                         o->od_dt_dev.dd_ops = &osd_dt_ops;
4103                         cfs_spin_lock_init(&o->od_osfs_lock);
4104                         o->od_osfs_age = cfs_time_shift_64(-1000);
4105                         o->od_capa_hash = init_capa_hash();
4106                         if (o->od_capa_hash == NULL) {
4107                                 dt_device_fini(&o->od_dt_dev);
4108                                 l = ERR_PTR(-ENOMEM);
4109                         }
4110                 } else
4111                         l = ERR_PTR(result);
4112
4113                 if (IS_ERR(l))
4114                         OBD_FREE_PTR(o);
4115         } else
4116                 l = ERR_PTR(-ENOMEM);
4117         return l;
4118 }
4119
4120 static struct lu_device *osd_device_free(const struct lu_env *env,
4121                                          struct lu_device *d)
4122 {
4123         struct osd_device *o = osd_dev(d);
4124         ENTRY;
4125
4126         cleanup_capa_hash(o->od_capa_hash);
4127         dt_device_fini(&o->od_dt_dev);
4128         OBD_FREE_PTR(o);
4129         RETURN(NULL);
4130 }
4131
4132 static int osd_process_config(const struct lu_env *env,
4133                               struct lu_device *d, struct lustre_cfg *cfg)
4134 {
4135         struct osd_device *o = osd_dev(d);
4136         int err;
4137         ENTRY;
4138
4139         switch(cfg->lcfg_command) {
4140         case LCFG_SETUP:
4141                 err = osd_mount(env, o, cfg);
4142                 break;
4143         case LCFG_CLEANUP:
4144                 err = osd_shutdown(env, o);
4145                 break;
4146         default:
4147                 err = -ENOSYS;
4148         }
4149
4150         RETURN(err);
4151 }
4152
/**
 * Device ->ldo_recovery_complete() method; nothing to do for osd.
 *
 * \retval 0 always
 */
static int osd_recovery_complete(const struct lu_env *env,
                                 struct lu_device *d)
{
        const int rc = 0;

        RETURN(rc);
}
4158
4159 static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
4160                        struct lu_device *dev)
4161 {
4162         struct osd_device      *osd = osd_dev(dev);
4163         struct osd_thread_info *oti = osd_oti_get(env);
4164         int                     result;
4165
4166         ENTRY;
4167
4168         /* 1. initialize oi before any file create or file open */
4169         result = osd_oi_init(oti, osd);
4170         if (result < 0)
4171                 RETURN(result);
4172
4173         if (!lu_device_is_md(pdev))
4174                 RETURN(0);
4175
4176         /* 2. setup local objects */
4177         result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
4178         RETURN(result);
4179 }
4180
4181 static const struct lu_object_operations osd_lu_obj_ops = {
4182         .loo_object_init      = osd_object_init,
4183         .loo_object_delete    = osd_object_delete,
4184         .loo_object_release   = osd_object_release,
4185         .loo_object_free      = osd_object_free,
4186         .loo_object_print     = osd_object_print,
4187         .loo_object_invariant = osd_object_invariant
4188 };
4189
4190 const struct lu_device_operations osd_lu_ops = {
4191         .ldo_object_alloc      = osd_object_alloc,
4192         .ldo_process_config    = osd_process_config,
4193         .ldo_recovery_complete = osd_recovery_complete,
4194         .ldo_prepare           = osd_prepare,
4195 };
4196
4197 static const struct lu_device_type_operations osd_device_type_ops = {
4198         .ldto_init = osd_type_init,
4199         .ldto_fini = osd_type_fini,
4200
4201         .ldto_start = osd_type_start,
4202         .ldto_stop  = osd_type_stop,
4203
4204         .ldto_device_alloc = osd_device_alloc,
4205         .ldto_device_free  = osd_device_free,
4206
4207         .ldto_device_init    = osd_device_init,
4208         .ldto_device_fini    = osd_device_fini
4209 };
4210
4211 static struct lu_device_type osd_device_type = {
4212         .ldt_tags     = LU_DEVICE_DT,
4213         .ldt_name     = LUSTRE_OSD_NAME,
4214         .ldt_ops      = &osd_device_type_ops,
4215         .ldt_ctx_tags = LCT_LOCAL,
4216 };
4217
4218 /*
4219  * lprocfs legacy support.
4220  */
4221 static struct obd_ops osd_obd_device_ops = {
4222         .o_owner = THIS_MODULE
4223 };
4224
4225 static int __init osd_mod_init(void)
4226 {
4227         struct lprocfs_static_vars lvars;
4228
4229         osd_oi_mod_init();
4230         lprocfs_osd_init_vars(&lvars);
4231         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
4232                                    LUSTRE_OSD_NAME, &osd_device_type);
4233 }
4234
4235 static void __exit osd_mod_exit(void)
4236 {
4237         class_unregister_type(LUSTRE_OSD_NAME);
4238 }
4239
4240 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4241 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
4242 MODULE_LICENSE("GPL");
4243
4244 cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit);