Whamcloud - gitweb
LU-1866 scrub: initial OI scrub
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <linux/module.h>
47
48 /* LUSTRE_VERSION_CODE */
49 #include <lustre_ver.h>
50 /* prerequisite for linux/xattr.h */
51 #include <linux/types.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/fs.h>
54 /* XATTR_{REPLACE,CREATE} */
55 #include <linux/xattr.h>
56 /* simple_mkdir() */
57 #include <lvfs.h>
58
59 /*
60  * struct OBD_{ALLOC,FREE}*()
61  * OBD_FAIL_CHECK
62  */
63 #include <obd_support.h>
64 /* struct ptlrpc_thread */
65 #include <lustre_net.h>
66 #include <lustre_fid.h>
67
68 #include "osd_internal.h"
69
70 /* llo_* api support */
71 #include <md_object.h>
72 #include <lustre_quota.h>
73
/*
 * Module parameter (mode 0644). When it is zero, osd_fid_lookup() skips
 * allocating the htree lock head for directory objects.
 */
74 int ldiskfs_pdo = 1;
75 CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
76                 "ldiskfs with parallel directory operations");
77
/* Fixed name strings; their users are outside this part of the file. */
78 static const char dot[] = ".";
79 static const char dotdot[] = "..";
80 static const char remote_obj_dir[] = "REM_OBJ_DIR";
81
/*
 * Forward declarations of the operation tables; the definitions are not in
 * this part of the file. osd_object_alloc() installs osd_lu_obj_ops and
 * osd_obj_ea_ops, osd_object_init() installs osd_obj_otable_it_ops for the
 * object-table iterator FID.
 */
82 static const struct lu_object_operations      osd_lu_obj_ops;
83 static const struct dt_object_operations      osd_obj_ops;
84 static const struct dt_object_operations      osd_obj_ea_ops;
85 static const struct dt_object_operations      osd_obj_otable_it_ops;
86 static const struct dt_index_operations       osd_index_iam_ops;
87 static const struct dt_index_operations       osd_index_ea_ops;
88
/*
 * Only built with OSD_TRACK_DECLARES. Maps each OSD_OT_* operation type to
 * another OSD_OT_* value; create/destroy, ref_add/ref_del and insert/delete
 * map to each other, attr_set, xattr_set and write map to themselves, and
 * punch and quota map to OSD_OT_MAX.
 * NOTE(review): "rb" presumably stands for rollback, i.e. the operation that
 * undoes the indexed one, with OSD_OT_MAX meaning "none" -- confirm against
 * the users of this table, which are not visible here.
 */
89 #ifdef OSD_TRACK_DECLARES
90 int osd_trans_declare_op2rb[] = {
91         [OSD_OT_ATTR_SET]       = OSD_OT_ATTR_SET,
92         [OSD_OT_PUNCH]          = OSD_OT_MAX,
93         [OSD_OT_XATTR_SET]      = OSD_OT_XATTR_SET,
94         [OSD_OT_CREATE]         = OSD_OT_DESTROY,
95         [OSD_OT_DESTROY]        = OSD_OT_CREATE,
96         [OSD_OT_REF_ADD]        = OSD_OT_REF_DEL,
97         [OSD_OT_REF_DEL]        = OSD_OT_REF_ADD,
98         [OSD_OT_WRITE]          = OSD_OT_WRITE,
99         [OSD_OT_INSERT]         = OSD_OT_DELETE,
100         [OSD_OT_DELETE]         = OSD_OT_INSERT,
101         [OSD_OT_QUOTA]          = OSD_OT_MAX,
102 };
103 #endif
104
105 static int osd_has_index(const struct osd_object *obj)
106 {
107         return obj->oo_dt.do_index_ops != NULL;
108 }
109
/*
 * lu_object flavour of osd_invariant(): converts the generic object to its
 * osd_object and returns the result of the invariant check on it.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int holds = osd_invariant(osd_obj(l));

        return holds;
}
114
115 /*
116  * Concurrency: doesn't matter
117  */
118 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
119 {
120         return osd_oti_get(env)->oti_r_locks > 0;
121 }
122
123 /*
124  * Concurrency: doesn't matter
125  */
126 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
127 {
128         struct osd_thread_info *oti = osd_oti_get(env);
129         return oti->oti_w_locks > 0 && o->oo_owner == env;
130 }
131
132 /*
133  * Concurrency: doesn't access mutable data
134  */
135 static int osd_root_get(const struct lu_env *env,
136                         struct dt_device *dev, struct lu_fid *f)
137 {
138         lu_local_obj_fid(f, OSD_FS_ROOT_OID);
139         return 0;
140 }
141
142 /*
143  * OSD object methods.
144  */
145
146 /*
147  * Concurrency: no concurrent access is possible that early in object
148  * life-cycle.
149  */
150 static struct lu_object *osd_object_alloc(const struct lu_env *env,
151                                           const struct lu_object_header *hdr,
152                                           struct lu_device *d)
153 {
154         struct osd_object *mo;
155
156         OBD_ALLOC_PTR(mo);
157         if (mo != NULL) {
158                 struct lu_object *l;
159
160                 l = &mo->oo_dt.do_lu;
161                 dt_object_init(&mo->oo_dt, NULL, d);
162                 mo->oo_dt.do_ops = &osd_obj_ea_ops;
163                 l->lo_ops = &osd_lu_obj_ops;
164                 init_rwsem(&mo->oo_sem);
165                 init_rwsem(&mo->oo_ext_idx_sem);
166                 spin_lock_init(&mo->oo_guard);
167                 return l;
168         } else {
169                 return NULL;
170         }
171 }
172
173 static inline int __osd_xattr_get(struct inode *inode, struct dentry *dentry,
174                                   const char *name, void *buf, int len)
175 {
176         dentry->d_inode = inode;
177         return inode->i_op->getxattr(dentry, name, buf, len);
178 }
179
180 int osd_get_lma(struct osd_thread_info *info, struct inode *inode,
181                 struct dentry *dentry, struct lustre_mdt_attrs *lma)
182 {
183         int rc;
184
185         rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMA, (void *)lma,
186                              sizeof(*lma));
187         if (rc == -ERANGE) {
188                 /* try with old lma size */
189                 rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA,
190                                            info->oti_mdt_attrs_old,
191                                            LMA_OLD_SIZE);
192                 if (rc > 0)
193                         memcpy(lma, info->oti_mdt_attrs_old, sizeof(*lma));
194         }
195         if (rc > 0) {
196                 /* Check LMA compatibility */
197                 if (lma->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP)) {
198                         CWARN("%.16s: unsupported incompat LMA feature(s) "
199                               "%lx/%#x\n",
200                               LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
201                               inode->i_ino, le32_to_cpu(lma->lma_incompat) &
202                                                         ~LMA_INCOMPAT_SUPP);
203                         rc = -ENOSYS;
204                 } else {
205                         lustre_lma_swab(lma);
206                         rc = 0;
207                 }
208         } else if (rc == 0) {
209                 rc = -ENODATA;
210         }
211
212         return rc;
213 }
214
215 /*
216  * retrieve object from backend ext fs.
217  **/
218 struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
219                        struct osd_inode_id *id)
220 {
221         struct inode *inode = NULL;
222
223         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
224         if (IS_ERR(inode)) {
225                 CDEBUG(D_INODE, "no inode: ino = %u, rc = %ld\n",
226                        id->oii_ino, PTR_ERR(inode));
227         } else if (id->oii_gen != OSD_OII_NOGEN &&
228                    inode->i_generation != id->oii_gen) {
229                 CDEBUG(D_INODE, "unmatched inode: ino = %u, gen0 = %u, "
230                        "gen1 = %u\n",
231                        id->oii_ino, id->oii_gen, inode->i_generation);
232                 iput(inode);
233                 inode = ERR_PTR(-ESTALE);
234         } else if (inode->i_nlink == 0) {
235                 /* due to parallel readdir and unlink,
236                 * we can have dead inode here. */
237                 CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino);
238                 make_bad_inode(inode);
239                 iput(inode);
240                 inode = ERR_PTR(-ESTALE);
241         } else if (is_bad_inode(inode)) {
242                 CWARN("%.16s: bad inode: ino = %u\n",
243                 LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name, id->oii_ino);
244                 iput(inode);
245                 inode = ERR_PTR(-ENOENT);
246         } else {
247                 if (id->oii_gen == OSD_OII_NOGEN)
248                         osd_id_gen(id, inode->i_ino, inode->i_generation);
249
250                 /* Do not update file c/mtime in ldiskfs.
251                  * NB: we don't have any lock to protect this because we don't
252                  * have reference on osd_object now, but contention with
253                  * another lookup + attr_set can't happen in the tiny window
254                  * between if (...) and set S_NOCMTIME. */
255                 if (!(inode->i_flags & S_NOCMTIME))
256                         inode->i_flags |= S_NOCMTIME;
257         }
258         return inode;
259 }
260
261 static struct inode *
262 osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
263              struct osd_inode_id *id, struct lu_fid *fid)
264 {
265         struct lustre_mdt_attrs *lma   = &info->oti_mdt_attrs;
266         struct inode            *inode;
267         int                      rc;
268
269         inode = osd_iget(info, dev, id);
270         if (IS_ERR(inode))
271                 return inode;
272
273         rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
274         if (rc == 0) {
275                 *fid = lma->lma_self_fid;
276         } else if (rc == -ENODATA) {
277                 if (unlikely(inode == osd_sb(dev)->s_root->d_inode))
278                         lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
279                 else
280                         lu_igif_build(fid, inode->i_ino, inode->i_generation);
281         } else {
282                 iput(inode);
283                 inode = ERR_PTR(rc);
284         }
285         return inode;
286 }
287
288 static struct inode *
289 osd_iget_verify(struct osd_thread_info *info, struct osd_device *dev,
290                 struct osd_inode_id *id, const struct lu_fid *fid)
291 {
292         struct lustre_mdt_attrs *lma   = &info->oti_mdt_attrs;
293         struct inode            *inode;
294         int                      rc;
295
296         inode = osd_iget(info, dev, id);
297         if (IS_ERR(inode))
298                 return inode;
299
300         rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
301         if (rc == -ENODATA)
302                 return inode;
303
304         if (rc != 0) {
305                 iput(inode);
306                 return ERR_PTR(rc);
307         }
308
309         if (!lu_fid_eq(fid, &lma->lma_self_fid)) {
310                 CDEBUG(D_LFSCK, "inconsistent obj: "DFID", %lu, "DFID"\n",
311                        PFID(&lma->lma_self_fid), inode->i_ino, PFID(fid));
312                 iput(inode);
313                 return ERR_PTR(-EREMCHG);
314         }
315
316         return inode;
317 }
318
/*
 * Find the inode backing \a fid and attach it to \a obj (obj->oo_inode).
 *
 * The inode id is searched for in this order: the per-thread cache
 * (oti_cache), the OI scrub list of inconsistent items (only when that list
 * is not empty), and finally the OI files. When \a conf carries LOC_F_NEW the
 * OI files are not consulted and 0 is returned without an inode.
 *
 * When the scrub file has SF_INCONSISTENT set, an id found in the OI files is
 * loaded through osd_iget_verify(); ids taken from the per-thread cache or
 * the scrub list jump to the iget step before that flag is examined and are
 * loaded with plain osd_iget().
 *
 * For directory inodes, when ldiskfs_pdo is non-zero, an htree lock head is
 * allocated as well.
 *
 * Returns 0 when the inode was attached and also when no object was found
 * (obj->oo_inode stays NULL); -EINPROGRESS when the OI scrub thread is
 * running or was started (or reported -EALREADY) here; -EREMCHG when
 * starting the scrub failed; -ENOMEM when the lock head cannot be allocated;
 * -ENOENT when the OBD_FAIL_OST_ENOENT fail point fires; otherwise the
 * negative error of the failing lookup step. If the scrub is needed but
 * dev->od_noscrub is set and the thread is not running, the error that led
 * to the trigger (-ENOENT or -EREMCHG) is returned unchanged.
 */
319 static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
320                           const struct lu_fid *fid,
321                           const struct lu_object_conf *conf)
322 {
323         struct osd_thread_info *info;
324         struct lu_device       *ldev   = obj->oo_dt.do_lu.lo_dev;
325         struct osd_device      *dev;
326         struct osd_idmap_cache *oic;
327         struct osd_inode_id    *id;
328         struct inode           *inode;
329         struct osd_scrub       *scrub;
330         struct scrub_file      *sf;
331         int                     result;
332         int                     verify = 0;
333         ENTRY;
334
335         LINVRNT(osd_invariant(obj));
336         LASSERT(obj->oo_inode == NULL);
337         LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
338
339         dev = osd_dev(ldev);
340         scrub = &dev->od_scrub;
341         sf = &scrub->os_file;
342         info = osd_oti_get(env);
343         LASSERT(info);
344         oic = &info->oti_cache;
345
346         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
347                 RETURN(-ENOENT);
348
349         /* Search order: 1. per-thread cache. */
350         if (lu_fid_eq(fid, &oic->oic_fid)) {
351                 id = &oic->oic_lid;
352                 goto iget;
353         }
354
355         id = &info->oti_id;
356         if (!cfs_list_empty(&scrub->os_inconsistent_items)) {
357                 /* Search order: 2. OI scrub pending list. */
358                 result = osd_oii_lookup(dev, fid, id);
359                 if (result == 0)
360                         goto iget;
361         }
362
        /* From here on the inode is cross-checked against its LMA FID. */
363         if (sf->sf_flags & SF_INCONSISTENT)
364                 verify = 1;
365
366         /*
367          * Objects are created as locking anchors or place holders for objects
368          * yet to be created. No need to osd_oi_lookup() here because a FID
369          * should never be re-used; if it's really a duplicate FID from
370          * unexpected reason, we should be able to detect it later by calling
371          * do_create->osd_oi_insert()
372          */
373         if (conf != NULL && (conf->loc_flags & LOC_F_NEW) != 0)
374                 GOTO(out, result = 0);
375
376         /* Search order: 3. OI files. */
377         result = osd_oi_lookup(info, dev, fid, id, true);
378         if (result == -ENOENT) {
                /* A missing mapping is treated as "no such object" unless
                 * the FID is a normal one whose OI file is flagged in
                 * sf_oi_bitmap; in that case go and trigger the scrub. */
379                 if (!fid_is_norm(fid) ||
380                     !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid),
381                                       sf->sf_oi_bitmap))
382                         GOTO(out, result = 0);
383
384                 goto trigger;
385         }
386
387         if (result != 0)
388                 GOTO(out, result);
389
390 iget:
391         if (verify == 0)
392                 inode = osd_iget(info, dev, id);
393         else
394                 inode = osd_iget_verify(info, dev, id, fid);
395         if (IS_ERR(inode)) {
396                 result = PTR_ERR(inode);
397                 if (result == -ENOENT || result == -ESTALE) {
                        /* Drop the cached FID so it cannot match again. */
398                         fid_zero(&oic->oic_fid);
399                         result = 0;
400                 } else if (result == -EREMCHG) {
401
                        /* Also entered by "goto trigger" above, with result
                         * still holding -ENOENT from the OI lookup. */
402 trigger:
403                         if (thread_is_running(&scrub->os_thread)) {
404                                 result = -EINPROGRESS;
405                         } else if (!dev->od_noscrub) {
406                                 result = osd_scrub_start(dev);
407                                 LCONSOLE_ERROR("%.16s: trigger OI scrub by RPC "
408                                                "for "DFID", rc = %d [1]\n",
409                                                LDISKFS_SB(osd_sb(dev))->s_es->\
410                                                s_volume_name,PFID(fid), result);
411                                 if (result == 0 || result == -EALREADY)
412                                         result = -EINPROGRESS;
413                                 else
414                                         result = -EREMCHG;
415                         }
416                 }
417
418                 GOTO(out, result);
419         }
420
421         obj->oo_inode = inode;
422         LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
423
424         obj->oo_compat_dot_created = 1;
425         obj->oo_compat_dotdot_created = 1;
426
427         if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
428                 GOTO(out, result = 0);
429
430         LASSERT(obj->oo_hl_head == NULL);
431         obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
432         if (obj->oo_hl_head == NULL) {
                /* Undo the attachment so the object is left without inode. */
433                 obj->oo_inode = NULL;
434                 iput(inode);
435                 GOTO(out, result = -ENOMEM);
436         }
437         GOTO(out, result = 0);
438
439 out:
440         LINVRNT(osd_invariant(obj));
441         return result;
442 }
443
444 /*
445  * Concurrency: shouldn't matter.
446  */
447 static void osd_object_init0(struct osd_object *obj)
448 {
449         LASSERT(obj->oo_inode != NULL);
450         obj->oo_dt.do_body_ops = &osd_body_ops;
451         obj->oo_dt.do_lu.lo_header->loh_attr |=
452                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
453 }
454
455 /*
456  * Concurrency: no concurrent access is possible that early in object
457  * life-cycle.
458  */
459 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
460                            const struct lu_object_conf *conf)
461 {
462         struct osd_object *obj = osd_obj(l);
463         int result;
464
465         LINVRNT(osd_invariant(obj));
466
467         if (fid_is_otable_it(&l->lo_header->loh_fid)) {
468                 obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
469                 l->lo_header->loh_attr |= LOHA_EXISTS;
470                 return 0;
471         }
472
473         result = osd_fid_lookup(env, obj, lu_object_fid(l), conf);
474         obj->oo_dt.do_body_ops = &osd_body_ops_new;
475         if (result == 0 && obj->oo_inode != NULL)
476                 osd_object_init0(obj);
477
478         LINVRNT(osd_invariant(obj));
479         return result;
480 }
481
482 /*
483  * Concurrency: no concurrent access is possible that late in object
484  * life-cycle.
485  */
486 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
487 {
488         struct osd_object *obj = osd_obj(l);
489
490         LINVRNT(osd_invariant(obj));
491
492         dt_object_fini(&obj->oo_dt);
493         if (obj->oo_hl_head != NULL)
494                 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
495         OBD_FREE_PTR(obj);
496 }
497
498 /*
499  * Concurrency: no concurrent access is possible that late in object
500  * life-cycle.
501  */
502 static void osd_index_fini(struct osd_object *o)
503 {
504         struct iam_container *bag;
505
506         if (o->oo_dir != NULL) {
507                 bag = &o->oo_dir->od_container;
508                 if (o->oo_inode != NULL) {
509                         if (bag->ic_object == o->oo_inode)
510                                 iam_container_fini(bag);
511                 }
512                 OBD_FREE_PTR(o->oo_dir);
513                 o->oo_dir = NULL;
514         }
515 }
516
517 /*
518  * Concurrency: no concurrent access is possible that late in object
519  * life-cycle (for all existing callers, that is. New callers have to provide
520  * their own locking.)
521  */
522 static int osd_inode_unlinked(const struct inode *inode)
523 {
524         return inode->i_nlink == 0;
525 }
526
/*
 * Fixed transaction credit amounts; the code using them is outside this part
 * of the file.
 * NOTE(review): presumably journal credits reserved for removing an OI entry
 * and for deleting an inode, as the names suggest -- confirm at the users.
 */
527 enum {
528         OSD_TXN_OI_DELETE_CREDITS    = 20,
529         OSD_TXN_INODE_DELETE_CREDITS = 20
530 };
531
532 /*
533  * Journal
534  */
535
536 #if OSD_THANDLE_STATS
537 /**
538  * Set time when the handle is allocated
539  */
540 static void osd_th_alloced(struct osd_thandle *oth)
541 {
542         oth->oth_alloced = cfs_time_current();
543 }
544
545 /**
546  * Set time when the handle started
547  */
548 static void osd_th_started(struct osd_thandle *oth)
549 {
550         oth->oth_started = cfs_time_current();
551 }
552
553 /**
554  * Helper function to convert time interval to microseconds packed in
555  * long int.
556  */
557 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
558 {
559         struct timeval val;
560
561         cfs_duration_usec(cfs_time_sub(end, start), &val);
562         return val.tv_sec * 1000000 + val.tv_usec;
563 }
564
565 /**
566  * Check whether the we deal with this handle for too long.
567  */
568 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
569                                 cfs_time_t alloced, cfs_time_t started,
570                                 cfs_time_t closed)
571 {
572         cfs_time_t now = cfs_time_current();
573
574         LASSERT(dev != NULL);
575
576         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
577                             interval_to_usec(alloced, started));
578         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
579                             interval_to_usec(started, closed));
580         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
581                             interval_to_usec(closed, now));
582
583         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
584                 CWARN("transaction handle %p was open for too long: "
585                       "now "CFS_TIME_T" ,"
586                       "alloced "CFS_TIME_T" ,"
587                       "started "CFS_TIME_T" ,"
588                       "closed "CFS_TIME_T"\n",
589                       oth, now, alloced, started, closed);
590                 libcfs_debug_dumpstack(NULL);
591         }
592 }
593
/*
 * Execute statement \a expr and then feed the timing of handle \a oth into
 * __osd_th_check_slow().
 *
 * The allocation and start timestamps are copied out of \a oth before
 * \a expr runs, so \a oth is not dereferenced afterwards; it is only passed
 * on to be printed as an address.
 *
 * Wrapped in do { } while (0) so that the macro behaves as one statement
 * (e.g. inside an unbraced if/else), and the arguments are parenthesized.
 */
#define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
do {                                                                    \
        cfs_time_t __closed = cfs_time_current();                       \
        cfs_time_t __alloced = (oth)->oth_alloced;                      \
        cfs_time_t __started = (oth)->oth_started;                      \
                                                                        \
        expr;                                                           \
        __osd_th_check_slow((oth), (dev), __alloced, __started,         \
                            __closed);                                  \
} while (0)
603
604 #else /* OSD_THANDLE_STATS */
605
/*
 * Handle statistics are compiled out: the timestamp helpers expand to empty
 * statements and OSD_CHECK_SLOW_TH() just evaluates its expression.
 */
606 #define osd_th_alloced(h)                  do {} while(0)
607 #define osd_th_started(h)                  do {} while(0)
608 #define OSD_CHECK_SLOW_TH(oth, dev, expr)  expr
609
610 #endif /* OSD_THANDLE_STATS */
611
612 /*
613  * Concurrency: doesn't access mutable data.
614  */
615 static int osd_param_is_not_sane(const struct osd_device *dev,
616                                  const struct thandle *th)
617 {
618         struct osd_thandle *oh = container_of(th, typeof(*oh), ot_super);
619
620         return oh->ot_credits > osd_journal(dev)->j_max_transaction_buffers;
621 }
622
623 /*
624  * Concurrency: shouldn't matter.
625  */
/*
 * Journal commit callback; osd_trans_stop() registers it for a handle with
 * osd_journal_callback_set(). The signature depends on whether the ldiskfs
 * callback interface passes the super block (which is not used here).
 *
 * \a jcb is the ot_jcb member embedded in the osd_thandle; \a error is the
 * commit status and is only logged and forwarded to the callbacks.
 *
 * Runs the commit hooks, then every callback queued on ot_dcb_list (with a
 * NULL environment), drops the device reference taken in osd_trans_start(),
 * finalizes the handle's lu_context and frees the osd_thandle.
 */
626 #ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
627 static void osd_trans_commit_cb(struct super_block *sb,
628                                 struct journal_callback *jcb, int error)
629 #else
630 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
631 #endif
632 {
633         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
634         struct thandle     *th  = &oh->ot_super;
635         struct lu_device   *lud = &th->th_dev->dd_lu_dev;
636         struct dt_txn_commit_cb *dcb, *tmp;
637
        /* osd_trans_stop() clears ot_handle before stopping the journal. */
638         LASSERT(oh->ot_handle == NULL);
639
640         if (error)
641                 CERROR("transaction @0x%p commit error: %d\n", th, error);
642
643         dt_txn_hook_commit(th);
644
645         /* call per-transaction callbacks if any */
646         cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
647                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
648                          "commit callback entry: magic=%x name='%s'\n",
649                          dcb->dcb_magic, dcb->dcb_name);
                /* Unlink before the call; the "safe" iterator already holds
                 * the next entry. */
650                 cfs_list_del_init(&dcb->dcb_linkage);
651                 dcb->dcb_func(NULL, th, dcb, error);
652         }
653
654         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
655         lu_device_put(lud);
656         th->th_dev = NULL;
657
658         lu_context_exit(&th->th_ctx);
659         lu_context_fini(&th->th_ctx);
660         OBD_FREE_PTR(oh);
661 }
662
663 static struct thandle *osd_trans_create(const struct lu_env *env,
664                                         struct dt_device *d)
665 {
666         struct osd_thread_info *oti = osd_oti_get(env);
667         struct osd_iobuf       *iobuf = &oti->oti_iobuf;
668         struct osd_thandle     *oh;
669         struct thandle         *th;
670         ENTRY;
671
672         /* on pending IO in this thread should left from prev. request */
673         LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
674
675         th = ERR_PTR(-ENOMEM);
676         OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
677         if (oh != NULL) {
678                 oh->ot_quota_trans = &oti->oti_quota_trans;
679                 memset(oh->ot_quota_trans, 0, sizeof(*oh->ot_quota_trans));
680                 th = &oh->ot_super;
681                 th->th_dev = d;
682                 th->th_result = 0;
683                 th->th_tags = LCT_TX_HANDLE;
684                 oh->ot_credits = 0;
685                 oti->oti_dev = osd_dt_dev(d);
686                 CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
687                 osd_th_alloced(oh);
688
689                 memset(oti->oti_declare_ops, 0, OSD_OT_MAX);
690                 memset(oti->oti_declare_ops_rb, 0, OSD_OT_MAX);
691                 memset(oti->oti_declare_ops_cred, 0, OSD_OT_MAX);
692                 oti->oti_rollback = false;
693         }
694         RETURN(th);
695 }
696
697 /*
698  * Concurrency: shouldn't matter.
699  */
700 int osd_trans_start(const struct lu_env *env, struct dt_device *d,
701                     struct thandle *th)
702 {
703         struct osd_thread_info *oti = osd_oti_get(env);
704         struct osd_device  *dev = osd_dt_dev(d);
705         handle_t           *jh;
706         struct osd_thandle *oh;
707         int rc;
708
709         ENTRY;
710
711         LASSERT(current->journal_info == NULL);
712
713         oh = container_of0(th, struct osd_thandle, ot_super);
714         LASSERT(oh != NULL);
715         LASSERT(oh->ot_handle == NULL);
716
717         rc = dt_txn_hook_start(env, d, th);
718         if (rc != 0)
719                 GOTO(out, rc);
720
721         if (unlikely(osd_param_is_not_sane(dev, th))) {
722                 static unsigned long last_printed;
723                 static int last_credits;
724
725                 CWARN("%.16s: too many transaction credits (%d > %d)\n",
726                       LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name,
727                       oh->ot_credits,
728                       osd_journal(dev)->j_max_transaction_buffers);
729 #ifdef OSD_TRACK_DECLARES
730                 CWARN("  create: %u/%u, delete: %u/%u, destroy: %u/%u\n",
731                       oti->oti_declare_ops[OSD_OT_CREATE],
732                       oti->oti_declare_ops_cred[OSD_OT_CREATE],
733                       oti->oti_declare_ops[OSD_OT_DELETE],
734                       oti->oti_declare_ops_cred[OSD_OT_DELETE],
735                       oti->oti_declare_ops[OSD_OT_DESTROY],
736                       oti->oti_declare_ops_cred[OSD_OT_DESTROY]);
737                 CWARN("  attr_set: %u/%u, xattr_set: %u/%u\n",
738                       oti->oti_declare_ops[OSD_OT_ATTR_SET],
739                       oti->oti_declare_ops_cred[OSD_OT_ATTR_SET],
740                       oti->oti_declare_ops[OSD_OT_XATTR_SET],
741                       oti->oti_declare_ops_cred[OSD_OT_XATTR_SET]);
742                 CWARN("  write: %u/%u, punch: %u/%u, quota %u/%u\n",
743                       oti->oti_declare_ops[OSD_OT_WRITE],
744                       oti->oti_declare_ops_cred[OSD_OT_WRITE],
745                       oti->oti_declare_ops[OSD_OT_PUNCH],
746                       oti->oti_declare_ops_cred[OSD_OT_PUNCH],
747                       oti->oti_declare_ops[OSD_OT_QUOTA],
748                       oti->oti_declare_ops_cred[OSD_OT_QUOTA]);
749                 CWARN("  insert: %u/%u, delete: %u/%u\n",
750                       oti->oti_declare_ops[OSD_OT_INSERT],
751                       oti->oti_declare_ops_cred[OSD_OT_INSERT],
752                       oti->oti_declare_ops[OSD_OT_DESTROY],
753                       oti->oti_declare_ops_cred[OSD_OT_DESTROY]);
754                 CWARN("  ref_add: %u/%u, ref_del: %u/%u\n",
755                       oti->oti_declare_ops[OSD_OT_REF_ADD],
756                       oti->oti_declare_ops_cred[OSD_OT_REF_ADD],
757                       oti->oti_declare_ops[OSD_OT_REF_DEL],
758                       oti->oti_declare_ops_cred[OSD_OT_REF_DEL]);
759
760                 if (last_credits != oh->ot_credits &&
761                     time_after(jiffies, last_printed + 60 * HZ)) {
762                         libcfs_debug_dumpstack(NULL);
763                         last_credits = oh->ot_credits;
764                         last_printed = jiffies;
765                 }
766 #endif
767                 /* XXX Limit the credits to 'max_transaction_buffers', and
768                  *     let the underlying filesystem to catch the error if
769                  *     we really need so many credits.
770                  *
771                  *     This should be removed when we can calculate the
772                  *     credits precisely. */
773                 oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
774         }
775
776         /*
777          * XXX temporary stuff. Some abstraction layer should
778          * be used.
779          */
780         jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
781         osd_th_started(oh);
782         if (!IS_ERR(jh)) {
783                 oh->ot_handle = jh;
784                 LASSERT(oti->oti_txns == 0);
785                 lu_context_init(&th->th_ctx, th->th_tags);
786                 lu_context_enter(&th->th_ctx);
787
788                 lu_device_get(&d->dd_lu_dev);
789                 oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
790                                              "osd-tx", th);
791                 oti->oti_txns++;
792                 rc = 0;
793         } else {
794                 rc = PTR_ERR(jh);
795         }
796 out:
797         RETURN(rc);
798 }
799
800 /*
801  * Concurrency: shouldn't matter.
802  */
/*
 * Stop the transaction \a th.
 *
 * The quota slave (if the device has one) is told that the operation ends.
 * For a handle that was started (ot_handle set), the commit callback is
 * registered, the stop hooks run, th_sync is copied to the journal handle
 * and the journal handle is stopped. A handle that was never started is
 * simply freed. Finally the function waits until all IO submitted through
 * the per-thread iobuf has completed.
 *
 * Returns the first error among: the stop hooks / journal stop (the journal
 * stop result overwrites the hook result), and the iobuf's dr_error.
 */
803 static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
804 {
805         int                     rc = 0;
806         struct osd_thandle     *oh;
807         struct osd_thread_info *oti = osd_oti_get(env);
808         struct osd_iobuf       *iobuf = &oti->oti_iobuf;
809         struct qsd_instance    *qsd = oti->oti_dev->od_quota_slave;
810         ENTRY;
811
812         oh = container_of0(th, struct osd_thandle, ot_super);
813
814         if (qsd != NULL)
815                 /* inform the quota slave device that the transaction is
816                  * stopping */
817                 qsd_op_end(env, qsd, oh->ot_quota_trans);
818         oh->ot_quota_trans = NULL;
819
820         if (oh->ot_handle != NULL) {
821                 handle_t *hdl = oh->ot_handle;
822
823                 /*
824                  * add commit callback
825                  * notice we don't do this in osd_trans_start()
826                  * as underlying transaction can change during truncate
827                  */
828                 osd_journal_callback_set(hdl, osd_trans_commit_cb,
829                                          &oh->ot_jcb);
830
831                 LASSERT(oti->oti_txns == 1);
832                 oti->oti_txns--;
833                 rc = dt_txn_hook_stop(env, th);
834                 if (rc != 0)
835                         CERROR("Failure in transaction hook: %d\n", rc);
836
837                 /* hook functions might modify th_sync */
838                 hdl->h_sync = th->th_sync;
839
                /* osd_trans_commit_cb() asserts ot_handle == NULL and frees
                 * oh, so the handle is detached before the journal stop and
                 * oh is not dereferenced after it.
                 * NOTE(review): presumably the callback can run as soon as
                 * the journal handle is stopped -- confirm. */
840                 oh->ot_handle = NULL;
841                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
842                                   rc = ldiskfs_journal_stop(hdl));
843                 if (rc != 0)
844                         CERROR("Failure to stop transaction: %d\n", rc);
845         } else {
                /* Never started: no commit callback will free it. */
846                 OBD_FREE_PTR(oh);
847         }
848
849         /* as we want IO to journal and data IO be concurrent, we don't block
850          * awaiting data IO completion in osd_do_bio(), instead we wait here
851          * once transaction is submitted to the journal. all reqular requests
852          * don't do direct IO (except read/write), thus this wait_event becomes
853          * no-op for them.
854          *
855          * IMPORTANT: we have to wait till any IO submited by the thread is
856          * completed otherwise iobuf may be corrupted by different request
857          */
858         cfs_wait_event(iobuf->dr_wait,
859                        cfs_atomic_read(&iobuf->dr_numreqs) == 0);
860         if (!rc)
861                 rc = iobuf->dr_error;
862
863         RETURN(rc);
864 }
865
866 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
867 {
868         struct osd_thandle *oh = container_of0(th, struct osd_thandle,
869                                                ot_super);
870
871         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
872         LASSERT(&dcb->dcb_func != NULL);
873         cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
874
875         return 0;
876 }
877
878 /*
879  * Called just before object is freed. Releases all resources except for
880  * object itself (that is released by osd_object_free()).
881  *
882  * Concurrency: no concurrent access is possible that late in object
883  * life-cycle.
884  */
885 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
886 {
887         struct osd_object *obj   = osd_obj(l);
888         struct inode      *inode = obj->oo_inode;
889
890         LINVRNT(osd_invariant(obj));
891
892         /*
893          * If object is unlinked remove fid->ino mapping from object index.
894          */
895
896         osd_index_fini(obj);
897         if (inode != NULL) {
898                 struct qsd_instance     *qsd = osd_obj2dev(obj)->od_quota_slave;
899                 qid_t                    uid = inode->i_uid;
900                 qid_t                    gid = inode->i_gid;
901
902                 iput(inode);
903                 obj->oo_inode = NULL;
904
905                 if (qsd != NULL) {
906                         struct osd_thread_info  *info = osd_oti_get(env);
907                         struct lquota_id_info   *qi = &info->oti_qi;
908
909                         /* Release granted quota to master if necessary */
910                         qi->lqi_id.qid_uid = uid;
911                         qsd_op_adjust(env, qsd, &qi->lqi_id, USRQUOTA);
912
913                         qi->lqi_id.qid_uid = gid;
914                         qsd_op_adjust(env, qsd, &qi->lqi_id, GRPQUOTA);
915                 }
916         }
917 }
918
/*
 * Release hook for an osd object. Intentionally empty: nothing is done here.
 *
 * Concurrency: ->loo_object_release() is called under site spin-lock.
 */
static void osd_object_release(const struct lu_env *env, struct lu_object *l)
{
        /* nothing to do */
}
926
927 /*
928  * Concurrency: shouldn't matter.
929  */
930 static int osd_object_print(const struct lu_env *env, void *cookie,
931                             lu_printer_t p, const struct lu_object *l)
932 {
933         struct osd_object *o = osd_obj(l);
934         struct iam_descr  *d;
935
936         if (o->oo_dir != NULL)
937                 d = o->oo_dir->od_container.ic_descr;
938         else
939                 d = NULL;
940         return (*p)(env, cookie,
941                     LUSTRE_OSD_LDISKFS_NAME"-object@%p(i:%p:%lu/%u)[%s]",
942                     o, o->oo_inode,
943                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
944                     o->oo_inode ? o->oo_inode->i_generation : 0,
945                     d ? d->id_ops->id_name : "plain");
946 }
947
948 /*
949  * Concurrency: shouldn't matter.
950  */
951 int osd_statfs(const struct lu_env *env, struct dt_device *d,
952                struct obd_statfs *sfs)
953 {
954         struct osd_device  *osd = osd_dt_dev(d);
955         struct super_block *sb = osd_sb(osd);
956         struct kstatfs     *ksfs;
957         int result = 0;
958
959         if (unlikely(osd->od_mnt == NULL))
960                 return -EINPROGRESS;
961
962         /* osd_lproc.c call this without env, allocate ksfs for that case */
963         if (unlikely(env == NULL)) {
964                 OBD_ALLOC_PTR(ksfs);
965                 if (ksfs == NULL)
966                         return -ENOMEM;
967         } else {
968                 ksfs = &osd_oti_get(env)->oti_ksfs;
969         }
970
971         spin_lock(&osd->od_osfs_lock);
972         /* cache 1 second */
973         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
974                 result = sb->s_op->statfs(sb->s_root, ksfs);
975                 if (likely(result == 0)) { /* N.B. statfs can't really fail */
976                         osd->od_osfs_age = cfs_time_current_64();
977                         statfs_pack(&osd->od_statfs, ksfs);
978                         if (sb->s_flags & MS_RDONLY)
979                                 sfs->os_state = OS_STATE_READONLY;
980                 }
981         }
982
983         if (likely(result == 0))
984                 *sfs = osd->od_statfs;
985         spin_unlock(&osd->od_osfs_lock);
986
987         if (unlikely(env == NULL))
988                 OBD_FREE_PTR(ksfs);
989
990         return result;
991 }
992
/**
 * Estimated space consumed by one file creation: the directory record for
 * its name. The longest name is assumed to be the decimal form of 2^64 - 1,
 * i.e. 20 characters.
 * This is 28 bytes per object, which is 28MB for 1M objects ... not so bad.
 *
 * The double-underscore variant of the record-length macro is preferred when
 * the ldiskfs headers provide it.
 */
#if defined(__LDISKFS_DIR_REC_LEN)
# define PER_OBJ_USAGE __LDISKFS_DIR_REC_LEN(20)
#else
# define PER_OBJ_USAGE LDISKFS_DIR_REC_LEN(20)
#endif
1003
1004 /*
1005  * Concurrency: doesn't access mutable data.
1006  */
1007 static void osd_conf_get(const struct lu_env *env,
1008                          const struct dt_device *dev,
1009                          struct dt_device_param *param)
1010 {
1011         struct super_block *sb = osd_sb(osd_dt_dev(dev));
1012
1013         /*
1014          * XXX should be taken from not-yet-existing fs abstraction layer.
1015          */
1016         param->ddp_mnt = osd_dt_dev(dev)->od_mnt;
1017         param->ddp_max_name_len = LDISKFS_NAME_LEN;
1018         param->ddp_max_nlink    = LDISKFS_LINK_MAX;
1019         param->ddp_block_shift  = sb->s_blocksize_bits;
1020         param->ddp_mount_type     = LDD_MT_LDISKFS;
1021         param->ddp_maxbytes       = sb->s_maxbytes;
1022         /* Overhead estimate should be fairly accurate, so we really take a tiny
1023          * error margin which also avoids fragmenting the filesystem too much */
1024         param->ddp_grant_reserved = 2; /* end up to be 1.9% after conversion */
1025         /* inode are statically allocated, so per-inode space consumption
1026          * is the space consumed by the directory entry */
1027         param->ddp_inodespace     = PER_OBJ_USAGE;
1028         /* per-fragment overhead to be used by the client code */
1029         param->ddp_grant_frag     = 6 * LDISKFS_BLOCK_SIZE(sb);
1030         param->ddp_mntopts      = 0;
1031         if (test_opt(sb, XATTR_USER))
1032                 param->ddp_mntopts |= MNTOPT_USERXATTR;
1033         if (test_opt(sb, POSIX_ACL))
1034                 param->ddp_mntopts |= MNTOPT_ACL;
1035
1036 #if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
1037         if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
1038                 param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
1039         else
1040 #endif
1041                 param->ddp_max_ea_size = sb->s_blocksize;
1042
1043 }
1044
1045 /*
1046  * Concurrency: shouldn't matter.
1047  */
1048 static int osd_sync(const struct lu_env *env, struct dt_device *d)
1049 {
1050         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_LDISKFS_NAME);
1051         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
1052 }
1053
1054 /**
1055  * Start commit for OSD device.
1056  *
1057  * An implementation of dt_commit_async method for OSD device.
1058  * Asychronously starts underlayng fs sync and thereby a transaction
1059  * commit.
1060  *
1061  * \param env environment
1062  * \param d dt device
1063  *
1064  * \see dt_device_operations
1065  */
1066 static int osd_commit_async(const struct lu_env *env,
1067                             struct dt_device *d)
1068 {
1069         struct super_block *s = osd_sb(osd_dt_dev(d));
1070         ENTRY;
1071
1072         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_LDISKFS_NAME);
1073         RETURN(s->s_op->sync_fs(s, 0));
1074 }
1075
1076 /*
1077  * Concurrency: shouldn't matter.
1078  */
1079
1080 static int osd_ro(const struct lu_env *env, struct dt_device *d)
1081 {
1082         struct super_block *sb = osd_sb(osd_dt_dev(d));
1083         int rc;
1084         ENTRY;
1085
1086         CERROR("*** setting %s read-only ***\n", osd_dt_dev(d)->od_svname);
1087
1088         rc = __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
1089         RETURN(rc);
1090 }
1091
1092 /*
1093  * Concurrency: serialization provided by callers.
1094  */
1095 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
1096                               int mode, unsigned long timeout, __u32 alg,
1097                               struct lustre_capa_key *keys)
1098 {
1099         struct osd_device *dev = osd_dt_dev(d);
1100         ENTRY;
1101
1102         dev->od_fl_capa = mode;
1103         dev->od_capa_timeout = timeout;
1104         dev->od_capa_alg = alg;
1105         dev->od_capa_keys = keys;
1106         RETURN(0);
1107 }
1108
1109 /**
1110  * Note: we do not count into QUOTA here.
1111  * If we mount with --data_journal we may need more.
1112  */
1113 const int osd_dto_credits_noquota[DTO_NR] = {
1114         /**
1115          * Insert/Delete.
1116          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1117          * SINGLEDATA_TRANS_BLOCKS(8)
1118          * XXX Note: maybe iam need more, since iam have more level than
1119          *           EXT3 htree.
1120          */
1121         [DTO_INDEX_INSERT]  = 16,
1122         [DTO_INDEX_DELETE]  = 16,
1123         /**
1124          * Used for OI scrub
1125          */
1126         [DTO_INDEX_UPDATE]  = 16,
1127         /**
1128          * Create a object. The same as create object in EXT3.
1129          * DATA_TRANS_BLOCKS(14) +
1130          * INDEX_EXTRA_BLOCKS(8) +
1131          * 3(inode bits, groups, GDT)
1132          */
1133         [DTO_OBJECT_CREATE] = 25,
1134         /**
1135          * XXX: real credits to be fixed
1136          */
1137         [DTO_OBJECT_DELETE] = 25,
1138         /**
1139          * Attr set credits (inode)
1140          */
1141         [DTO_ATTR_SET_BASE] = 1,
1142         /**
1143          * Xattr set. The same as xattr of EXT3.
1144          * DATA_TRANS_BLOCKS(14)
1145          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1146          * are also counted in. Do not know why?
1147          */
1148         [DTO_XATTR_SET]     = 14,
1149         [DTO_LOG_REC]       = 14,
1150         /**
1151          * credits for inode change during write.
1152          */
1153         [DTO_WRITE_BASE]    = 3,
1154         /**
1155          * credits for single block write.
1156          */
1157         [DTO_WRITE_BLOCK]   = 14,
1158         /**
1159          * Attr set credits for chown.
1160          * This is extra credits for setattr, and it is null without quota
1161          */
1162         [DTO_ATTR_SET_CHOWN]= 0
1163 };
1164
1165 static const struct dt_device_operations osd_dt_ops = {
1166         .dt_root_get       = osd_root_get,
1167         .dt_statfs         = osd_statfs,
1168         .dt_trans_create   = osd_trans_create,
1169         .dt_trans_start    = osd_trans_start,
1170         .dt_trans_stop     = osd_trans_stop,
1171         .dt_trans_cb_add   = osd_trans_cb_add,
1172         .dt_conf_get       = osd_conf_get,
1173         .dt_sync           = osd_sync,
1174         .dt_ro             = osd_ro,
1175         .dt_commit_async   = osd_commit_async,
1176         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1177 };
1178
1179 static void osd_object_read_lock(const struct lu_env *env,
1180                                  struct dt_object *dt, unsigned role)
1181 {
1182         struct osd_object *obj = osd_dt_obj(dt);
1183         struct osd_thread_info *oti = osd_oti_get(env);
1184
1185         LINVRNT(osd_invariant(obj));
1186
1187         LASSERT(obj->oo_owner != env);
1188         down_read_nested(&obj->oo_sem, role);
1189
1190         LASSERT(obj->oo_owner == NULL);
1191         oti->oti_r_locks++;
1192 }
1193
1194 static void osd_object_write_lock(const struct lu_env *env,
1195                                   struct dt_object *dt, unsigned role)
1196 {
1197         struct osd_object *obj = osd_dt_obj(dt);
1198         struct osd_thread_info *oti = osd_oti_get(env);
1199
1200         LINVRNT(osd_invariant(obj));
1201
1202         LASSERT(obj->oo_owner != env);
1203         down_write_nested(&obj->oo_sem, role);
1204
1205         LASSERT(obj->oo_owner == NULL);
1206         obj->oo_owner = env;
1207         oti->oti_w_locks++;
1208 }
1209
1210 static void osd_object_read_unlock(const struct lu_env *env,
1211                                    struct dt_object *dt)
1212 {
1213         struct osd_object *obj = osd_dt_obj(dt);
1214         struct osd_thread_info *oti = osd_oti_get(env);
1215
1216         LINVRNT(osd_invariant(obj));
1217
1218         LASSERT(oti->oti_r_locks > 0);
1219         oti->oti_r_locks--;
1220         up_read(&obj->oo_sem);
1221 }
1222
1223 static void osd_object_write_unlock(const struct lu_env *env,
1224                                     struct dt_object *dt)
1225 {
1226         struct osd_object *obj = osd_dt_obj(dt);
1227         struct osd_thread_info *oti = osd_oti_get(env);
1228
1229         LINVRNT(osd_invariant(obj));
1230
1231         LASSERT(obj->oo_owner == env);
1232         LASSERT(oti->oti_w_locks > 0);
1233         oti->oti_w_locks--;
1234         obj->oo_owner = NULL;
1235         up_write(&obj->oo_sem);
1236 }
1237
1238 static int osd_object_write_locked(const struct lu_env *env,
1239                                    struct dt_object *dt)
1240 {
1241         struct osd_object *obj = osd_dt_obj(dt);
1242
1243         LINVRNT(osd_invariant(obj));
1244
1245         return obj->oo_owner == env;
1246 }
1247
1248 static int capa_is_sane(const struct lu_env *env,
1249                         struct osd_device *dev,
1250                         struct lustre_capa *capa,
1251                         struct lustre_capa_key *keys)
1252 {
1253         struct osd_thread_info *oti = osd_oti_get(env);
1254         struct lustre_capa *tcapa = &oti->oti_capa;
1255         struct obd_capa *oc;
1256         int i, rc = 0;
1257         ENTRY;
1258
1259         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1260         if (oc) {
1261                 if (capa_is_expired(oc)) {
1262                         DEBUG_CAPA(D_ERROR, capa, "expired");
1263                         rc = -ESTALE;
1264                 }
1265                 capa_put(oc);
1266                 RETURN(rc);
1267         }
1268
1269         if (capa_is_expired_sec(capa)) {
1270                 DEBUG_CAPA(D_ERROR, capa, "expired");
1271                 RETURN(-ESTALE);
1272         }
1273
1274         spin_lock(&capa_lock);
1275         for (i = 0; i < 2; i++) {
1276                 if (keys[i].lk_keyid == capa->lc_keyid) {
1277                         oti->oti_capa_key = keys[i];
1278                         break;
1279                 }
1280         }
1281         spin_unlock(&capa_lock);
1282
1283         if (i == 2) {
1284                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1285                 RETURN(-ESTALE);
1286         }
1287
1288         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1289         if (rc)
1290                 RETURN(rc);
1291
1292         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1293                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1294                 RETURN(-EACCES);
1295         }
1296
1297         oc = capa_add(dev->od_capa_hash, capa);
1298         capa_put(oc);
1299
1300         RETURN(0);
1301 }
1302
1303 int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1304                     struct lustre_capa *capa, __u64 opc)
1305 {
1306         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1307         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1308         struct md_capainfo *ci;
1309         int rc;
1310
1311         if (!dev->od_fl_capa)
1312                 return 0;
1313
1314         if (capa == BYPASS_CAPA)
1315                 return 0;
1316
1317         ci = md_capainfo(env);
1318         if (unlikely(!ci))
1319                 return 0;
1320
1321         if (ci->mc_auth == LC_ID_NONE)
1322                 return 0;
1323
1324         if (!capa) {
1325                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1326                 return -EACCES;
1327         }
1328
1329         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1330                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1331                            PFID(fid));
1332                 return -EACCES;
1333         }
1334
1335         if (!capa_opc_supported(capa, opc)) {
1336                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1337                 return -EACCES;
1338         }
1339
1340         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1341                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1342                 return -EACCES;
1343         }
1344
1345         return 0;
1346 }
1347
1348 static struct timespec *osd_inode_time(const struct lu_env *env,
1349                                        struct inode *inode, __u64 seconds)
1350 {
1351         struct osd_thread_info  *oti = osd_oti_get(env);
1352         struct timespec         *t   = &oti->oti_time;
1353
1354         t->tv_sec = seconds;
1355         t->tv_nsec = 0;
1356         *t = timespec_trunc(*t, inode->i_sb->s_time_gran);
1357         return t;
1358 }
1359
1360
1361 static void osd_inode_getattr(const struct lu_env *env,
1362                               struct inode *inode, struct lu_attr *attr)
1363 {
1364         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1365                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1366                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1367
1368         attr->la_atime      = LTIME_S(inode->i_atime);
1369         attr->la_mtime      = LTIME_S(inode->i_mtime);
1370         attr->la_ctime      = LTIME_S(inode->i_ctime);
1371         attr->la_mode       = inode->i_mode;
1372         attr->la_size       = i_size_read(inode);
1373         attr->la_blocks     = inode->i_blocks;
1374         attr->la_uid        = inode->i_uid;
1375         attr->la_gid        = inode->i_gid;
1376         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1377         attr->la_nlink      = inode->i_nlink;
1378         attr->la_rdev       = inode->i_rdev;
1379         attr->la_blksize    = 1 << inode->i_blkbits;
1380         attr->la_blkbits    = inode->i_blkbits;
1381 }
1382
1383 static int osd_attr_get(const struct lu_env *env,
1384                         struct dt_object *dt,
1385                         struct lu_attr *attr,
1386                         struct lustre_capa *capa)
1387 {
1388         struct osd_object *obj = osd_dt_obj(dt);
1389
1390         LASSERT(dt_object_exists(dt));
1391         LINVRNT(osd_invariant(obj));
1392
1393         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1394                 return -EACCES;
1395
1396         spin_lock(&obj->oo_guard);
1397         osd_inode_getattr(env, obj->oo_inode, attr);
1398         spin_unlock(&obj->oo_guard);
1399         return 0;
1400 }
1401
1402 static int osd_declare_attr_set(const struct lu_env *env,
1403                                 struct dt_object *dt,
1404                                 const struct lu_attr *attr,
1405                                 struct thandle *handle)
1406 {
1407         struct osd_thandle     *oh;
1408         struct osd_object      *obj;
1409         struct osd_thread_info *info = osd_oti_get(env);
1410         struct lquota_id_info  *qi = &info->oti_qi;
1411         long long               bspace;
1412         int                     rc = 0;
1413         bool                    allocated;
1414         ENTRY;
1415
1416         LASSERT(dt != NULL);
1417         LASSERT(handle != NULL);
1418
1419         obj = osd_dt_obj(dt);
1420         LASSERT(osd_invariant(obj));
1421
1422         oh = container_of0(handle, struct osd_thandle, ot_super);
1423         LASSERT(oh->ot_handle == NULL);
1424
1425         osd_trans_declare_op(env, oh, OSD_OT_ATTR_SET,
1426                              osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
1427
1428         if (attr == NULL || obj->oo_inode == NULL)
1429                 RETURN(rc);
1430
1431         bspace   = obj->oo_inode->i_blocks;
1432         bspace <<= obj->oo_inode->i_sb->s_blocksize_bits;
1433         bspace   = toqb(bspace);
1434
1435         /* Changing ownership is always preformed by super user, it should not
1436          * fail with EDQUOT.
1437          *
1438          * We still need to call the osd_declare_qid() to calculate the journal
1439          * credits for updating quota accounting files and to trigger quota
1440          * space adjustment once the operation is completed.*/
1441         if ((attr->la_valid & LA_UID) != 0 &&
1442              attr->la_uid != obj->oo_inode->i_uid) {
1443                 qi->lqi_type = USRQUOTA;
1444
1445                 /* inode accounting */
1446                 qi->lqi_is_blk = false;
1447
1448                 /* one more inode for the new owner ... */
1449                 qi->lqi_id.qid_uid = attr->la_uid;
1450                 qi->lqi_space      = 1;
1451                 allocated = (attr->la_uid == 0) ? true : false;
1452                 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1453                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1454                         rc = 0;
1455                 if (rc)
1456                         RETURN(rc);
1457
1458                 /* and one less inode for the current uid */
1459                 qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
1460                 qi->lqi_space      = -1;
1461                 rc = osd_declare_qid(env, oh, qi, true, NULL);
1462                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1463                         rc = 0;
1464                 if (rc)
1465                         RETURN(rc);
1466
1467                 /* block accounting */
1468                 qi->lqi_is_blk = true;
1469
1470                 /* more blocks for the new owner ... */
1471                 qi->lqi_id.qid_uid = attr->la_uid;
1472                 qi->lqi_space      = bspace;
1473                 allocated = (attr->la_uid == 0) ? true : false;
1474                 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1475                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1476                         rc = 0;
1477                 if (rc)
1478                         RETURN(rc);
1479
1480                 /* and finally less blocks for the current owner */
1481                 qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
1482                 qi->lqi_space      = -bspace;
1483                 rc = osd_declare_qid(env, oh, qi, true, NULL);
1484                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1485                         rc = 0;
1486                 if (rc)
1487                         RETURN(rc);
1488         }
1489
1490         if (attr->la_valid & LA_GID &&
1491             attr->la_gid != obj->oo_inode->i_gid) {
1492                 qi->lqi_type = GRPQUOTA;
1493
1494                 /* inode accounting */
1495                 qi->lqi_is_blk = false;
1496
1497                 /* one more inode for the new group owner ... */
1498                 qi->lqi_id.qid_gid = attr->la_gid;
1499                 qi->lqi_space      = 1;
1500                 allocated = (attr->la_gid == 0) ? true : false;
1501                 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1502                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1503                         rc = 0;
1504                 if (rc)
1505                         RETURN(rc);
1506
1507                 /* and one less inode for the current gid */
1508                 qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
1509                 qi->lqi_space      = -1;
1510                 rc = osd_declare_qid(env, oh, qi, true, NULL);
1511                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1512                         rc = 0;
1513                 if (rc)
1514                         RETURN(rc);
1515
1516                 /* block accounting */
1517                 qi->lqi_is_blk = true;
1518
1519                 /* more blocks for the new owner ... */
1520                 qi->lqi_id.qid_gid = attr->la_gid;
1521                 qi->lqi_space      = bspace;
1522                 allocated = (attr->la_gid == 0) ? true : false;
1523                 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1524                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1525                         rc = 0;
1526                 if (rc)
1527                         RETURN(rc);
1528
1529                 /* and finally less blocks for the current owner */
1530                 qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
1531                 qi->lqi_space      = -bspace;
1532                 rc = osd_declare_qid(env, oh, qi, true, NULL);
1533                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1534                         rc = 0;
1535                 if (rc)
1536                         RETURN(rc);
1537         }
1538
1539         RETURN(rc);
1540 }
1541
1542 static int osd_inode_setattr(const struct lu_env *env,
1543                              struct inode *inode, const struct lu_attr *attr)
1544 {
1545         __u64 bits;
1546
1547         bits = attr->la_valid;
1548
1549         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1550
1551         if (bits & LA_ATIME)
1552                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1553         if (bits & LA_CTIME)
1554                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1555         if (bits & LA_MTIME)
1556                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1557         if (bits & LA_SIZE) {
1558                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1559                 i_size_write(inode, attr->la_size);
1560         }
1561
1562 #if 0
1563         /* OSD should not change "i_blocks" which is used by quota.
1564          * "i_blocks" should be changed by ldiskfs only. */
1565         if (bits & LA_BLOCKS)
1566                 inode->i_blocks = attr->la_blocks;
1567 #endif
1568         if (bits & LA_MODE)
1569                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1570                         (attr->la_mode & ~S_IFMT);
1571         if (bits & LA_UID)
1572                 inode->i_uid    = attr->la_uid;
1573         if (bits & LA_GID)
1574                 inode->i_gid    = attr->la_gid;
1575         if (bits & LA_NLINK)
1576                 set_nlink(inode, attr->la_nlink);
1577         if (bits & LA_RDEV)
1578                 inode->i_rdev   = attr->la_rdev;
1579
1580         if (bits & LA_FLAGS) {
1581                 /* always keep S_NOCMTIME */
1582                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
1583                                  S_NOCMTIME;
1584         }
1585         return 0;
1586 }
1587
1588 static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
1589 {
1590         if ((attr->la_valid & LA_UID && attr->la_uid != inode->i_uid) ||
1591             (attr->la_valid & LA_GID && attr->la_gid != inode->i_gid)) {
1592                 struct iattr    iattr;
1593                 int             rc;
1594
1595                 iattr.ia_valid = 0;
1596                 if (attr->la_valid & LA_UID)
1597                         iattr.ia_valid |= ATTR_UID;
1598                 if (attr->la_valid & LA_GID)
1599                         iattr.ia_valid |= ATTR_GID;
1600                 iattr.ia_uid = attr->la_uid;
1601                 iattr.ia_gid = attr->la_gid;
1602
1603                 rc = ll_vfs_dq_transfer(inode, &iattr);
1604                 if (rc) {
1605                         CERROR("%s: quota transfer failed: rc = %d. Is quota "
1606                                "enforcement enabled on the ldiskfs filesystem?",
1607                                inode->i_sb->s_id, rc);
1608                         return rc;
1609                 }
1610         }
1611         return 0;
1612 }
1613
1614 static int osd_attr_set(const struct lu_env *env,
1615                         struct dt_object *dt,
1616                         const struct lu_attr *attr,
1617                         struct thandle *handle,
1618                         struct lustre_capa *capa)
1619 {
1620         struct osd_object *obj = osd_dt_obj(dt);
1621         struct inode      *inode;
1622         int rc;
1623
1624         LASSERT(handle != NULL);
1625         LASSERT(dt_object_exists(dt));
1626         LASSERT(osd_invariant(obj));
1627
1628         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1629                 return -EACCES;
1630
1631         osd_trans_exec_op(env, handle, OSD_OT_ATTR_SET);
1632
1633         inode = obj->oo_inode;
1634         ll_vfs_dq_init(inode);
1635
1636         rc = osd_quota_transfer(inode, attr);
1637         if (rc)
1638                 return rc;
1639
1640         spin_lock(&obj->oo_guard);
1641         rc = osd_inode_setattr(env, inode, attr);
1642         spin_unlock(&obj->oo_guard);
1643
1644         if (!rc)
1645                 inode->i_sb->s_op->dirty_inode(inode);
1646         return rc;
1647 }
1648
1649 struct dentry *osd_child_dentry_get(const struct lu_env *env,
1650                                     struct osd_object *obj,
1651                                     const char *name, const int namelen)
1652 {
1653         return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen);
1654 }
1655
1656 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1657                       cfs_umode_t mode,
1658                       struct dt_allocation_hint *hint,
1659                       struct thandle *th)
1660 {
1661         int result;
1662         struct osd_device  *osd = osd_obj2dev(obj);
1663         struct osd_thandle *oth;
1664         struct dt_object   *parent = NULL;
1665         struct inode       *inode;
1666
1667         LINVRNT(osd_invariant(obj));
1668         LASSERT(obj->oo_inode == NULL);
1669         LASSERT(obj->oo_hl_head == NULL);
1670
1671         if (S_ISDIR(mode) && ldiskfs_pdo) {
1672                 obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
1673                 if (obj->oo_hl_head == NULL)
1674                         return -ENOMEM;
1675         }
1676
1677         oth = container_of(th, struct osd_thandle, ot_super);
1678         LASSERT(oth->ot_handle->h_transaction != NULL);
1679
1680         if (hint && hint->dah_parent)
1681                 parent = hint->dah_parent;
1682
1683         inode = ldiskfs_create_inode(oth->ot_handle,
1684                                      parent ? osd_dt_obj(parent)->oo_inode :
1685                                               osd_sb(osd)->s_root->d_inode,
1686                                      mode);
1687         if (!IS_ERR(inode)) {
1688                 /* Do not update file c/mtime in ldiskfs.
1689                  * NB: don't need any lock because no contention at this
1690                  * early stage */
1691                 inode->i_flags |= S_NOCMTIME;
1692
1693                 /* For new created object, it must be consistent,
1694                  * and it is unnecessary to scrub against it. */
1695                 ldiskfs_set_inode_state(inode, LDISKFS_STATE_LUSTRE_NOSCRUB);
1696                 obj->oo_inode = inode;
1697                 result = 0;
1698         } else {
1699                 if (obj->oo_hl_head != NULL) {
1700                         ldiskfs_htree_lock_head_free(obj->oo_hl_head);
1701                         obj->oo_hl_head = NULL;
1702                 }
1703                 result = PTR_ERR(inode);
1704         }
1705         LINVRNT(osd_invariant(obj));
1706         return result;
1707 }
1708
/* NOTE(review): presumably the maximum entry name length handled by the
 * OSD; not referenced in this part of the file, confirm against users. */
enum {
        OSD_NAME_LEN = 255
};
1712
1713 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1714                      struct lu_attr *attr,
1715                      struct dt_allocation_hint *hint,
1716                      struct dt_object_format *dof,
1717                      struct thandle *th)
1718 {
1719         int result;
1720         struct osd_thandle *oth;
1721         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1722
1723         LASSERT(S_ISDIR(attr->la_mode));
1724
1725         oth = container_of(th, struct osd_thandle, ot_super);
1726         LASSERT(oth->ot_handle->h_transaction != NULL);
1727         result = osd_mkfile(info, obj, mode, hint, th);
1728
1729         return result;
1730 }
1731
1732 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1733                         struct lu_attr *attr,
1734                         struct dt_allocation_hint *hint,
1735                         struct dt_object_format *dof,
1736                         struct thandle *th)
1737 {
1738         int result;
1739         struct osd_thandle *oth;
1740         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1741
1742         __u32 mode = (attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX));
1743
1744         LASSERT(S_ISREG(attr->la_mode));
1745
1746         oth = container_of(th, struct osd_thandle, ot_super);
1747         LASSERT(oth->ot_handle->h_transaction != NULL);
1748
1749         result = osd_mkfile(info, obj, mode, hint, th);
1750         if (result == 0) {
1751                 LASSERT(obj->oo_inode != NULL);
1752                 if (feat->dif_flags & DT_IND_VARKEY)
1753                         result = iam_lvar_create(obj->oo_inode,
1754                                                  feat->dif_keysize_max,
1755                                                  feat->dif_ptrsize,
1756                                                  feat->dif_recsize_max,
1757                                                  oth->ot_handle);
1758                 else
1759                         result = iam_lfix_create(obj->oo_inode,
1760                                                  feat->dif_keysize_max,
1761                                                  feat->dif_ptrsize,
1762                                                  feat->dif_recsize_max,
1763                                                  oth->ot_handle);
1764
1765         }
1766         return result;
1767 }
1768
1769 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1770                      struct lu_attr *attr,
1771                      struct dt_allocation_hint *hint,
1772                      struct dt_object_format *dof,
1773                      struct thandle *th)
1774 {
1775         LASSERT(S_ISREG(attr->la_mode));
1776         return osd_mkfile(info, obj, (attr->la_mode &
1777                                (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
1778 }
1779
1780 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1781                      struct lu_attr *attr,
1782                      struct dt_allocation_hint *hint,
1783                      struct dt_object_format *dof,
1784                      struct thandle *th)
1785 {
1786         LASSERT(S_ISLNK(attr->la_mode));
1787         return osd_mkfile(info, obj, (attr->la_mode &
1788                               (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
1789 }
1790
1791 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1792                      struct lu_attr *attr,
1793                      struct dt_allocation_hint *hint,
1794                      struct dt_object_format *dof,
1795                      struct thandle *th)
1796 {
1797         cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX);
1798         int result;
1799
1800         LINVRNT(osd_invariant(obj));
1801         LASSERT(obj->oo_inode == NULL);
1802         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1803                 S_ISFIFO(mode) || S_ISSOCK(mode));
1804
1805         result = osd_mkfile(info, obj, mode, hint, th);
1806         if (result == 0) {
1807                 LASSERT(obj->oo_inode != NULL);
1808                 /*
1809                  * This inode should be marked dirty for i_rdev.  Currently
1810                  * that is done in the osd_attr_init().
1811                  */
1812                 init_special_inode(obj->oo_inode, obj->oo_inode->i_mode,
1813                                    attr->la_rdev);
1814         }
1815         LINVRNT(osd_invariant(obj));
1816         return result;
1817 }
1818
/**
 * Signature of the per-type creation helpers (osd_mkdir(), osd_mkreg(),
 * osd_mksym(), osd_mknod() and osd_mk_index()) returned by
 * osd_create_type_f().
 */
typedef int (*osd_obj_type_f)(struct osd_thread_info *info,
                              struct osd_object *obj,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
                              struct dt_object_format *dof,
                              struct thandle *th);
1824
1825 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1826 {
1827         osd_obj_type_f result;
1828
1829         switch (type) {
1830         case DFT_DIR:
1831                 result = osd_mkdir;
1832                 break;
1833         case DFT_REGULAR:
1834                 result = osd_mkreg;
1835                 break;
1836         case DFT_SYM:
1837                 result = osd_mksym;
1838                 break;
1839         case DFT_NODE:
1840                 result = osd_mknod;
1841                 break;
1842         case DFT_INDEX:
1843                 result = osd_mk_index;
1844                 break;
1845
1846         default:
1847                 LBUG();
1848                 break;
1849         }
1850         return result;
1851 }
1852
1853
1854 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1855                         struct dt_object *parent, struct dt_object *child,
1856                         cfs_umode_t child_mode)
1857 {
1858         LASSERT(ah);
1859
1860         memset(ah, 0, sizeof(*ah));
1861         ah->dah_parent = parent;
1862         ah->dah_mode = child_mode;
1863 }
1864
1865 static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
1866                           struct lu_attr *attr, struct dt_object_format *dof)
1867 {
1868         struct inode   *inode = obj->oo_inode;
1869         __u64           valid = attr->la_valid;
1870         int             result;
1871
1872         attr->la_valid &= ~(LA_TYPE | LA_MODE);
1873
1874         if (dof->dof_type != DFT_NODE)
1875                 attr->la_valid &= ~LA_RDEV;
1876         if ((valid & LA_ATIME) && (attr->la_atime == LTIME_S(inode->i_atime)))
1877                 attr->la_valid &= ~LA_ATIME;
1878         if ((valid & LA_CTIME) && (attr->la_ctime == LTIME_S(inode->i_ctime)))
1879                 attr->la_valid &= ~LA_CTIME;
1880         if ((valid & LA_MTIME) && (attr->la_mtime == LTIME_S(inode->i_mtime)))
1881                 attr->la_valid &= ~LA_MTIME;
1882
1883         result = osd_quota_transfer(inode, attr);
1884         if (result)
1885                 return;
1886
1887         if (attr->la_valid != 0) {
1888                 result = osd_inode_setattr(info->oti_env, inode, attr);
1889                 /*
1890                  * The osd_inode_setattr() should always succeed here.  The
1891                  * only error that could be returned is EDQUOT when we are
1892                  * trying to change the UID or GID of the inode. However, this
1893                  * should not happen since quota enforcement is no longer
1894                  * enabled on ldiskfs (lquota takes care of it).
1895                  */
1896                 LASSERTF(result == 0, "%d", result);
1897                 inode->i_sb->s_op->dirty_inode(inode);
1898         }
1899
1900         attr->la_valid = valid;
1901 }
1902
1903 /**
1904  * Helper function for osd_object_create()
1905  *
1906  * \retval 0, on success
1907  */
1908 static int __osd_object_create(struct osd_thread_info *info,
1909                                struct osd_object *obj, struct lu_attr *attr,
1910                                struct dt_allocation_hint *hint,
1911                                struct dt_object_format *dof,
1912                                struct thandle *th)
1913 {
1914         int     result;
1915
1916         result = osd_create_type_f(dof->dof_type)(info, obj, attr, hint, dof,
1917                                                   th);
1918         if (result == 0) {
1919                 osd_attr_init(info, obj, attr, dof);
1920                 osd_object_init0(obj);
1921                 /* bz 24037 */
1922                 if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1923                         unlock_new_inode(obj->oo_inode);
1924         }
1925
1926         return result;
1927 }
1928
1929 /**
1930  * Helper function for osd_object_create()
1931  *
1932  * \retval 0, on success
1933  */
1934 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1935                            const struct lu_fid *fid, struct thandle *th)
1936 {
1937         struct osd_thread_info *info = osd_oti_get(env);
1938         struct osd_inode_id    *id   = &info->oti_id;
1939         struct osd_device      *osd  = osd_obj2dev(obj);
1940
1941         LASSERT(obj->oo_inode != NULL);
1942
1943         osd_id_gen(id, obj->oo_inode->i_ino, obj->oo_inode->i_generation);
1944         return osd_oi_insert(info, osd, fid, id, th);
1945 }
1946
1947 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
1948                    const struct lu_fid *fid, struct lu_seq_range *range)
1949 {
1950         struct seq_server_site  *ss = osd_seq_site(osd);
1951         int                     rc;
1952
1953         if (fid_is_igif(fid)) {
1954                 range->lsr_flags = LU_SEQ_RANGE_MDT;
1955                 range->lsr_index = 0;
1956                 return 0;
1957         }
1958
1959         if (fid_is_idif(fid)) {
1960                 range->lsr_flags = LU_SEQ_RANGE_OST;
1961                 range->lsr_index = fid_idif_ost_idx(fid);
1962                 return 0;
1963         }
1964
1965         if (!fid_is_norm(fid)) {
1966                 range->lsr_flags = LU_SEQ_RANGE_MDT;
1967                 if (ss != NULL)
1968                         /* FIXME: If ss is NULL, it suppose not get lsr_index
1969                          * at all */
1970                         range->lsr_index = ss->ss_node_id;
1971                 return 0;
1972         }
1973
1974         LASSERT(ss != NULL);
1975         range->lsr_flags = -1;
1976         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
1977         if (rc != 0) {
1978                 CERROR("%s can not find "DFID": rc = %d\n",
1979                        osd2lu_dev(osd)->ld_obd->obd_name, PFID(fid), rc);
1980         }
1981         return rc;
1982 }
1983
1984
1985 static int osd_declare_object_create(const struct lu_env *env,
1986                                      struct dt_object *dt,
1987                                      struct lu_attr *attr,
1988                                      struct dt_allocation_hint *hint,
1989                                      struct dt_object_format *dof,
1990                                      struct thandle *handle)
1991 {
1992         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
1993         struct osd_thandle      *oh;
1994         int                      rc;
1995         ENTRY;
1996
1997         LASSERT(handle != NULL);
1998
1999         oh = container_of0(handle, struct osd_thandle, ot_super);
2000         LASSERT(oh->ot_handle == NULL);
2001
2002         osd_trans_declare_op(env, oh, OSD_OT_CREATE,
2003                              osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
2004         /* XXX: So far, only normal fid needs be inserted into the oi,
2005          *      things could be changed later. Revise following code then. */
2006         if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
2007             !fid_is_on_ost(osd_oti_get(env), osd_dt_dev(handle->th_dev),
2008                            lu_object_fid(&dt->do_lu))) {
2009                 /* Reuse idle OI block may cause additional one OI block
2010                  * to be changed. */
2011                 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
2012                                 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
2013         }
2014         /* If this is directory, then we expect . and .. to be inserted as
2015          * well. The one directory block always needs to be created for the
2016          * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap,
2017          * block), there is no danger of needing a tree for the first block.
2018          */
2019         if (attr && S_ISDIR(attr->la_mode)) {
2020                 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
2021                                      osd_dto_credits_noquota[DTO_WRITE_BASE]);
2022                 osd_trans_declare_op(env, oh, OSD_OT_INSERT, 0);
2023         }
2024
2025         if (!attr)
2026                 RETURN(0);
2027
2028         rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
2029                                    false, false, NULL, false);
2030         if (rc != 0)
2031                 RETURN(rc);
2032
2033         /* It does fld look up inside declare, and the result will be
2034          * added to fld cache, so the following fld lookup inside insert
2035          * does not need send RPC anymore, so avoid send rpc with holding
2036          * transaction */
2037         if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
2038                 !fid_is_last_id(lu_object_fid(&dt->do_lu)))
2039                 osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
2040                                lu_object_fid(&dt->do_lu), range);
2041
2042
2043         RETURN(rc);
2044 }
2045
2046 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
2047                              struct lu_attr *attr,
2048                              struct dt_allocation_hint *hint,
2049                              struct dt_object_format *dof,
2050                              struct thandle *th)
2051 {
2052         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
2053         struct osd_object      *obj    = osd_dt_obj(dt);
2054         struct osd_thread_info *info   = osd_oti_get(env);
2055         int result;
2056
2057         ENTRY;
2058
2059         LINVRNT(osd_invariant(obj));
2060         LASSERT(!dt_object_exists(dt));
2061         LASSERT(osd_write_locked(env, obj));
2062         LASSERT(th != NULL);
2063
2064         if (unlikely(fid_is_acct(fid)))
2065                 /* Quota files can't be created from the kernel any more,
2066                  * 'tune2fs -O quota' will take care of creating them */
2067                 RETURN(-EPERM);
2068
2069         osd_trans_exec_op(env, th, OSD_OT_CREATE);
2070         osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
2071
2072         result = __osd_object_create(info, obj, attr, hint, dof, th);
2073         if (result == 0)
2074                 result = __osd_oi_insert(env, obj, fid, th);
2075
2076         LASSERT(ergo(result == 0, dt_object_exists(dt)));
2077         LASSERT(osd_invariant(obj));
2078         RETURN(result);
2079 }
2080
2081 /**
2082  * Called to destroy on-disk representation of the object
2083  *
2084  * Concurrency: must be locked
2085  */
2086 static int osd_declare_object_destroy(const struct lu_env *env,
2087                                       struct dt_object *dt,
2088                                       struct thandle *th)
2089 {
2090         struct osd_object  *obj = osd_dt_obj(dt);
2091         struct inode       *inode = obj->oo_inode;
2092         struct osd_thandle *oh;
2093         int                 rc;
2094         ENTRY;
2095
2096         oh = container_of0(th, struct osd_thandle, ot_super);
2097         LASSERT(oh->ot_handle == NULL);
2098         LASSERT(inode);
2099
2100         osd_trans_declare_op(env, oh, OSD_OT_DELETE,
2101                              osd_dto_credits_noquota[DTO_OBJECT_DELETE]);
2102         /* XXX: So far, only normal fid needs to be inserted into the OI,
2103          *      so only normal fid needs to be removed from the OI also.
2104          * Recycle idle OI leaf may cause additional three OI blocks
2105          * to be changed. */
2106         osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
2107                              fid_is_norm(lu_object_fid(&dt->do_lu)) ?
2108                              osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3 : 0);
2109
2110         /* one less inode */
2111         rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh,
2112                                    false, true, NULL, false);
2113         if (rc)
2114                 RETURN(rc);
2115         /* data to be truncated */
2116         rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
2117                                    true, true, NULL, false);
2118         RETURN(rc);
2119 }
2120
2121 static int osd_object_destroy(const struct lu_env *env,
2122                               struct dt_object *dt,
2123                               struct thandle *th)
2124 {
2125         const struct lu_fid    *fid = lu_object_fid(&dt->do_lu);
2126         struct osd_object      *obj = osd_dt_obj(dt);
2127         struct inode           *inode = obj->oo_inode;
2128         struct osd_device      *osd = osd_obj2dev(obj);
2129         struct osd_thandle     *oh;
2130         int                     result;
2131         ENTRY;
2132
2133         oh = container_of0(th, struct osd_thandle, ot_super);
2134         LASSERT(oh->ot_handle);
2135         LASSERT(inode);
2136         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
2137
2138         if (unlikely(fid_is_acct(fid)))
2139                 RETURN(-EPERM);
2140
2141         /* Parallel control for OI scrub. For most of cases, there is no
2142          * lock contention. So it will not affect unlink performance. */
2143         mutex_lock(&inode->i_mutex);
2144         if (S_ISDIR(inode->i_mode)) {
2145                 LASSERT(osd_inode_unlinked(inode) ||
2146                         inode->i_nlink == 1);
2147                 spin_lock(&obj->oo_guard);
2148                 clear_nlink(inode);
2149                 spin_unlock(&obj->oo_guard);
2150                 inode->i_sb->s_op->dirty_inode(inode);
2151         }
2152
2153         osd_trans_exec_op(env, th, OSD_OT_DESTROY);
2154
2155         result = osd_oi_delete(osd_oti_get(env), osd, fid, th);
2156         mutex_unlock(&inode->i_mutex);
2157
2158         /* XXX: add to ext3 orphan list */
2159         /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
2160
2161         /* not needed in the cache anymore */
2162         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
2163
2164         RETURN(0);
2165 }
2166
2167 static inline int __osd_xattr_set(struct osd_thread_info *info,
2168                                   struct inode *inode, const char *name,
2169                                   const void *buf, int buflen, int fl)
2170 {
2171         struct dentry *dentry = &info->oti_child_dentry;
2172
2173         ll_vfs_dq_init(inode);
2174         dentry->d_inode = inode;
2175         return inode->i_op->setxattr(dentry, name, buf, buflen, fl);
2176 }
2177
2178 /**
2179  * Put the fid into lustre_mdt_attrs, and then place the structure
2180  * inode's ea. This fid should not be altered during the life time
2181  * of the inode.
2182  *
2183  * \retval +ve, on success
2184  * \retval -ve, on error
2185  *
2186  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
2187  */
2188 int osd_ea_fid_set(struct osd_thread_info *info, struct inode *inode,
2189                    const struct lu_fid *fid)
2190 {
2191         struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
2192         int                      rc;
2193
2194         lustre_lma_init(lma, fid);
2195         lustre_lma_swab(lma);
2196
2197         rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, lma, sizeof(*lma),
2198                              XATTR_CREATE);
2199         /* Someone may created the EA by race. */
2200         if (unlikely(rc == -EEXIST))
2201                 rc = 0;
2202         return rc;
2203 }
2204
2205 /**
2206  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
2207  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
2208  * To have compatilibility with 1.8 ldiskfs driver we need to have
2209  * magic number at start of fid data.
2210  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
2211  * its inmemory API.
2212  */
2213 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
2214                                   const struct dt_rec *fid)
2215 {
2216         /* XXX: replace the check with "!fid_is_client_mdt_visible()"
2217          *      when FID in OI file introduced for local object. */
2218         if (!fid_is_norm((const struct lu_fid *)fid) &&
2219             !fid_is_igif((const struct lu_fid *)fid)) {
2220                 param->edp_magic = 0;
2221                 return;
2222         }
2223
2224         param->edp_magic = LDISKFS_LUFID_MAGIC;
2225         param->edp_len =  sizeof(struct lu_fid) + 1;
2226         fid_cpu_to_be((struct lu_fid *)param->edp_data, (struct lu_fid *)fid);
2227 }
2228
2229 /**
2230  * Try to read the fid from inode ea into dt_rec, if return value
2231  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
2232  *
2233  * \param fid object fid.
2234  *
2235  * \retval 0 on success
2236  */
2237 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
2238                           __u32 ino, struct lu_fid *fid,
2239                           struct osd_inode_id *id)
2240 {
2241         struct osd_thread_info *info  = osd_oti_get(env);
2242         struct inode           *inode;
2243         ENTRY;
2244
2245         osd_id_gen(id, ino, OSD_OII_NOGEN);
2246         inode = osd_iget_fid(info, osd_obj2dev(obj), id, fid);
2247         if (IS_ERR(inode))
2248                 RETURN(PTR_ERR(inode));
2249
2250         iput(inode);
2251         RETURN(0);
2252 }
2253
2254 /**
2255  * OSD layer object create function for interoperability mode (b11826).
2256  * This is mostly similar to osd_object_create(). Only difference being, fid is
2257  * inserted into inode ea here.
2258  *
2259  * \retval   0, on success
2260  * \retval -ve, on error
2261  */
2262 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
2263                                 struct lu_attr *attr,
2264                                 struct dt_allocation_hint *hint,
2265                                 struct dt_object_format *dof,
2266                                 struct thandle *th)
2267 {
2268         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
2269         struct osd_object      *obj    = osd_dt_obj(dt);
2270         struct osd_thread_info *info   = osd_oti_get(env);
2271         int                     result;
2272
2273         ENTRY;
2274
2275         LASSERT(osd_invariant(obj));
2276         LASSERT(!dt_object_exists(dt));
2277         LASSERT(osd_write_locked(env, obj));
2278         LASSERT(th != NULL);
2279
2280         if (unlikely(fid_is_acct(fid)))
2281                 /* Quota files can't be created from the kernel any more,
2282                  * 'tune2fs -O quota' will take care of creating them */
2283                 RETURN(-EPERM);
2284
2285         osd_trans_exec_op(env, th, OSD_OT_CREATE);
2286         osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
2287
2288         result = __osd_object_create(info, obj, attr, hint, dof, th);
2289         /* objects under osd root shld have igif fid, so dont add fid EA */
2290         /* For ost object, the fid will be stored during first write */
2291         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL &&
2292             !fid_is_on_ost(info, osd_dt_dev(th->th_dev), fid))
2293                 result = osd_ea_fid_set(info, obj->oo_inode, fid);
2294
2295         if (result == 0)
2296                 result = __osd_oi_insert(env, obj, fid, th);
2297
2298         LASSERT(ergo(result == 0, dt_object_exists(dt)));
2299         LINVRNT(osd_invariant(obj));
2300         RETURN(result);
2301 }
2302
2303 static int osd_declare_object_ref_add(const struct lu_env *env,
2304                                       struct dt_object *dt,
2305                                       struct thandle *handle)
2306 {
2307         struct osd_thandle       *oh;
2308
2309         /* it's possible that object doesn't exist yet */
2310         LASSERT(handle != NULL);
2311
2312         oh = container_of0(handle, struct osd_thandle, ot_super);
2313         LASSERT(oh->ot_handle == NULL);
2314
2315         osd_trans_declare_op(env, oh, OSD_OT_REF_ADD,
2316                              osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2317
2318         return 0;
2319 }
2320
2321 /*
2322  * Concurrency: @dt is write locked.
2323  */
2324 static int osd_object_ref_add(const struct lu_env *env,
2325                               struct dt_object *dt, struct thandle *th)
2326 {
2327         struct osd_object *obj = osd_dt_obj(dt);
2328         struct inode      *inode = obj->oo_inode;
2329
2330         LINVRNT(osd_invariant(obj));
2331         LASSERT(dt_object_exists(dt));
2332         LASSERT(osd_write_locked(env, obj));
2333         LASSERT(th != NULL);
2334
2335         osd_trans_exec_op(env, th, OSD_OT_REF_ADD);
2336
2337         /*
2338          * DIR_NLINK feature is set for compatibility reasons if:
2339          * 1) nlinks > LDISKFS_LINK_MAX, or
2340          * 2) nlinks == 2, since this indicates i_nlink was previously 1.
2341          *
2342          * It is easier to always set this flag (rather than check and set),
2343          * since it has less overhead, and the superblock will be dirtied
2344          * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
2345          * do not actually care whether this flag is set or not.
2346          */
2347         spin_lock(&obj->oo_guard);
2348         /* inc_nlink from 0 may cause WARN_ON */
2349         if(inode->i_nlink == 0)
2350                 set_nlink(inode, 1);
2351         else
2352                 inc_nlink(inode);
2353         if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
2354                 if (inode->i_nlink >= LDISKFS_LINK_MAX ||
2355                     inode->i_nlink == 2)
2356                         set_nlink(inode, 1);
2357         }
2358         LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
2359         spin_unlock(&obj->oo_guard);
2360         inode->i_sb->s_op->dirty_inode(inode);
2361         LINVRNT(osd_invariant(obj));
2362
2363         return 0;
2364 }
2365
2366 static int osd_declare_object_ref_del(const struct lu_env *env,
2367                                       struct dt_object *dt,
2368                                       struct thandle *handle)
2369 {
2370         struct osd_thandle *oh;
2371
2372         LASSERT(dt_object_exists(dt));
2373         LASSERT(handle != NULL);
2374
2375         oh = container_of0(handle, struct osd_thandle, ot_super);
2376         LASSERT(oh->ot_handle == NULL);
2377
2378         osd_trans_declare_op(env, oh, OSD_OT_REF_DEL,
2379                              osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2380
2381         return 0;
2382 }
2383
2384 /*
2385  * Concurrency: @dt is write locked.
2386  */
2387 static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
2388                               struct thandle *th)
2389 {
2390         struct osd_object *obj = osd_dt_obj(dt);
2391         struct inode      *inode = obj->oo_inode;
2392
2393         LINVRNT(osd_invariant(obj));
2394         LASSERT(dt_object_exists(dt));
2395         LASSERT(osd_write_locked(env, obj));
2396         LASSERT(th != NULL);
2397
2398         osd_trans_exec_op(env, th, OSD_OT_REF_DEL);
2399
2400         spin_lock(&obj->oo_guard);
2401         LASSERT(inode->i_nlink > 0);
2402         drop_nlink(inode);
2403         /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
2404          * then the nlink count is 1. Don't let it be set to 0 or the directory
2405          * inode will be deleted incorrectly. */
2406         if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
2407                 set_nlink(inode, 1);
2408         spin_unlock(&obj->oo_guard);
2409         inode->i_sb->s_op->dirty_inode(inode);
2410         LINVRNT(osd_invariant(obj));
2411
2412         return 0;
2413 }
2414
2415 /*
2416  * Get the 64-bit version for an inode.
2417  */
2418 static int osd_object_version_get(const struct lu_env *env,
2419                                   struct dt_object *dt, dt_obj_version_t *ver)
2420 {
2421         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2422
2423         CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
2424                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2425         *ver = LDISKFS_I(inode)->i_fs_version;
2426         return 0;
2427 }
2428
2429 /*
2430  * Concurrency: @dt is read locked.
2431  */
2432 static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
2433                          struct lu_buf *buf, const char *name,
2434                          struct lustre_capa *capa)
2435 {
2436         struct osd_object      *obj    = osd_dt_obj(dt);
2437         struct inode           *inode  = obj->oo_inode;
2438         struct osd_thread_info *info   = osd_oti_get(env);
2439         struct dentry          *dentry = &info->oti_obj_dentry;
2440
2441         /* version get is not real XATTR but uses xattr API */
2442         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2443                 /* for version we are just using xattr API but change inode
2444                  * field instead */
2445                 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2446                 osd_object_version_get(env, dt, buf->lb_buf);
2447                 return sizeof(dt_obj_version_t);
2448         }
2449
2450         LASSERT(dt_object_exists(dt));
2451         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2452
2453         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2454                 return -EACCES;
2455
2456         return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len);
2457 }
2458
2459
2460 static int osd_declare_xattr_set(const struct lu_env *env,
2461                                  struct dt_object *dt,
2462                                  const struct lu_buf *buf, const char *name,
2463                                  int fl, struct thandle *handle)
2464 {
2465         struct osd_thandle *oh;
2466
2467         LASSERT(handle != NULL);
2468
2469         oh = container_of0(handle, struct osd_thandle, ot_super);
2470         LASSERT(oh->ot_handle == NULL);
2471
2472         osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
2473                              strcmp(name, XATTR_NAME_VERSION) == 0 ?
2474                              osd_dto_credits_noquota[DTO_ATTR_SET_BASE] :
2475                              osd_dto_credits_noquota[DTO_XATTR_SET]);
2476
2477         return 0;
2478 }
2479
2480 /*
2481  * Set the 64-bit version for object
2482  */
2483 static void osd_object_version_set(const struct lu_env *env,
2484                                    struct dt_object *dt,
2485                                    dt_obj_version_t *new_version)
2486 {
2487         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2488
2489         CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2490                *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2491
2492         LDISKFS_I(inode)->i_fs_version = *new_version;
2493         /** Version is set after all inode operations are finished,
2494          *  so we should mark it dirty here */
2495         inode->i_sb->s_op->dirty_inode(inode);
2496 }
2497
2498 /*
2499  * Concurrency: @dt is write locked.
2500  */
2501 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2502                          const struct lu_buf *buf, const char *name, int fl,
2503                          struct thandle *handle, struct lustre_capa *capa)
2504 {
2505         struct osd_object      *obj      = osd_dt_obj(dt);
2506         struct inode           *inode    = obj->oo_inode;
2507         struct osd_thread_info *info     = osd_oti_get(env);
2508         int                     fs_flags = 0;
2509
2510         LASSERT(handle != NULL);
2511
2512         /* version set is not real XATTR */
2513         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2514                 /* for version we are just using xattr API but change inode
2515                  * field instead */
2516                 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2517                 osd_object_version_set(env, dt, buf->lb_buf);
2518                 return sizeof(dt_obj_version_t);
2519         }
2520
2521         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2522                 return -EACCES;
2523
2524         osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
2525         if (fl & LU_XATTR_REPLACE)
2526                 fs_flags |= XATTR_REPLACE;
2527
2528         if (fl & LU_XATTR_CREATE)
2529                 fs_flags |= XATTR_CREATE;
2530
2531         return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
2532                                fs_flags);
2533 }
2534
2535 /*
2536  * Concurrency: @dt is read locked.
2537  */
2538 static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
2539                           struct lu_buf *buf, struct lustre_capa *capa)
2540 {
2541         struct osd_object      *obj    = osd_dt_obj(dt);
2542         struct inode           *inode  = obj->oo_inode;
2543         struct osd_thread_info *info   = osd_oti_get(env);
2544         struct dentry          *dentry = &info->oti_obj_dentry;
2545
2546         LASSERT(dt_object_exists(dt));
2547         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2548         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2549
2550         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2551                 return -EACCES;
2552
2553         dentry->d_inode = inode;
2554         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2555 }
2556
2557 static int osd_declare_xattr_del(const struct lu_env *env,
2558                                  struct dt_object *dt, const char *name,
2559                                  struct thandle *handle)
2560 {
2561         struct osd_thandle *oh;
2562
2563         LASSERT(dt_object_exists(dt));
2564         LASSERT(handle != NULL);
2565
2566         oh = container_of0(handle, struct osd_thandle, ot_super);
2567         LASSERT(oh->ot_handle == NULL);
2568
2569         osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
2570                              osd_dto_credits_noquota[DTO_XATTR_SET]);
2571
2572         return 0;
2573 }
2574
2575 /*
2576  * Concurrency: @dt is write locked.
2577  */
2578 static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
2579                          const char *name, struct thandle *handle,
2580                          struct lustre_capa *capa)
2581 {
2582         struct osd_object      *obj    = osd_dt_obj(dt);
2583         struct inode           *inode  = obj->oo_inode;
2584         struct osd_thread_info *info   = osd_oti_get(env);
2585         struct dentry          *dentry = &info->oti_obj_dentry;
2586         int                     rc;
2587
2588         LASSERT(dt_object_exists(dt));
2589         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2590         LASSERT(handle != NULL);
2591
2592         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2593                 return -EACCES;
2594
2595         osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
2596
2597         ll_vfs_dq_init(inode);
2598         dentry->d_inode = inode;
2599         rc = inode->i_op->removexattr(dentry, name);
2600         return rc;
2601 }
2602
2603 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2604                                      struct dt_object *dt,
2605                                      struct lustre_capa *old,
2606                                      __u64 opc)
2607 {
2608         struct osd_thread_info *info = osd_oti_get(env);
2609         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2610         struct osd_object *obj = osd_dt_obj(dt);
2611         struct osd_device *dev = osd_obj2dev(obj);
2612         struct lustre_capa_key *key = &info->oti_capa_key;
2613         struct lustre_capa *capa = &info->oti_capa;
2614         struct obd_capa *oc;
2615         struct md_capainfo *ci;
2616         int rc;
2617         ENTRY;
2618
2619         if (!dev->od_fl_capa)
2620                 RETURN(ERR_PTR(-ENOENT));
2621
2622         LASSERT(dt_object_exists(dt));
2623         LINVRNT(osd_invariant(obj));
2624
2625         /* renewal sanity check */
2626         if (old && osd_object_auth(env, dt, old, opc))
2627                 RETURN(ERR_PTR(-EACCES));
2628
2629         ci = md_capainfo(env);
2630         if (unlikely(!ci))
2631                 RETURN(ERR_PTR(-ENOENT));
2632
2633         switch (ci->mc_auth) {
2634         case LC_ID_NONE:
2635                 RETURN(NULL);
2636         case LC_ID_PLAIN:
2637                 capa->lc_uid = obj->oo_inode->i_uid;
2638                 capa->lc_gid = obj->oo_inode->i_gid;
2639                 capa->lc_flags = LC_ID_PLAIN;
2640                 break;
2641         case LC_ID_CONVERT: {
2642                 __u32 d[4], s[4];
2643
2644                 s[0] = obj->oo_inode->i_uid;
2645                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2646                 s[2] = obj->oo_inode->i_gid;
2647                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2648                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2649                 if (unlikely(rc))
2650                         RETURN(ERR_PTR(rc));
2651
2652                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
2653                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
2654                 capa->lc_flags = LC_ID_CONVERT;
2655                 break;
2656         }
2657         default:
2658                 RETURN(ERR_PTR(-EINVAL));
2659         }
2660
2661         capa->lc_fid = *fid;
2662         capa->lc_opc = opc;
2663         capa->lc_flags |= dev->od_capa_alg << 24;
2664         capa->lc_timeout = dev->od_capa_timeout;
2665         capa->lc_expiry = 0;
2666
2667         oc = capa_lookup(dev->od_capa_hash, capa, 1);
2668         if (oc) {
2669                 LASSERT(!capa_is_expired(oc));
2670                 RETURN(oc);
2671         }
2672
2673         spin_lock(&capa_lock);
2674         *key = dev->od_capa_keys[1];
2675         spin_unlock(&capa_lock);
2676
2677         capa->lc_keyid = key->lk_keyid;
2678         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2679
2680         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2681         if (rc) {
2682                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2683                 RETURN(ERR_PTR(rc));
2684         }
2685
2686         oc = capa_add(dev->od_capa_hash, capa);
2687         RETURN(oc);
2688 }
2689
2690 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2691 {
2692         struct osd_object       *obj    = osd_dt_obj(dt);
2693         struct inode            *inode  = obj->oo_inode;
2694         struct osd_thread_info  *info   = osd_oti_get(env);
2695         struct dentry           *dentry = &info->oti_obj_dentry;
2696         struct file             *file   = &info->oti_file;
2697         int                     rc;
2698
2699         ENTRY;
2700
2701         dentry->d_inode = inode;
2702         file->f_dentry = dentry;
2703         file->f_mapping = inode->i_mapping;
2704         file->f_op = inode->i_fop;
2705         mutex_lock(&inode->i_mutex);
2706         rc = file->f_op->fsync(file, dentry, 0);
2707         mutex_unlock(&inode->i_mutex);
2708         RETURN(rc);
2709 }
2710
2711 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2712                         void **data)
2713 {
2714         struct osd_object *obj = osd_dt_obj(dt);
2715         ENTRY;
2716
2717         *data = (void *)obj->oo_inode;
2718         RETURN(0);
2719 }
2720
2721 /*
2722  * Index operations.
2723  */
2724
2725 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2726                            const struct dt_index_features *feat)
2727 {
2728         struct iam_descr *descr;
2729
2730         if (osd_object_is_root(o))
2731                 return feat == &dt_directory_features;
2732
2733         LASSERT(o->oo_dir != NULL);
2734
2735         descr = o->oo_dir->od_container.ic_descr;
2736         if (feat == &dt_directory_features) {
2737                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2738                         return 1;
2739                 else
2740                         return 0;
2741         } else {
2742                 return
2743                         feat->dif_keysize_min <= descr->id_key_size &&
2744                         descr->id_key_size <= feat->dif_keysize_max &&
2745                         feat->dif_recsize_min <= descr->id_rec_size &&
2746                         descr->id_rec_size <= feat->dif_recsize_max &&
2747                         !(feat->dif_flags & (DT_IND_VARKEY |
2748                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
2749                         ergo(feat->dif_flags & DT_IND_UPDATE,
2750                              1 /* XXX check that object (and file system) is
2751                                 * writable */);
2752         }
2753 }
2754
2755 static int osd_iam_container_init(const struct lu_env *env,
2756                                   struct osd_object *obj,
2757                                   struct osd_directory *dir)
2758 {
2759         struct iam_container *bag = &dir->od_container;
2760         int result;
2761
2762         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2763         if (result != 0)
2764                 return result;
2765
2766         result = iam_container_setup(bag);
2767         if (result == 0)
2768                 obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2769         else
2770                 iam_container_fini(bag);
2771
2772         return result;
2773 }
2774
2775
2776 /*
2777  * Concurrency: no external locking is necessary.
2778  */
2779 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2780                          const struct dt_index_features *feat)
2781 {
2782         int                      result;
2783         int                      skip_iam = 0;
2784         struct osd_object       *obj = osd_dt_obj(dt);
2785
2786         LINVRNT(osd_invariant(obj));
2787         LASSERT(dt_object_exists(dt));
2788
2789         if (osd_object_is_root(obj)) {
2790                 dt->do_index_ops = &osd_index_ea_ops;
2791                 result = 0;
2792         } else if (feat == &dt_directory_features) {
2793                 dt->do_index_ops = &osd_index_ea_ops;
2794                 if (S_ISDIR(obj->oo_inode->i_mode))
2795                         result = 0;
2796                 else
2797                         result = -ENOTDIR;
2798                 skip_iam = 1;
2799         } else if (unlikely(feat == &dt_otable_features)) {
2800                 dt->do_index_ops = &osd_otable_ops;
2801                 return 0;
2802         } else if (unlikely(feat == &dt_acct_features)) {
2803                 dt->do_index_ops = &osd_acct_index_ops;
2804                 result = 0;
2805                 skip_iam = 1;
2806         } else if (!osd_has_index(obj)) {
2807                 struct osd_directory *dir;
2808
2809                 OBD_ALLOC_PTR(dir);
2810                 if (dir != NULL) {
2811
2812                         spin_lock(&obj->oo_guard);
2813                         if (obj->oo_dir == NULL)
2814                                 obj->oo_dir = dir;
2815                         else
2816                                 /*
2817                                  * Concurrent thread allocated container data.
2818                                  */
2819                                 OBD_FREE_PTR(dir);
2820                         spin_unlock(&obj->oo_guard);
2821                         /*
2822                          * Now, that we have container data, serialize its
2823                          * initialization.
2824                          */
2825                         down_write(&obj->oo_ext_idx_sem);
2826                         /*
2827                          * recheck under lock.
2828                          */
2829                         if (!osd_has_index(obj))
2830                                 result = osd_iam_container_init(env, obj, dir);
2831                         else
2832                                 result = 0;
2833                         up_write(&obj->oo_ext_idx_sem);
2834                 } else {
2835                         result = -ENOMEM;
2836                 }
2837         } else {
2838                 result = 0;
2839         }
2840
2841         if (result == 0 && skip_iam == 0) {
2842                 if (!osd_iam_index_probe(env, obj, feat))
2843                         result = -ENOTDIR;
2844         }
2845         LINVRNT(osd_invariant(obj));
2846
2847         if (is_quota_glb_feat(feat))
2848                 result = osd_quota_migration(env, dt, feat);
2849
2850         return result;
2851 }
2852
2853 static int osd_otable_it_attr_get(const struct lu_env *env,
2854                                  struct dt_object *dt,
2855                                  struct lu_attr *attr,
2856                                  struct lustre_capa *capa)
2857 {
2858         attr->la_valid = 0;
2859         return 0;
2860 }
2861
2862 static const struct dt_object_operations osd_obj_ops = {
2863         .do_read_lock         = osd_object_read_lock,
2864         .do_write_lock        = osd_object_write_lock,
2865         .do_read_unlock       = osd_object_read_unlock,
2866         .do_write_unlock      = osd_object_write_unlock,
2867         .do_write_locked      = osd_object_write_locked,
2868         .do_attr_get          = osd_attr_get,
2869         .do_declare_attr_set  = osd_declare_attr_set,
2870         .do_attr_set          = osd_attr_set,
2871         .do_ah_init           = osd_ah_init,
2872         .do_declare_create    = osd_declare_object_create,
2873         .do_create            = osd_object_create,
2874         .do_declare_destroy   = osd_declare_object_destroy,
2875         .do_destroy           = osd_object_destroy,
2876         .do_index_try         = osd_index_try,
2877         .do_declare_ref_add   = osd_declare_object_ref_add,
2878         .do_ref_add           = osd_object_ref_add,
2879         .do_declare_ref_del   = osd_declare_object_ref_del,
2880         .do_ref_del           = osd_object_ref_del,
2881         .do_xattr_get         = osd_xattr_get,
2882         .do_declare_xattr_set = osd_declare_xattr_set,
2883         .do_xattr_set         = osd_xattr_set,
2884         .do_declare_xattr_del = osd_declare_xattr_del,
2885         .do_xattr_del         = osd_xattr_del,
2886         .do_xattr_list        = osd_xattr_list,
2887         .do_capa_get          = osd_capa_get,
2888         .do_object_sync       = osd_object_sync,
2889         .do_data_get          = osd_data_get,
2890 };
2891
2892 /**
2893  * dt_object_operations for interoperability mode
2894  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
2895  */
2896 static const struct dt_object_operations osd_obj_ea_ops = {
2897         .do_read_lock         = osd_object_read_lock,
2898         .do_write_lock        = osd_object_write_lock,
2899         .do_read_unlock       = osd_object_read_unlock,
2900         .do_write_unlock      = osd_object_write_unlock,
2901         .do_write_locked      = osd_object_write_locked,
2902         .do_attr_get          = osd_attr_get,
2903         .do_declare_attr_set  = osd_declare_attr_set,
2904         .do_attr_set          = osd_attr_set,
2905         .do_ah_init           = osd_ah_init,
2906         .do_declare_create    = osd_declare_object_create,
2907         .do_create            = osd_object_ea_create,
2908         .do_declare_destroy   = osd_declare_object_destroy,
2909         .do_destroy           = osd_object_destroy,
2910         .do_index_try         = osd_index_try,
2911         .do_declare_ref_add   = osd_declare_object_ref_add,
2912         .do_ref_add           = osd_object_ref_add,
2913         .do_declare_ref_del   = osd_declare_object_ref_del,
2914         .do_ref_del           = osd_object_ref_del,
2915         .do_xattr_get         = osd_xattr_get,
2916         .do_declare_xattr_set = osd_declare_xattr_set,
2917         .do_xattr_set         = osd_xattr_set,
2918         .do_declare_xattr_del = osd_declare_xattr_del,
2919         .do_xattr_del         = osd_xattr_del,
2920         .do_xattr_list        = osd_xattr_list,
2921         .do_capa_get          = osd_capa_get,
2922         .do_object_sync       = osd_object_sync,
2923         .do_data_get          = osd_data_get,
2924 };
2925
2926 static const struct dt_object_operations osd_obj_otable_it_ops = {
2927         .do_attr_get    = osd_otable_it_attr_get,
2928         .do_index_try   = osd_index_try,
2929 };
2930
2931 static int osd_index_declare_iam_delete(const struct lu_env *env,
2932                                         struct dt_object *dt,
2933                                         const struct dt_key *key,
2934                                         struct thandle *handle)
2935 {
2936         struct osd_thandle    *oh;
2937
2938         oh = container_of0(handle, struct osd_thandle, ot_super);
2939         LASSERT(oh->ot_handle == NULL);
2940
2941         osd_trans_declare_op(env, oh, OSD_OT_DELETE,
2942                              osd_dto_credits_noquota[DTO_INDEX_DELETE]);
2943
2944         return 0;
2945 }
2946
2947 /**
2948  *      delete a (key, value) pair from index \a dt specified by \a key
2949  *
2950  *      \param  dt      osd index object
2951  *      \param  key     key for index
2952  *      \param  rec     record reference
2953  *      \param  handle  transaction handler
2954  *
2955  *      \retval  0  success
2956  *      \retval -ve   failure
2957  */
2958
2959 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
2960                                 const struct dt_key *key,
2961                                 struct thandle *handle,
2962                                 struct lustre_capa *capa)
2963 {
2964         struct osd_thread_info *oti = osd_oti_get(env);
2965         struct osd_object      *obj = osd_dt_obj(dt);
2966         struct osd_thandle     *oh;
2967         struct iam_path_descr  *ipd;
2968         struct iam_container   *bag = &obj->oo_dir->od_container;
2969         int                     rc;
2970
2971         ENTRY;
2972
2973         LINVRNT(osd_invariant(obj));
2974         LASSERT(dt_object_exists(dt));
2975         LASSERT(bag->ic_object == obj->oo_inode);
2976         LASSERT(handle != NULL);
2977
2978         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
2979                 RETURN(-EACCES);
2980
2981         osd_trans_exec_op(env, handle, OSD_OT_DELETE);
2982
2983         ipd = osd_idx_ipd_get(env, bag);
2984         if (unlikely(ipd == NULL))
2985                 RETURN(-ENOMEM);
2986
2987         oh = container_of0(handle, struct osd_thandle, ot_super);
2988         LASSERT(oh->ot_handle != NULL);
2989         LASSERT(oh->ot_handle->h_transaction != NULL);
2990
2991         if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
2992                 /* swab quota uid/gid provided by caller */
2993                 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
2994                 key = (const struct dt_key *)&oti->oti_quota_id;
2995         }
2996
2997         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
2998         osd_ipd_put(env, bag, ipd);
2999         LINVRNT(osd_invariant(obj));
3000         RETURN(rc);
3001 }
3002
3003 static int osd_index_declare_ea_delete(const struct lu_env *env,
3004                                        struct dt_object *dt,
3005                                        const struct dt_key *key,
3006                                        struct thandle *handle)
3007 {
3008         struct osd_thandle *oh;
3009         struct inode       *inode;
3010         int                 rc;
3011         ENTRY;
3012
3013         LASSERT(dt_object_exists(dt));
3014         LASSERT(handle != NULL);
3015
3016         oh = container_of0(handle, struct osd_thandle, ot_super);
3017         LASSERT(oh->ot_handle == NULL);
3018
3019         osd_trans_declare_op(env, oh, OSD_OT_DELETE,
3020                              osd_dto_credits_noquota[DTO_INDEX_DELETE]);
3021
3022         inode = osd_dt_obj(dt)->oo_inode;
3023         LASSERT(inode);
3024
3025         rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
3026                                    true, true, NULL, false);
3027         RETURN(rc);
3028 }
3029
3030 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
3031                                           struct dt_rec *fid)
3032 {
3033         struct osd_fid_pack *rec;
3034         int                  rc = -ENODATA;
3035
3036         if (de->file_type & LDISKFS_DIRENT_LUFID) {
3037                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
3038                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
3039         }
3040         RETURN(rc);
3041 }
3042
3043 /**
3044  * Index delete function for interoperability mode (b11826).
3045  * It will remove the directory entry added by osd_index_ea_insert().
3046  * This entry is needed to maintain name->fid mapping.
3047  *
3048  * \param key,  key i.e. file entry to be deleted
3049  *
3050  * \retval   0, on success
3051  * \retval -ve, on error
3052  */
3053 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
3054                                const struct dt_key *key,
3055                                struct thandle *handle,
3056                                struct lustre_capa *capa)
3057 {
3058         struct osd_object          *obj    = osd_dt_obj(dt);
3059         struct inode               *dir    = obj->oo_inode;
3060         struct dentry              *dentry;
3061         struct osd_thandle         *oh;
3062         struct ldiskfs_dir_entry_2 *de;
3063         struct buffer_head         *bh;
3064         struct htree_lock          *hlock = NULL;
3065         int                         rc;
3066
3067         ENTRY;
3068
3069         LINVRNT(osd_invariant(obj));
3070         LASSERT(dt_object_exists(dt));
3071         LASSERT(handle != NULL);
3072
3073         osd_trans_exec_op(env, handle, OSD_OT_DELETE);
3074
3075         oh = container_of(handle, struct osd_thandle, ot_super);
3076         LASSERT(oh->ot_handle != NULL);
3077         LASSERT(oh->ot_handle->h_transaction != NULL);
3078
3079         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3080                 RETURN(-EACCES);
3081
3082         ll_vfs_dq_init(dir);
3083         dentry = osd_child_dentry_get(env, obj,
3084                                       (char *)key, strlen((char *)key));
3085
3086         if (obj->oo_hl_head != NULL) {
3087                 hlock = osd_oti_get(env)->oti_hlock;
3088                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3089                                    dir, LDISKFS_HLOCK_DEL);
3090         } else {
3091                 down_write(&obj->oo_ext_idx_sem);
3092         }
3093
3094         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3095         if (bh) {
3096                 rc = ldiskfs_delete_entry(oh->ot_handle,
3097                                           dir, de, bh);
3098                 brelse(bh);
3099         } else {
3100                 rc = -ENOENT;
3101         }
3102         if (hlock != NULL)
3103                 ldiskfs_htree_unlock(hlock);
3104         else
3105                 up_write(&obj->oo_ext_idx_sem);
3106
3107         LASSERT(osd_invariant(obj));
3108         RETURN(rc);
3109 }
3110
3111 /**
3112  *      Lookup index for \a key and copy record to \a rec.
3113  *
3114  *      \param  dt      osd index object
3115  *      \param  key     key for index
3116  *      \param  rec     record reference
3117  *
3118  *      \retval  +ve  success : exact mach
3119  *      \retval  0    return record with key not greater than \a key
3120  *      \retval -ve   failure
3121  */
3122 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
3123                                 struct dt_rec *rec, const struct dt_key *key,
3124                                 struct lustre_capa *capa)
3125 {
3126         struct osd_object      *obj = osd_dt_obj(dt);
3127         struct iam_path_descr  *ipd;
3128         struct iam_container   *bag = &obj->oo_dir->od_container;
3129         struct osd_thread_info *oti = osd_oti_get(env);
3130         struct iam_iterator    *it = &oti->oti_idx_it;
3131         struct iam_rec         *iam_rec;
3132         int                     rc;
3133
3134         ENTRY;
3135
3136         LASSERT(osd_invariant(obj));
3137         LASSERT(dt_object_exists(dt));
3138         LASSERT(bag->ic_object == obj->oo_inode);
3139
3140         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3141                 RETURN(-EACCES);
3142
3143         ipd = osd_idx_ipd_get(env, bag);
3144         if (IS_ERR(ipd))
3145                 RETURN(-ENOMEM);
3146
3147         /* got ipd now we can start iterator. */
3148         iam_it_init(it, bag, 0, ipd);
3149
3150         if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3151                 /* swab quota uid/gid provided by caller */
3152                 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3153                 key = (const struct dt_key *)&oti->oti_quota_id;
3154         }
3155
3156         rc = iam_it_get(it, (struct iam_key *)key);
3157         if (rc >= 0) {
3158                 if (S_ISDIR(obj->oo_inode->i_mode))
3159                         iam_rec = (struct iam_rec *)oti->oti_ldp;
3160                 else
3161                         iam_rec = (struct iam_rec *) rec;
3162
3163                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
3164
3165                 if (S_ISDIR(obj->oo_inode->i_mode))
3166                         osd_fid_unpack((struct lu_fid *) rec,
3167                                        (struct osd_fid_pack *)iam_rec);
3168                 else if (fid_is_quota(lu_object_fid(&dt->do_lu)))
3169                         osd_quota_unpack(obj, rec);
3170         }
3171
3172         iam_it_put(it);
3173         iam_it_fini(it);
3174         osd_ipd_put(env, bag, ipd);
3175
3176         LINVRNT(osd_invariant(obj));
3177
3178         RETURN(rc);
3179 }
3180
3181 static int osd_index_declare_iam_insert(const struct lu_env *env,
3182                                         struct dt_object *dt,
3183                                         const struct dt_rec *rec,
3184                                         const struct dt_key *key,
3185                                         struct thandle *handle)
3186 {
3187         struct osd_thandle *oh;
3188
3189         LASSERT(dt_object_exists(dt));
3190         LASSERT(handle != NULL);
3191
3192         oh = container_of0(handle, struct osd_thandle, ot_super);
3193         LASSERT(oh->ot_handle == NULL);
3194
3195         osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3196                              osd_dto_credits_noquota[DTO_INDEX_INSERT]);
3197
3198         return 0;
3199 }
3200
3201 /**
3202  *      Inserts (key, value) pair in \a dt index object.
3203  *
3204  *      \param  dt      osd index object
3205  *      \param  key     key for index
3206  *      \param  rec     record reference
3207  *      \param  th      transaction handler
3208  *
3209  *      \retval  0  success
3210  *      \retval -ve failure
3211  */
3212 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
3213                                 const struct dt_rec *rec,
3214                                 const struct dt_key *key, struct thandle *th,
3215                                 struct lustre_capa *capa, int ignore_quota)
3216 {
3217         struct osd_object     *obj = osd_dt_obj(dt);
3218         struct iam_path_descr *ipd;
3219         struct osd_thandle    *oh;
3220         struct iam_container  *bag = &obj->oo_dir->od_container;
3221         struct osd_thread_info *oti = osd_oti_get(env);
3222         struct iam_rec         *iam_rec;
3223         int                     rc;
3224
3225         ENTRY;
3226
3227         LINVRNT(osd_invariant(obj));
3228         LASSERT(dt_object_exists(dt));
3229         LASSERT(bag->ic_object == obj->oo_inode);
3230         LASSERT(th != NULL);
3231
3232         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3233                 RETURN(-EACCES);
3234
3235         osd_trans_exec_op(env, th, OSD_OT_INSERT);
3236
3237         ipd = osd_idx_ipd_get(env, bag);
3238         if (unlikely(ipd == NULL))
3239                 RETURN(-ENOMEM);
3240
3241         oh = container_of0(th, struct osd_thandle, ot_super);
3242         LASSERT(oh->ot_handle != NULL);
3243         LASSERT(oh->ot_handle->h_transaction != NULL);
3244         if (S_ISDIR(obj->oo_inode->i_mode)) {
3245                 iam_rec = (struct iam_rec *)oti->oti_ldp;
3246                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
3247         } else if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3248                 /* pack quota uid/gid */
3249                 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3250                 key = (const struct dt_key *)&oti->oti_quota_id;
3251                 /* pack quota record */
3252                 rec = osd_quota_pack(obj, rec, &oti->oti_quota_rec);
3253                 iam_rec = (struct iam_rec *)rec;
3254         } else {
3255                 iam_rec = (struct iam_rec *)rec;
3256         }
3257
3258         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
3259                         iam_rec, ipd);
3260         osd_ipd_put(env, bag, ipd);
3261         LINVRNT(osd_invariant(obj));
3262         RETURN(rc);
3263 }
3264
3265 /**
3266  * Calls ldiskfs_add_entry() to add directory entry
3267  * into the directory. This is required for
3268  * interoperability mode (b11826)
3269  *
3270  * \retval   0, on success
3271  * \retval -ve, on error
3272  */
3273 static int __osd_ea_add_rec(struct osd_thread_info *info,
3274                             struct osd_object *pobj, struct inode  *cinode,
3275                             const char *name, const struct dt_rec *fid,
3276                             struct htree_lock *hlock, struct thandle *th)
3277 {
3278         struct ldiskfs_dentry_param *ldp;
3279         struct dentry               *child;
3280         struct osd_thandle          *oth;
3281         int                          rc;
3282
3283         oth = container_of(th, struct osd_thandle, ot_super);
3284         LASSERT(oth->ot_handle != NULL);
3285         LASSERT(oth->ot_handle->h_transaction != NULL);
3286         LASSERT(pobj->oo_inode);
3287
3288         ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
3289         if (unlikely(pobj->oo_inode ==
3290                      osd_sb(osd_obj2dev(pobj))->s_root->d_inode))
3291                 ldp->edp_magic = 0;
3292         else
3293                 osd_get_ldiskfs_dirent_param(ldp, fid);
3294         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
3295         child->d_fsdata = (void *)ldp;
3296         ll_vfs_dq_init(pobj->oo_inode);
3297         rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
3298
3299         RETURN(rc);
3300 }
3301
3302 /**
3303  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
3304  * into the directory.Also sets flags into osd object to
3305  * indicate dot and dotdot are created. This is required for
3306  * interoperability mode (b11826)
3307  *
3308  * \param dir   directory for dot and dotdot fixup.
3309  * \param obj   child object for linking
3310  *
3311  * \retval   0, on success
3312  * \retval -ve, on error
3313  */
3314 static int osd_add_dot_dotdot(struct osd_thread_info *info,
3315                               struct osd_object *dir,
3316                               struct inode  *parent_dir, const char *name,
3317                               const struct dt_rec *dot_fid,
3318                               const struct dt_rec *dot_dot_fid,
3319                               struct thandle *th)
3320 {
3321         struct inode                *inode = dir->oo_inode;
3322         struct ldiskfs_dentry_param *dot_ldp;
3323         struct ldiskfs_dentry_param *dot_dot_ldp;
3324         struct osd_thandle          *oth;
3325         int result = 0;
3326
3327         oth = container_of(th, struct osd_thandle, ot_super);
3328         LASSERT(oth->ot_handle->h_transaction != NULL);
3329         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
3330
3331         if (strcmp(name, dot) == 0) {
3332                 if (dir->oo_compat_dot_created) {
3333                         result = -EEXIST;
3334                 } else {
3335                         LASSERT(inode == parent_dir);
3336                         dir->oo_compat_dot_created = 1;
3337                         result = 0;
3338                 }
3339         } else if(strcmp(name, dotdot) == 0) {
3340                 if (!dir->oo_compat_dot_created)
3341                         return -EINVAL;
3342
3343                 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
3344                 osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
3345                 /* in case of rename, dotdot is already created */
3346                 if (dir->oo_compat_dotdot_created)
3347                         return __osd_ea_add_rec(info, dir, parent_dir, name,
3348                                                 dot_dot_fid, NULL, th);
3349
3350                 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
3351                 dot_ldp->edp_magic = 0;
3352                 result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
3353                                                 inode, dot_ldp, dot_dot_ldp);
3354                 if (result == 0)
3355                         dir->oo_compat_dotdot_created = 1;
3356         }
3357
3358         return result;
3359 }
3360
3361
3362 /**
3363  * It will call the appropriate osd_add* function and return the
3364  * value, return by respective functions.
3365  */
3366 static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
3367                           struct inode *cinode, const char *name,
3368                           const struct dt_rec *fid, struct thandle *th)
3369 {
3370         struct osd_thread_info *info   = osd_oti_get(env);
3371         struct htree_lock      *hlock;
3372         int                     rc;
3373
3374         hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
3375
3376         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3377                                                    name[2] =='\0'))) {
3378                 if (hlock != NULL) {
3379                         ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3380                                            pobj->oo_inode, 0);
3381                 } else {
3382                         down_write(&pobj->oo_ext_idx_sem);
3383                 }
3384                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3385                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3386                                         fid, th);
3387         } else {
3388                 if (hlock != NULL) {
3389                         ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3390                                            pobj->oo_inode, LDISKFS_HLOCK_ADD);
3391                 } else {
3392                         down_write(&pobj->oo_ext_idx_sem);
3393                 }
3394
3395                 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
3396                                       hlock, th);
3397         }
3398         if (hlock != NULL)
3399                 ldiskfs_htree_unlock(hlock);
3400         else
3401                 up_write(&pobj->oo_ext_idx_sem);
3402
3403         return rc;
3404 }
3405
3406 static void
3407 osd_consistency_check(struct osd_thread_info *oti, struct osd_device *dev,
3408                       struct osd_idmap_cache *oic)
3409 {
3410         struct osd_scrub    *scrub = &dev->od_scrub;
3411         struct lu_fid       *fid   = &oic->oic_fid;
3412         struct osd_inode_id *id    = &oti->oti_id;
3413         int                  once  = 0;
3414         int                  rc;
3415         ENTRY;
3416
3417         if (!fid_is_norm(fid) && !fid_is_igif(fid))
3418                 RETURN_EXIT;
3419
3420 again:
3421         rc = osd_oi_lookup(oti, dev, fid, id, true);
3422         if (rc != 0 && rc != -ENOENT)
3423                 RETURN_EXIT;
3424
3425         if (rc == 0 && osd_id_eq(id, &oic->oic_lid))
3426                 RETURN_EXIT;
3427
3428         if (thread_is_running(&scrub->os_thread)) {
3429                 rc = osd_oii_insert(dev, oic, rc == -ENOENT);
3430                 /* There is race condition between osd_oi_lookup and OI scrub.
3431                  * The OI scrub finished just after osd_oi_lookup() failure.
3432                  * Under such case, it is unnecessary to trigger OI scrub again,
3433                  * but try to call osd_oi_lookup() again. */
3434                 if (unlikely(rc == -EAGAIN))
3435                         goto again;
3436
3437                 RETURN_EXIT;
3438         }
3439
3440         if (!dev->od_noscrub && ++once == 1) {
3441                 CDEBUG(D_LFSCK, "Trigger OI scrub by RPC for "DFID"\n",
3442                        PFID(fid));
3443                 rc = osd_scrub_start(dev);
3444                 LCONSOLE_ERROR("%.16s: trigger OI scrub by RPC for "DFID
3445                                ", rc = %d [2]\n",
3446                                LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name,
3447                                PFID(fid), rc);
3448                 if (rc == 0)
3449                         goto again;
3450         }
3451
3452         EXIT;
3453 }
3454
3455 /**
3456  * Calls ->lookup() to find dentry. From dentry get inode and
3457  * read inode's ea to get fid. This is required for  interoperability
3458  * mode (b11826)
3459  *
3460  * \retval   0, on success
3461  * \retval -ve, on error
3462  */
3463 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3464                              struct dt_rec *rec, const struct dt_key *key)
3465 {
3466         struct inode               *dir    = obj->oo_inode;
3467         struct dentry              *dentry;
3468         struct ldiskfs_dir_entry_2 *de;
3469         struct buffer_head         *bh;
3470         struct lu_fid              *fid = (struct lu_fid *) rec;
3471         struct htree_lock          *hlock = NULL;
3472         int                         ino;
3473         int                         rc;
3474         ENTRY;
3475
3476         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3477
3478         dentry = osd_child_dentry_get(env, obj,
3479                                       (char *)key, strlen((char *)key));
3480
3481         if (obj->oo_hl_head != NULL) {
3482                 hlock = osd_oti_get(env)->oti_hlock;
3483                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3484                                    dir, LDISKFS_HLOCK_LOOKUP);
3485         } else {
3486                 down_read(&obj->oo_ext_idx_sem);
3487         }
3488
3489         bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3490         if (bh) {
3491                 struct osd_thread_info *oti = osd_oti_get(env);
3492                 struct osd_idmap_cache *oic = &oti->oti_cache;
3493                 struct osd_device *dev = osd_obj2dev(obj);
3494                 struct osd_scrub *scrub = &dev->od_scrub;
3495                 struct scrub_file *sf = &scrub->os_file;
3496
3497                 ino = le32_to_cpu(de->inode);
3498                 rc = osd_get_fid_from_dentry(de, rec);
3499
3500                 /* done with de, release bh */
3501                 brelse(bh);
3502                 if (rc != 0)
3503                         rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
3504                 else
3505                         osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
3506                 if (rc != 0) {
3507                         fid_zero(&oic->oic_fid);
3508                         GOTO(out, rc);
3509                 }
3510
3511                 oic->oic_fid = *fid;
3512                 if ((scrub->os_pos_current <= ino) &&
3513                     (sf->sf_flags & SF_INCONSISTENT ||
3514                      ldiskfs_test_bit(osd_oi_fid2idx(dev, fid),
3515                                       sf->sf_oi_bitmap)))
3516                         osd_consistency_check(oti, dev, oic);
3517         } else {
3518                 rc = -ENOENT;
3519         }
3520
3521         GOTO(out, rc);
3522
3523 out:
3524         if (hlock != NULL)
3525                 ldiskfs_htree_unlock(hlock);
3526         else
3527                 up_read(&obj->oo_ext_idx_sem);
3528         return rc;
3529 }
3530
3531 /**
3532  * Find the osd object for given fid.
3533  *
3534  * \param fid need to find the osd object having this fid
3535  *
3536  * \retval osd_object on success
3537  * \retval        -ve on error
3538  */
3539 struct osd_object *osd_object_find(const struct lu_env *env,
3540                                    struct dt_object *dt,
3541                                    const struct lu_fid *fid)
3542 {
3543         struct lu_device  *ludev = dt->do_lu.lo_dev;
3544         struct osd_object *child = NULL;
3545         struct lu_object  *luch;
3546         struct lu_object  *lo;
3547
3548         /*
3549          * at this point topdev might not exist yet
3550          * (i.e. MGS is preparing profiles). so we can
3551          * not rely on topdev and instead lookup with
3552          * our device passed as topdev. this can't work
3553          * if the object isn't cached yet (as osd doesn't
3554          * allocate lu_header). IOW, the object must be
3555          * in the cache, otherwise lu_object_alloc() crashes
3556          * -bzzz
3557          */
3558         luch = lu_object_find_at(env, ludev, fid, NULL);
3559         if (!IS_ERR(luch)) {
3560                 if (lu_object_exists(luch)) {
3561                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3562                         if (lo != NULL)
3563                                 child = osd_obj(lo);
3564                         else
3565                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3566                                                 "lu_object can't be located"
3567                                                 DFID"\n", PFID(fid));
3568
3569                         if (child == NULL) {
3570                                 lu_object_put(env, luch);
3571                                 CERROR("Unable to get osd_object\n");
3572                                 child = ERR_PTR(-ENOENT);
3573                         }
3574                 } else {
3575                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
3576                                         "lu_object does not exists "DFID"\n",
3577                                         PFID(fid));
3578                         lu_object_put(env, luch);
3579                         child = ERR_PTR(-ENOENT);
3580                 }
3581         } else
3582                 child = (void *)luch;
3583
3584         return child;
3585 }
3586
3587 /**
3588  * Put the osd object once done with it.
3589  *
3590  * \param obj osd object that needs to be put
3591  */
3592 static inline void osd_object_put(const struct lu_env *env,
3593                                   struct osd_object *obj)
3594 {
3595         lu_object_put(env, &obj->oo_dt.do_lu);
3596 }
3597
3598 static int osd_index_declare_ea_insert(const struct lu_env *env,
3599                                        struct dt_object *dt,
3600                                        const struct dt_rec *rec,
3601                                        const struct dt_key *key,
3602                                        struct thandle *handle)
3603 {
3604         struct osd_thandle      *oh;
3605         struct inode            *inode;
3606         struct lu_fid           *fid = (struct lu_fid *)rec;
3607         int                     rc;
3608         ENTRY;
3609
3610         LASSERT(dt_object_exists(dt));
3611         LASSERT(handle != NULL);
3612
3613         oh = container_of0(handle, struct osd_thandle, ot_super);
3614         LASSERT(oh->ot_handle == NULL);
3615
3616         osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3617                              osd_dto_credits_noquota[DTO_INDEX_INSERT]);
3618
3619         inode = osd_dt_obj(dt)->oo_inode;
3620         LASSERT(inode);
3621
3622         /* We ignore block quota on meta pool (MDTs), so needn't
3623          * calculate how many blocks will be consumed by this index
3624          * insert */
3625         rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
3626                                    true, true, NULL, false);
3627         if (fid == NULL)
3628                 RETURN(0);
3629
3630         /* It does fld look up inside declare, and the result will be
3631         * added to fld cache, so the following fld lookup inside insert
3632         * does not need send RPC anymore, so avoid send rpc with holding
3633         * transaction */
3634         LASSERTF(fid_is_sane(fid), "fid is insane"DFID"\n", PFID(fid));
3635         osd_fld_lookup(env, osd_dt_dev(handle->th_dev), fid,
3636                         &osd_oti_get(env)->oti_seq_range);
3637
3638         RETURN(rc);
3639 }
3640
3641 /**
3642  * Index add function for interoperability mode (b11826).
3643  * It will add the directory entry.This entry is needed to
3644  * maintain name->fid mapping.
3645  *
3646  * \param key it is key i.e. file entry to be inserted
3647  * \param rec it is value of given key i.e. fid
3648  *
3649  * \retval   0, on success
3650  * \retval -ve, on error
3651  */
3652 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3653                                const struct dt_rec *rec,
3654                                const struct dt_key *key, struct thandle *th,
3655                                struct lustre_capa *capa, int ignore_quota)
3656 {
3657         struct osd_object *obj   = osd_dt_obj(dt);
3658         struct lu_fid     *fid   = (struct lu_fid *) rec;
3659         const char        *name  = (const char *)key;
3660         struct osd_object *child;
3661         int                rc;
3662
3663         ENTRY;
3664
3665         LASSERT(osd_invariant(obj));
3666         LASSERT(dt_object_exists(dt));
3667         LASSERT(th != NULL);
3668
3669         osd_trans_exec_op(env, th, OSD_OT_INSERT);
3670
3671         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3672                 RETURN(-EACCES);
3673
3674         child = osd_object_find(env, dt, fid);
3675         if (!IS_ERR(child)) {
3676                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
3677                 osd_object_put(env, child);
3678         } else {
3679                 rc = PTR_ERR(child);
3680         }
3681
3682         LASSERT(osd_invariant(obj));
3683         RETURN(rc);
3684 }
3685
3686 /**
3687  *  Initialize osd Iterator for given osd index object.
3688  *
3689  *  \param  dt      osd index object
3690  */
3691
3692 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3693                                      struct dt_object *dt,
3694                                      __u32 unused,
3695                                      struct lustre_capa *capa)
3696 {
3697         struct osd_it_iam      *it;
3698         struct osd_thread_info *oti = osd_oti_get(env);
3699         struct osd_object      *obj = osd_dt_obj(dt);
3700         struct lu_object       *lo  = &dt->do_lu;
3701         struct iam_path_descr  *ipd;
3702         struct iam_container   *bag = &obj->oo_dir->od_container;
3703
3704         LASSERT(lu_object_exists(lo));
3705
3706         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3707                 return ERR_PTR(-EACCES);
3708
3709         it = &oti->oti_it;
3710         ipd = osd_it_ipd_get(env, bag);
3711         if (likely(ipd != NULL)) {
3712                 it->oi_obj = obj;
3713                 it->oi_ipd = ipd;
3714                 lu_object_get(lo);
3715                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3716                 return (struct dt_it *)it;
3717         }
3718         return ERR_PTR(-ENOMEM);
3719 }
3720
3721 /**
3722  * free given Iterator.
3723  */
3724
3725 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3726 {
3727         struct osd_it_iam *it = (struct osd_it_iam *)di;
3728         struct osd_object *obj = it->oi_obj;
3729
3730         iam_it_fini(&it->oi_it);
3731         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3732         lu_object_put(env, &obj->oo_dt.do_lu);
3733 }
3734
3735 /**
3736  *  Move Iterator to record specified by \a key
3737  *
3738  *  \param  di      osd iterator
3739  *  \param  key     key for index
3740  *
3741  *  \retval +ve  di points to record with least key not larger than key
3742  *  \retval  0   di points to exact matched key
3743  *  \retval -ve  failure
3744  */
3745
3746 static int osd_it_iam_get(const struct lu_env *env,
3747                           struct dt_it *di, const struct dt_key *key)
3748 {
3749         struct osd_thread_info  *oti = osd_oti_get(env);
3750         struct osd_it_iam       *it = (struct osd_it_iam *)di;
3751
3752         if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) {
3753                 /* swab quota uid/gid */
3754                 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3755                 key = (struct dt_key *)&oti->oti_quota_id;
3756         }
3757
3758         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
3759 }
3760
3761 /**
3762  *  Release Iterator
3763  *
3764  *  \param  di      osd iterator
3765  */
3766
3767 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
3768 {
3769         struct osd_it_iam *it = (struct osd_it_iam *)di;
3770
3771         iam_it_put(&it->oi_it);
3772 }
3773
3774 /**
3775  *  Move iterator by one record
3776  *
3777  *  \param  di      osd iterator
3778  *
3779  *  \retval +1   end of container reached
3780  *  \retval  0   success
3781  *  \retval -ve  failure
3782  */
3783
3784 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
3785 {
3786         struct osd_it_iam *it = (struct osd_it_iam *)di;
3787
3788         return iam_it_next(&it->oi_it);
3789 }
3790
3791 /**
3792  * Return pointer to the key under iterator.
3793  */
3794
3795 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
3796                                  const struct dt_it *di)
3797 {
3798         struct osd_thread_info *oti = osd_oti_get(env);
3799         struct osd_it_iam      *it = (struct osd_it_iam *)di;
3800         struct osd_object      *obj = it->oi_obj;
3801         struct dt_key          *key;
3802
3803         key = (struct dt_key *)iam_it_key_get(&it->oi_it);
3804
3805         if (!IS_ERR(key) && fid_is_quota(lu_object_fid(&obj->oo_dt.do_lu))) {
3806                 /* swab quota uid/gid */
3807                 oti->oti_quota_id = le64_to_cpu(*((__u64 *)key));
3808                 key = (struct dt_key *)&oti->oti_quota_id;
3809         }
3810
3811         return key;
3812 }
3813
3814 /**
3815  * Return size of key under iterator (in bytes)
3816  */
3817
3818 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
3819 {
3820         struct osd_it_iam *it = (struct osd_it_iam *)di;
3821
3822         return iam_it_key_size(&it->oi_it);
3823 }
3824
3825 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
3826                                        int len, __u16 type)
3827 {
3828         struct luda_type *lt;
3829         const unsigned    align = sizeof(struct luda_type) - 1;
3830
3831         /* check if file type is required */
3832         if (attr & LUDA_TYPE) {
3833                         len = (len + align) & ~align;
3834
3835                         lt = (void *) ent->lde_name + len;
3836                         lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
3837                         ent->lde_attrs |= LUDA_TYPE;
3838         }
3839
3840         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
3841 }
3842
3843 /**
3844  * build lu direct from backend fs dirent.
3845  */
3846
3847 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
3848                                       struct lu_fid *fid, __u64 offset,
3849                                       char *name, __u16 namelen,
3850                                       __u16 type, __u32 attr)
3851 {
3852         fid_cpu_to_le(&ent->lde_fid, fid);
3853         ent->lde_attrs = LUDA_FID;
3854
3855         ent->lde_hash = cpu_to_le64(offset);
3856         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
3857
3858         strncpy(ent->lde_name, name, namelen);
3859         ent->lde_namelen = cpu_to_le16(namelen);
3860
3861         /* append lustre attributes */
3862         osd_it_append_attrs(ent, attr, namelen, type);
3863 }
3864
3865 /**
3866  * Return pointer to the record under iterator.
3867  */
3868 static int osd_it_iam_rec(const struct lu_env *env,
3869                           const struct dt_it *di,
3870                           struct dt_rec *dtrec, __u32 attr)
3871 {
3872         struct osd_it_iam      *it   = (struct osd_it_iam *)di;
3873         struct osd_thread_info *info = osd_oti_get(env);
3874         ENTRY;
3875
3876         if (S_ISDIR(it->oi_obj->oo_inode->i_mode)) {
3877                 const struct osd_fid_pack *rec;
3878                 struct lu_fid             *fid = &info->oti_fid;
3879                 struct lu_dirent          *lde = (struct lu_dirent *)dtrec;
3880                 char                      *name;
3881                 int                        namelen;
3882                 __u64                      hash;
3883                 int                        rc;
3884
3885                 name = (char *)iam_it_key_get(&it->oi_it);
3886                 if (IS_ERR(name))
3887                         RETURN(PTR_ERR(name));
3888
3889                 namelen = iam_it_key_size(&it->oi_it);
3890
3891                 rec = (const struct osd_fid_pack *)iam_it_rec_get(&it->oi_it);
3892                 if (IS_ERR(rec))
3893                         RETURN(PTR_ERR(rec));
3894
3895                 rc = osd_fid_unpack(fid, rec);
3896                 if (rc)
3897                         RETURN(rc);
3898
3899                 hash = iam_it_store(&it->oi_it);
3900
3901                 /* IAM does not store object type in IAM index (dir) */
3902                 osd_it_pack_dirent(lde, fid, hash, name, namelen,
3903                                    0, LUDA_FID);
3904         } else if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) {
3905                 iam_reccpy(&it->oi_it.ii_path.ip_leaf,
3906                            (struct iam_rec *)dtrec);
3907                 osd_quota_unpack(it->oi_obj, dtrec);
3908         } else {
3909                 iam_reccpy(&it->oi_it.ii_path.ip_leaf,
3910                            (struct iam_rec *)dtrec);
3911         }
3912
3913         RETURN(0);
3914 }
3915
3916 /**
3917  * Returns cookie for current Iterator position.
3918  */
3919 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
3920 {
3921         struct osd_it_iam *it = (struct osd_it_iam *)di;
3922
3923         return iam_it_store(&it->oi_it);
3924 }
3925
3926 /**
3927  * Restore iterator from cookie.
3928  *
3929  * \param  di      osd iterator
3930  * \param  hash    Iterator location cookie
3931  *
3932  * \retval +ve  di points to record with least key not larger than key.
3933  * \retval  0   di points to exact matched key
3934  * \retval -ve  failure
3935  */
3936
3937 static int osd_it_iam_load(const struct lu_env *env,
3938                            const struct dt_it *di, __u64 hash)
3939 {
3940         struct osd_it_iam *it = (struct osd_it_iam *)di;
3941
3942         return iam_it_load(&it->oi_it, hash);
3943 }
3944
/**
 * Index operations table for IAM-backed indices: lookup/insert/delete
 * entry points plus the osd_it_iam_* iterator methods defined above.
 */
static const struct dt_index_operations osd_index_iam_ops = {
        .dio_lookup         = osd_index_iam_lookup,
        .dio_declare_insert = osd_index_declare_iam_insert,
        .dio_insert         = osd_index_iam_insert,
        .dio_declare_delete = osd_index_declare_iam_delete,
        .dio_delete         = osd_index_iam_delete,
        /* iterator methods */
        .dio_it     = {
                .init     = osd_it_iam_init,
                .fini     = osd_it_iam_fini,
                .get      = osd_it_iam_get,
                .put      = osd_it_iam_put,
                .next     = osd_it_iam_next,
                .key      = osd_it_iam_key,
                .key_size = osd_it_iam_key_size,
                .rec      = osd_it_iam_rec,
                .store    = osd_it_iam_store,
                .load     = osd_it_iam_load
        }
};
3964
3965
3966 /**
3967  * Creates or initializes iterator context.
3968  *
3969  * \retval struct osd_it_ea, iterator structure on success
3970  *
3971  */
3972 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
3973                                     struct dt_object *dt,
3974                                     __u32 attr,
3975                                     struct lustre_capa *capa)
3976 {
3977         struct osd_object       *obj  = osd_dt_obj(dt);
3978         struct osd_thread_info  *info = osd_oti_get(env);
3979         struct osd_it_ea        *it   = &info->oti_it_ea;
3980         struct lu_object        *lo   = &dt->do_lu;
3981         struct dentry           *obj_dentry = &info->oti_it_dentry;
3982         ENTRY;
3983         LASSERT(lu_object_exists(lo));
3984
3985         obj_dentry->d_inode = obj->oo_inode;
3986         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
3987         obj_dentry->d_name.hash = 0;
3988
3989         it->oie_rd_dirent       = 0;
3990         it->oie_it_dirent       = 0;
3991         it->oie_dirent          = NULL;
3992         it->oie_buf             = info->oti_it_ea_buf;
3993         it->oie_obj             = obj;
3994         it->oie_file.f_pos      = 0;
3995         it->oie_file.f_dentry   = obj_dentry;
3996         if (attr & LUDA_64BITHASH)
3997                 it->oie_file.f_mode |= FMODE_64BITHASH;
3998         else
3999                 it->oie_file.f_mode |= FMODE_32BITHASH;
4000         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
4001         it->oie_file.f_op         = obj->oo_inode->i_fop;
4002         it->oie_file.private_data = NULL;
4003         lu_object_get(lo);
4004         RETURN((struct dt_it *) it);
4005 }
4006
4007 /**
4008  * Destroy or finishes iterator context.
4009  *
4010  * \param di iterator structure to be destroyed
4011  */
4012 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
4013 {
4014         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
4015         struct osd_object    *obj  = it->oie_obj;
4016         struct inode       *inode  = obj->oo_inode;
4017
4018         ENTRY;
4019         it->oie_file.f_op->release(inode, &it->oie_file);
4020         lu_object_put(env, &obj->oo_dt.do_lu);
4021         EXIT;
4022 }
4023
4024 /**
4025  * It position the iterator at given key, so that next lookup continues from
4026  * that key Or it is similar to dio_it->load() but based on a key,
4027  * rather than file position.
4028  *
4029  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
4030  * to the beginning.
4031  *
4032  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
4033  */
4034 static int osd_it_ea_get(const struct lu_env *env,
4035                          struct dt_it *di, const struct dt_key *key)
4036 {
4037         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
4038
4039         ENTRY;
4040         LASSERT(((const char *)key)[0] == '\0');
4041         it->oie_file.f_pos      = 0;
4042         it->oie_rd_dirent       = 0;
4043         it->oie_it_dirent       = 0;
4044         it->oie_dirent          = NULL;
4045
4046         RETURN(+1);
4047 }
4048
/**
 * Counterpart of osd_it_ea_get(). Does nothing: osd_it_ea_get() only
 * resets iterator fields, so there is nothing to release here.
 */
static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
{
}
4055
4056 /**
4057  * It is called internally by ->readdir(). It fills the
4058  * iterator's in-memory data structure with required
4059  * information i.e. name, namelen, rec_size etc.
4060  *
4061  * \param buf in which information to be filled in.
4062  * \param name name of the file in given dir
4063  *
4064  * \retval 0 on success
4065  * \retval 1 on buffer full
4066  */
4067 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
4068                                loff_t offset, __u64 ino,
4069                                unsigned d_type)
4070 {
4071         struct osd_it_ea        *it   = (struct osd_it_ea *)buf;
4072         struct osd_object       *obj  = it->oie_obj;
4073         struct osd_it_ea_dirent *ent  = it->oie_dirent;
4074         struct lu_fid           *fid  = &ent->oied_fid;
4075         struct osd_fid_pack     *rec;
4076         ENTRY;
4077
4078         /* this should never happen */
4079         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
4080                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
4081                 RETURN(-EIO);
4082         }
4083
4084         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
4085             OSD_IT_EA_BUFSIZE)
4086                 RETURN(1);
4087
4088         /* "." is just the object itself. */
4089         if (namelen == 1 && name[0] == '.') {
4090                 *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
4091         } else if (d_type & LDISKFS_DIRENT_LUFID) {
4092                 rec = (struct osd_fid_pack*) (name + namelen + 1);
4093                 if (osd_fid_unpack(fid, rec) != 0)
4094                         fid_zero(fid);
4095         } else {
4096                 fid_zero(fid);
4097         }
4098         d_type &= ~LDISKFS_DIRENT_LUFID;
4099
4100         /* NOT export local root. */
4101         if (unlikely(osd_sb(osd_obj2dev(obj))->s_root->d_inode->i_ino == ino)) {
4102                 ino = obj->oo_inode->i_ino;
4103                 *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
4104         }
4105
4106         ent->oied_ino     = ino;
4107         ent->oied_off     = offset;
4108         ent->oied_namelen = namelen;
4109         ent->oied_type    = d_type;
4110
4111         memcpy(ent->oied_name, name, namelen);
4112
4113         it->oie_rd_dirent++;
4114         it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
4115         RETURN(0);
4116 }
4117
4118 /**
4119  * Calls ->readdir() to load a directory entry at a time
4120  * and stored it in iterator's in-memory data structure.
4121  *
4122  * \param di iterator's in memory structure
4123  *
4124  * \retval   0 on success
4125  * \retval -ve on error
4126  */
4127 static int osd_ldiskfs_it_fill(const struct lu_env *env,
4128                                const struct dt_it *di)
4129 {
4130         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
4131         struct osd_object  *obj   = it->oie_obj;
4132         struct inode       *inode = obj->oo_inode;
4133         struct htree_lock  *hlock = NULL;
4134         int                 result = 0;
4135
4136         ENTRY;
4137         it->oie_dirent = it->oie_buf;
4138         it->oie_rd_dirent = 0;
4139
4140         if (obj->oo_hl_head != NULL) {
4141                 hlock = osd_oti_get(env)->oti_hlock;
4142                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
4143                                    inode, LDISKFS_HLOCK_READDIR);
4144         } else {
4145                 down_read(&obj->oo_ext_idx_sem);
4146         }
4147
4148         result = inode->i_fop->readdir(&it->oie_file, it,
4149                                        (filldir_t) osd_ldiskfs_filldir);
4150
4151         if (hlock != NULL)
4152                 ldiskfs_htree_unlock(hlock);
4153         else
4154                 up_read(&obj->oo_ext_idx_sem);
4155
4156         if (it->oie_rd_dirent == 0) {
4157                 result = -EIO;
4158         } else {
4159                 it->oie_dirent = it->oie_buf;
4160                 it->oie_it_dirent = 1;
4161         }
4162
4163         RETURN(result);
4164 }
4165
4166 /**
4167  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4168  * to load a directory entry at a time and stored it in
4169  * iterator's in-memory data structure.
4170  *
4171  * \param di iterator's in memory structure
4172  *
4173  * \retval +ve iterator reached to end
4174  * \retval   0 iterator not reached to end
4175  * \retval -ve on error
4176  */
4177 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
4178 {
4179         struct osd_it_ea *it = (struct osd_it_ea *)di;
4180         int rc;
4181
4182         ENTRY;
4183
4184         if (it->oie_it_dirent < it->oie_rd_dirent) {
4185                 it->oie_dirent =
4186                         (void *) it->oie_dirent +
4187                         cfs_size_round(sizeof(struct osd_it_ea_dirent) +
4188                                        it->oie_dirent->oied_namelen);
4189                 it->oie_it_dirent++;
4190                 RETURN(0);
4191         } else {
4192                 if (it->oie_file.f_pos == ldiskfs_get_htree_eof(&it->oie_file))
4193                         rc = +1;
4194                 else
4195                         rc = osd_ldiskfs_it_fill(env, di);
4196         }
4197
4198         RETURN(rc);
4199 }
4200
4201 /**
4202  * Returns the key at current position from iterator's in memory structure.
4203  *
4204  * \param di iterator's in memory structure
4205  *
4206  * \retval key i.e. struct dt_key on success
4207  */
4208 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
4209                                     const struct dt_it *di)
4210 {
4211         struct osd_it_ea *it = (struct osd_it_ea *)di;
4212
4213         return (struct dt_key *)it->oie_dirent->oied_name;
4214 }
4215
4216 /**
4217  * Returns the key's size at current position from iterator's in memory structure.
4218  *
4219  * \param di iterator's in memory structure
4220  *
4221  * \retval key_size i.e. struct dt_key on success
4222  */
4223 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
4224 {
4225         struct osd_it_ea *it = (struct osd_it_ea *)di;
4226
4227         return it->oie_dirent->oied_namelen;
4228 }
4229
4230
4231 /**
4232  * Returns the value (i.e. fid/igif) at current position from iterator's
4233  * in memory structure.
4234  *
4235  * \param di struct osd_it_ea, iterator's in memory structure
4236  * \param attr attr requested for dirent.
4237  * \param lde lustre dirent
4238  *
4239  * \retval   0 no error and \param lde has correct lustre dirent.
4240  * \retval -ve on error
4241  */
4242 static inline int osd_it_ea_rec(const struct lu_env *env,
4243                                 const struct dt_it *di,
4244                                 struct dt_rec *dtrec, __u32 attr)
4245 {
4246         struct osd_it_ea       *it    = (struct osd_it_ea *)di;
4247         struct osd_object      *obj   = it->oie_obj;
4248         struct osd_device      *dev   = osd_obj2dev(obj);
4249         struct osd_scrub       *scrub = &dev->od_scrub;
4250         struct scrub_file      *sf    = &scrub->os_file;
4251         struct osd_thread_info *oti   = osd_oti_get(env);
4252         struct osd_idmap_cache *oic   = &oti->oti_cache;
4253         struct lu_fid          *fid   = &it->oie_dirent->oied_fid;
4254         struct lu_dirent       *lde   = (struct lu_dirent *)dtrec;
4255         __u32                   ino   = it->oie_dirent->oied_ino;
4256         int                     rc    = 0;
4257         ENTRY;
4258
4259         if (!fid_is_sane(fid)) {
4260                 rc = osd_ea_fid_get(env, obj, ino, fid, &oic->oic_lid);
4261                 if (rc != 0) {
4262                         fid_zero(&oic->oic_fid);
4263                         RETURN(rc);
4264                 }
4265         } else {
4266                 osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
4267         }
4268
4269         osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
4270                            it->oie_dirent->oied_name,
4271                            it->oie_dirent->oied_namelen,
4272                            it->oie_dirent->oied_type, attr);
4273         oic->oic_fid = *fid;
4274         if ((scrub->os_pos_current <= ino) &&
4275             (sf->sf_flags & SF_INCONSISTENT ||
4276              ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), sf->sf_oi_bitmap)))
4277                 osd_consistency_check(oti, dev, oic);
4278
4279         RETURN(rc);
4280 }
4281
4282 /**
4283  * Returns a cookie for current position of the iterator head, so that
4284  * user can use this cookie to load/start the iterator next time.
4285  *
4286  * \param di iterator's in memory structure
4287  *
4288  * \retval cookie for current position, on success
4289  */
4290 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
4291 {
4292         struct osd_it_ea *it = (struct osd_it_ea *)di;
4293
4294         return it->oie_dirent->oied_off;
4295 }
4296
4297 /**
4298  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4299  * to load a directory entry at a time and stored it i inn,
4300  * in iterator's in-memory data structure.
4301  *
4302  * \param di struct osd_it_ea, iterator's in memory structure
4303  *
4304  * \retval +ve on success
4305  * \retval -ve on error
4306  */
4307 static int osd_it_ea_load(const struct lu_env *env,
4308                           const struct dt_it *di, __u64 hash)
4309 {
4310         struct osd_it_ea *it = (struct osd_it_ea *)di;
4311         int rc;
4312
4313         ENTRY;
4314         it->oie_file.f_pos = hash;
4315
4316         rc =  osd_ldiskfs_it_fill(env, di);
4317         if (rc == 0)
4318                 rc = +1;
4319
4320         RETURN(rc);
4321 }
4322
4323 /**
4324  * Index lookup function for interoperability mode (b11826).
4325  *
4326  * \param key,  key i.e. file name to be searched
4327  *
4328  * \retval +ve, on success
4329  * \retval -ve, on error
4330  */
4331 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
4332                                struct dt_rec *rec, const struct dt_key *key,
4333                                struct lustre_capa *capa)
4334 {
4335         struct osd_object *obj = osd_dt_obj(dt);
4336         int rc = 0;
4337
4338         ENTRY;
4339
4340         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
4341         LINVRNT(osd_invariant(obj));
4342
4343         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
4344                 return -EACCES;
4345
4346         rc = osd_ea_lookup_rec(env, obj, rec, key);
4347         if (rc == 0)
4348                 rc = +1;
4349         RETURN(rc);
4350 }
4351
4352 /**
4353  * Index and Iterator operations for interoperability
4354  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
4355  */
4356 static const struct dt_index_operations osd_index_ea_ops = {
4357         .dio_lookup         = osd_index_ea_lookup,
4358         .dio_declare_insert = osd_index_declare_ea_insert,
4359         .dio_insert         = osd_index_ea_insert,
4360         .dio_declare_delete = osd_index_declare_ea_delete,
4361         .dio_delete         = osd_index_ea_delete,
4362         .dio_it     = {
4363                 .init     = osd_it_ea_init,
4364                 .fini     = osd_it_ea_fini,
4365                 .get      = osd_it_ea_get,
4366                 .put      = osd_it_ea_put,
4367                 .next     = osd_it_ea_next,
4368                 .key      = osd_it_ea_key,
4369                 .key_size = osd_it_ea_key_size,
4370                 .rec      = osd_it_ea_rec,
4371                 .store    = osd_it_ea_store,
4372                 .load     = osd_it_ea_load
4373         }
4374 };
4375
4376 static void *osd_key_init(const struct lu_context *ctx,
4377                           struct lu_context_key *key)
4378 {
4379         struct osd_thread_info *info;
4380
4381         OBD_ALLOC_PTR(info);
4382         if (info == NULL)
4383                 return ERR_PTR(-ENOMEM);
4384
4385         OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4386         if (info->oti_it_ea_buf == NULL)
4387                 goto out_free_info;
4388
4389         info->oti_env = container_of(ctx, struct lu_env, le_ctx);
4390
4391         info->oti_hlock = ldiskfs_htree_lock_alloc();
4392         if (info->oti_hlock == NULL)
4393                 goto out_free_ea;
4394
4395         return info;
4396
4397  out_free_ea:
4398         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4399  out_free_info:
4400         OBD_FREE_PTR(info);
4401         return ERR_PTR(-ENOMEM);
4402 }
4403
4404 static void osd_key_fini(const struct lu_context *ctx,
4405                          struct lu_context_key *key, void* data)
4406 {
4407         struct osd_thread_info *info = data;
4408
4409         if (info->oti_hlock != NULL)
4410                 ldiskfs_htree_lock_free(info->oti_hlock);
4411         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4412         OBD_FREE_PTR(info);
4413 }
4414
4415 static void osd_key_exit(const struct lu_context *ctx,
4416                          struct lu_context_key *key, void *data)
4417 {
4418         struct osd_thread_info *info = data;
4419
4420         LASSERT(info->oti_r_locks == 0);
4421         LASSERT(info->oti_w_locks == 0);
4422         LASSERT(info->oti_txns    == 0);
4423 }
4424
4425 /* type constructor/destructor: osd_type_init, osd_type_fini */
4426 LU_TYPE_INIT_FINI(osd, &osd_key);
4427
4428 struct lu_context_key osd_key = {
4429         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL,
4430         .lct_init = osd_key_init,
4431         .lct_fini = osd_key_fini,
4432         .lct_exit = osd_key_exit
4433 };
4434
4435
4436 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
4437                            const char *name, struct lu_device *next)
4438 {
4439         struct osd_device *osd = osd_dev(d);
4440
4441         strncpy(osd->od_svname, name, MAX_OBD_NAME);
4442         return osd_procfs_init(osd, name);
4443 }
4444
4445 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
4446 {
4447         ENTRY;
4448
4449         osd_scrub_cleanup(env, o);
4450
4451         if (o->od_fsops) {
4452                 fsfilt_put_ops(o->od_fsops);
4453                 o->od_fsops = NULL;
4454         }
4455
4456         /* shutdown quota slave instance associated with the device */
4457         if (o->od_quota_slave != NULL) {
4458                 qsd_fini(env, o->od_quota_slave);
4459                 o->od_quota_slave = NULL;
4460         }
4461
4462         RETURN(0);
4463 }
4464
4465 static int osd_mount(const struct lu_env *env,
4466                      struct osd_device *o, struct lustre_cfg *cfg)
4467 {
4468         const char              *name  = lustre_cfg_string(cfg, 0);
4469         const char              *dev  = lustre_cfg_string(cfg, 1);
4470         const char              *opts;
4471         unsigned long            page, s_flags, lmd_flags = 0;
4472         struct page             *__page;
4473         struct file_system_type *type;
4474         char                    *options = NULL;
4475         char                    *str;
4476         int                       rc = 0;
4477         ENTRY;
4478
4479         if (o->od_mnt != NULL)
4480                 RETURN(0);
4481
4482         if (strlen(dev) >= sizeof(o->od_mntdev))
4483                 RETURN(-E2BIG);
4484         strcpy(o->od_mntdev, dev);
4485
4486         o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
4487         if (o->od_fsops == NULL) {
4488                 CERROR("Can't find fsfilt_ldiskfs\n");
4489                 RETURN(-ENOTSUPP);
4490         }
4491
4492         OBD_PAGE_ALLOC(__page, CFS_ALLOC_STD);
4493         if (__page == NULL)
4494                 GOTO(out, rc = -ENOMEM);
4495
4496         str = lustre_cfg_string(cfg, 2);
4497         s_flags = simple_strtoul(str, NULL, 0);
4498         str = strstr(str, ":");
4499         if (str)
4500                 lmd_flags = simple_strtoul(str + 1, NULL, 0);
4501         opts = lustre_cfg_string(cfg, 3);
4502         page = (unsigned long)cfs_page_address(__page);
4503         options = (char *)page;
4504         *options = '\0';
4505         if (opts == NULL)
4506                 strcat(options, "user_xattr,acl");
4507         else
4508                 strcat(options, opts);
4509
4510         /* Glom up mount options */
4511         if (*options != '\0')
4512                 strcat(options, ",");
4513         strlcat(options, "no_mbcache", CFS_PAGE_SIZE);
4514
4515         type = get_fs_type("ldiskfs");
4516         if (!type) {
4517                 CERROR("%s: cannot find ldiskfs module\n", name);
4518                 GOTO(out, rc = -ENODEV);
4519         }
4520
4521         o->od_mnt = vfs_kern_mount(type, s_flags, dev, options);
4522         cfs_module_put(type->owner);
4523
4524         if (IS_ERR(o->od_mnt)) {
4525                 rc = PTR_ERR(o->od_mnt);
4526                 CERROR("%s: can't mount %s: %d\n", name, dev, rc);
4527                 o->od_mnt = NULL;
4528                 GOTO(out, rc);
4529         }
4530
4531         if (lvfs_check_rdonly(o->od_mnt->mnt_sb->s_bdev)) {
4532                 CERROR("%s: underlying device %s is marked as read-only. "
4533                        "Setup failed\n", name, dev);
4534                 mntput(o->od_mnt);
4535                 o->od_mnt = NULL;
4536                 GOTO(out, rc = -EROFS);
4537         }
4538
4539         if (!LDISKFS_HAS_COMPAT_FEATURE(o->od_mnt->mnt_sb,
4540             LDISKFS_FEATURE_COMPAT_HAS_JOURNAL)) {
4541                 CERROR("%s: device %s is mounted w/o journal\n", name, dev);
4542                 mntput(o->od_mnt);
4543                 o->od_mnt = NULL;
4544                 GOTO(out, rc = -EINVAL);
4545         }
4546
4547         ldiskfs_set_inode_state(osd_sb(o)->s_root->d_inode,
4548                                 LDISKFS_STATE_LUSTRE_NO_OI);
4549         if (lmd_flags & LMD_FLG_NOSCRUB)
4550                 o->od_noscrub = 1;
4551
4552 out:
4553         if (__page)
4554                 OBD_PAGE_FREE(__page);
4555         if (rc)
4556                 fsfilt_put_ops(o->od_fsops);
4557
4558         RETURN(rc);
4559 }
4560
4561 static struct lu_device *osd_device_fini(const struct lu_env *env,
4562                                          struct lu_device *d)
4563 {
4564         int rc;
4565         ENTRY;
4566
4567         rc = osd_shutdown(env, osd_dev(d));
4568
4569         osd_obj_map_fini(osd_dev(d));
4570
4571         shrink_dcache_sb(osd_sb(osd_dev(d)));
4572         osd_sync(env, lu2dt_dev(d));
4573
4574         rc = osd_procfs_fini(osd_dev(d));
4575         if (rc) {
4576                 CERROR("proc fini error %d \n", rc);
4577                 RETURN (ERR_PTR(rc));
4578         }
4579
4580         if (osd_dev(d)->od_mnt) {
4581                 mntput(osd_dev(d)->od_mnt);
4582                 osd_dev(d)->od_mnt = NULL;
4583         }
4584
4585         RETURN(NULL);
4586 }
4587
4588 static int osd_device_init0(const struct lu_env *env,
4589                             struct osd_device *o,
4590                             struct lustre_cfg *cfg)
4591 {
4592         struct lu_device        *l = osd2lu_dev(o);
4593         struct osd_thread_info *info;
4594         int                     rc;
4595
4596         /* if the module was re-loaded, env can loose its keys */
4597         rc = lu_env_refill((struct lu_env *) env);
4598         if (rc)
4599                 GOTO(out, rc);
4600         info = osd_oti_get(env);
4601         LASSERT(info);
4602
4603         l->ld_ops = &osd_lu_ops;
4604         o->od_dt_dev.dd_ops = &osd_dt_ops;
4605
4606         spin_lock_init(&o->od_osfs_lock);
4607         mutex_init(&o->od_otable_mutex);
4608         o->od_osfs_age = cfs_time_shift_64(-1000);
4609
4610         o->od_capa_hash = init_capa_hash();
4611         if (o->od_capa_hash == NULL)
4612                 GOTO(out, rc = -ENOMEM);
4613
4614         o->od_read_cache = 1;
4615         o->od_writethrough_cache = 1;
4616         o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE;
4617
4618         rc = osd_mount(env, o, cfg);
4619         if (rc)
4620                 GOTO(out_capa, rc);
4621
4622         CFS_INIT_LIST_HEAD(&o->od_ios_list);
4623         /* setup scrub, including OI files initialization */
4624         rc = osd_scrub_setup(env, o);
4625         if (rc < 0)
4626                 GOTO(out_mnt, rc);
4627
4628         strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
4629                         sizeof(o->od_svname) - 1);
4630
4631         rc = osd_obj_map_init(o);
4632         if (rc != 0)
4633                 GOTO(out_scrub, rc);
4634
4635         rc = lu_site_init(&o->od_site, l);
4636         if (rc)
4637                 GOTO(out_compat, rc);
4638         o->od_site.ls_bottom_dev = l;
4639
4640         rc = lu_site_init_finish(&o->od_site);
4641         if (rc)
4642                 GOTO(out_site, rc);
4643
4644         rc = osd_procfs_init(o, o->od_svname);
4645         if (rc != 0) {
4646                 CERROR("%s: can't initialize procfs: rc = %d\n",
4647                        o->od_svname, rc);
4648                 GOTO(out_site, rc);
4649         }
4650
4651         LASSERT(l->ld_site->ls_linkage.next && l->ld_site->ls_linkage.prev);
4652
4653         /* initialize quota slave instance */
4654         o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
4655                                      o->od_proc_entry);
4656         if (IS_ERR(o->od_quota_slave)) {
4657                 rc = PTR_ERR(o->od_quota_slave);
4658                 o->od_quota_slave = NULL;
4659                 GOTO(out_procfs, rc);
4660         }
4661
4662         RETURN(0);
4663 out_procfs:
4664         osd_procfs_fini(o);
4665 out_site:
4666         lu_site_fini(&o->od_site);
4667 out_compat:
4668         osd_obj_map_fini(o);
4669 out_scrub:
4670         osd_scrub_cleanup(env, o);
4671 out_mnt:
4672         osd_oi_fini(info, o);
4673         osd_shutdown(env, o);
4674         mntput(o->od_mnt);
4675         o->od_mnt = NULL;
4676 out_capa:
4677         cleanup_capa_hash(o->od_capa_hash);
4678 out:
4679         RETURN(rc);
4680 }
4681
4682 static struct lu_device *osd_device_alloc(const struct lu_env *env,
4683                                           struct lu_device_type *t,
4684                                           struct lustre_cfg *cfg)
4685 {
4686         struct osd_device *o;
4687         int                rc;
4688
4689         OBD_ALLOC_PTR(o);
4690         if (o == NULL)
4691                 return ERR_PTR(-ENOMEM);
4692
4693         rc = dt_device_init(&o->od_dt_dev, t);
4694         if (rc == 0) {
4695                 /* Because the ctx might be revived in dt_device_init,
4696                  * refill the env here */
4697                 lu_env_refill((struct lu_env *)env);
4698                 rc = osd_device_init0(env, o, cfg);
4699                 if (rc)
4700                         dt_device_fini(&o->od_dt_dev);
4701         }
4702
4703         if (unlikely(rc != 0))
4704                 OBD_FREE_PTR(o);
4705
4706         return rc == 0 ? osd2lu_dev(o) : ERR_PTR(rc);
4707 }
4708
4709 static struct lu_device *osd_device_free(const struct lu_env *env,
4710                                          struct lu_device *d)
4711 {
4712         struct osd_device *o = osd_dev(d);
4713         ENTRY;
4714
4715         cleanup_capa_hash(o->od_capa_hash);
4716         /* XXX: make osd top device in order to release reference */
4717         d->ld_site->ls_top_dev = d;
4718         lu_site_purge(env, d->ld_site, -1);
4719         if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
4720                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
4721                 lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
4722         }
4723         lu_site_fini(&o->od_site);
4724         dt_device_fini(&o->od_dt_dev);
4725         OBD_FREE_PTR(o);
4726         RETURN(NULL);
4727 }
4728
4729 static int osd_process_config(const struct lu_env *env,
4730                               struct lu_device *d, struct lustre_cfg *cfg)
4731 {
4732         struct osd_device *o = osd_dev(d);
4733         int err;
4734         ENTRY;
4735
4736         switch(cfg->lcfg_command) {
4737         case LCFG_SETUP:
4738                 err = osd_mount(env, o, cfg);
4739                 break;
4740         case LCFG_CLEANUP:
4741                 lu_dev_del_linkage(d->ld_site, d);
4742                 err = osd_shutdown(env, o);
4743                 break;
4744         default:
4745                 err = -ENOSYS;
4746         }
4747
4748         RETURN(err);
4749 }
4750
4751 static int osd_recovery_complete(const struct lu_env *env,
4752                                  struct lu_device *d)
4753 {
4754         struct osd_device       *osd = osd_dev(d);
4755         int                      rc = 0;
4756         ENTRY;
4757
4758         if (osd->od_quota_slave == NULL)
4759                 RETURN(0);
4760
4761         /* start qsd instance on recovery completion, this notifies the quota
4762          * slave code that we are about to process new requests now */
4763         rc = qsd_start(env, osd->od_quota_slave);
4764         RETURN(rc);
4765 }
4766
4767 /*
4768  * we use exports to track all osd users
4769  */
4770 static int osd_obd_connect(const struct lu_env *env, struct obd_export **exp,
4771                            struct obd_device *obd, struct obd_uuid *cluuid,
4772                            struct obd_connect_data *data, void *localdata)
4773 {
4774         struct osd_device    *osd = osd_dev(obd->obd_lu_dev);
4775         struct lustre_handle  conn;
4776         int                   rc;
4777         ENTRY;
4778
4779         CDEBUG(D_CONFIG, "connect #%d\n", osd->od_connects);
4780
4781         rc = class_connect(&conn, obd, cluuid);
4782         if (rc)
4783                 RETURN(rc);
4784
4785         *exp = class_conn2export(&conn);
4786
4787         spin_lock(&osd->od_osfs_lock);
4788         osd->od_connects++;
4789         spin_unlock(&osd->od_osfs_lock);
4790
4791         RETURN(0);
4792 }
4793
4794 /*
4795  * once last export (we don't count self-export) disappeared
4796  * osd can be released
4797  */
4798 static int osd_obd_disconnect(struct obd_export *exp)
4799 {
4800         struct obd_device *obd = exp->exp_obd;
4801         struct osd_device *osd = osd_dev(obd->obd_lu_dev);
4802         int                rc, release = 0;
4803         ENTRY;
4804
4805         /* Only disconnect the underlying layers on the final disconnect. */
4806         spin_lock(&osd->od_osfs_lock);
4807         osd->od_connects--;
4808         if (osd->od_connects == 0)
4809                 release = 1;
4810         spin_unlock(&osd->od_osfs_lock);
4811
4812         rc = class_disconnect(exp); /* bz 9811 */
4813
4814         if (rc == 0 && release)
4815                 class_manual_cleanup(obd);
4816         RETURN(rc);
4817 }
4818
4819 static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
4820                        struct lu_device *dev)
4821 {
4822         struct osd_device *osd = osd_dev(dev);
4823         int                result = 0;
4824         ENTRY;
4825
4826         if (dev->ld_site && lu_device_is_md(dev->ld_site->ls_top_dev)) {
4827                 /* MDT/MDD still use old infrastructure to create
4828                  * special files */
4829                 result = llo_local_objects_setup(env, lu2md_dev(pdev),
4830                                                  lu2dt_dev(dev));
4831                 if (result)
4832                         RETURN(result);
4833         }
4834
4835         if (osd->od_quota_slave != NULL)
4836                 /* set up quota slave objects */
4837                 result = qsd_prepare(env, osd->od_quota_slave);
4838
4839         RETURN(result);
4840 }
4841
4842 static const struct lu_object_operations osd_lu_obj_ops = {
4843         .loo_object_init      = osd_object_init,
4844         .loo_object_delete    = osd_object_delete,
4845         .loo_object_release   = osd_object_release,
4846         .loo_object_free      = osd_object_free,
4847         .loo_object_print     = osd_object_print,
4848         .loo_object_invariant = osd_object_invariant
4849 };
4850
4851 const struct lu_device_operations osd_lu_ops = {
4852         .ldo_object_alloc      = osd_object_alloc,
4853         .ldo_process_config    = osd_process_config,
4854         .ldo_recovery_complete = osd_recovery_complete,
4855         .ldo_prepare           = osd_prepare,
4856 };
4857
4858 static const struct lu_device_type_operations osd_device_type_ops = {
4859         .ldto_init = osd_type_init,
4860         .ldto_fini = osd_type_fini,
4861
4862         .ldto_start = osd_type_start,
4863         .ldto_stop  = osd_type_stop,
4864
4865         .ldto_device_alloc = osd_device_alloc,
4866         .ldto_device_free  = osd_device_free,
4867
4868         .ldto_device_init    = osd_device_init,
4869         .ldto_device_fini    = osd_device_fini
4870 };
4871
4872 struct lu_device_type osd_device_type = {
4873         .ldt_tags     = LU_DEVICE_DT,
4874         .ldt_name     = LUSTRE_OSD_LDISKFS_NAME,
4875         .ldt_ops      = &osd_device_type_ops,
4876         .ldt_ctx_tags = LCT_LOCAL,
4877 };
4878
4879 /*
4880  * lprocfs legacy support.
4881  */
4882 static struct obd_ops osd_obd_device_ops = {
4883         .o_owner = THIS_MODULE,
4884         .o_connect      = osd_obd_connect,
4885         .o_disconnect   = osd_obd_disconnect
4886 };
4887
4888 static int __init osd_mod_init(void)
4889 {
4890         struct lprocfs_static_vars lvars;
4891
4892         osd_oi_mod_init();
4893         lprocfs_osd_init_vars(&lvars);
4894         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
4895                                    LUSTRE_OSD_LDISKFS_NAME, &osd_device_type);
4896 }
4897
4898 static void __exit osd_mod_exit(void)
4899 {
4900         class_unregister_type(LUSTRE_OSD_LDISKFS_NAME);
4901 }
4902
4903 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4904 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_LDISKFS_NAME")");
4905 MODULE_LICENSE("GPL");
4906
4907 cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit);