Whamcloud - gitweb
LU-3105 osd: remove capa related stuff from servers
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd-zfs/osd_index.c
37  *
38  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
39  * Author: Mike Pershin <tappro@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OSD
43
44 #include <lustre_ver.h>
45 #include <libcfs/libcfs.h>
46 #include <obd_support.h>
47 #include <lustre_net.h>
48 #include <obd.h>
49 #include <obd_class.h>
50 #include <lustre_disk.h>
51 #include <lustre_fid.h>
52
53 #include "osd_internal.h"
54
55 #include <sys/dnode.h>
56 #include <sys/dbuf.h>
57 #include <sys/spa.h>
58 #include <sys/stat.h>
59 #include <sys/zap.h>
60 #include <sys/spa_impl.h>
61 #include <sys/zfs_znode.h>
62 #include <sys/dmu_tx.h>
63 #include <sys/dmu_objset.h>
64 #include <sys/dsl_prop.h>
65 #include <sys/sa_impl.h>
66 #include <sys/txg.h>
67
68 static inline int osd_object_is_zap(dmu_buf_t *db)
69 {
70         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *) db;
71         dnode_t *dn;
72         int rc;
73
74         DB_DNODE_ENTER(dbi);
75         dn = DB_DNODE(dbi);
76         rc = (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
77                         dn->dn_type == DMU_OT_USERGROUP_USED);
78         DB_DNODE_EXIT(dbi);
79
80         return rc;
81 }
82
83 /* We don't actually have direct access to the zap_hashbits() function
84  * so just pretend like we do for now.  If this ever breaks we can look at
85  * it at that time. */
86 #define zap_hashbits(zc) 48
87 /*
88  * ZFS hash format:
89  * | cd (16 bits) | hash (48 bits) |
90  * we need it in other form:
91  * |0| hash (48 bit) | cd (15 bit) |
92  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
93  * the readdir hashes from multiple directory stripes uniformly on the client.
94  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
95  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
96  * should only be the low 15 bits.
97  */
98 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
99 {
100         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
101
102         return (zfs_hash >> zap_hashbits(zc)) |
103                 (zfs_hash << (63 - zap_hashbits(zc)));
104 }
105
106 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
107                                     uint64_t id, uint64_t dirhash)
108 {
109         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
110                 (dirhash >> (63 - zap_hashbits(zc)));
111
112         zap_cursor_init_serialized(zc, os, id, zfs_hash);
113 }
114
115 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
116                         uint64_t id, uint64_t dirhash)
117 {
118         zap_cursor_t *t;
119
120         OBD_ALLOC_PTR(t);
121         if (unlikely(t == NULL))
122                 return -ENOMEM;
123
124         osd_zap_cursor_init_serialized(t, os, id, dirhash);
125         *zc = t;
126
127         return 0;
128 }
129
130 void osd_zap_cursor_fini(zap_cursor_t *zc)
131 {
132         zap_cursor_fini(zc);
133         OBD_FREE_PTR(zc);
134 }
135
136 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
137                                                  struct osd_object *o,
138                                                  uint64_t dirhash)
139 {
140         struct osd_device *d = osd_obj2dev(o);
141         osd_zap_cursor_init_serialized(zc, d->od_os,
142                                        o->oo_db->db_object, dirhash);
143 }
144
145 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
146                         uint64_t dirhash)
147 {
148         struct osd_device *d = osd_obj2dev(o);
149         return osd_zap_cursor_init(zc, d->od_os, o->oo_db->db_object, dirhash);
150 }
151
152 static struct dt_it *osd_index_it_init(const struct lu_env *env,
153                                        struct dt_object *dt,
154                                        __u32 unused)
155 {
156         struct osd_thread_info  *info = osd_oti_get(env);
157         struct osd_zap_it       *it;
158         struct osd_object       *obj = osd_dt_obj(dt);
159         struct lu_object        *lo  = &dt->do_lu;
160         int                      rc;
161         ENTRY;
162
163         LASSERT(lu_object_exists(lo));
164         LASSERT(obj->oo_db);
165         LASSERT(osd_object_is_zap(obj->oo_db));
166         LASSERT(info);
167
168         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
169         if (it == NULL)
170                 RETURN(ERR_PTR(-ENOMEM));
171
172         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
173         if (rc != 0) {
174                 OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
175                 RETURN(ERR_PTR(rc));
176         }
177
178         it->ozi_obj   = obj;
179         it->ozi_reset = 1;
180         lu_object_get(lo);
181
182         RETURN((struct dt_it *)it);
183 }
184
185 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
186 {
187         struct osd_zap_it       *it     = (struct osd_zap_it *)di;
188         struct osd_object       *obj;
189         ENTRY;
190
191         LASSERT(it);
192         LASSERT(it->ozi_obj);
193
194         obj = it->ozi_obj;
195
196         osd_zap_cursor_fini(it->ozi_zc);
197         lu_object_put(env, &obj->oo_dt.do_lu);
198         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
199
200         EXIT;
201 }
202
203
204 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
205 {
206         /* PBS: do nothing : ref are incremented at retrive and decreamented
207          *      next/finish. */
208 }
209
210 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
211                                        int len, __u16 type)
212 {
213         const unsigned    align = sizeof(struct luda_type) - 1;
214         struct luda_type *lt;
215
216         /* check if file type is required */
217         if (attr & LUDA_TYPE) {
218                 len = (len + align) & ~align;
219
220                 lt = (void *)ent->lde_name + len;
221                 lt->lt_type = cpu_to_le16(DTTOIF(type));
222                 ent->lde_attrs |= LUDA_TYPE;
223         }
224
225         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
226 }
227
228 /*
229  * as we don't know FID, we can't use LU object, so this function
230  * partially duplicate __osd_xattr_get() which is built around
231  * LU-object and uses it to cache data like regular EA dnode, etc
232  */
233 static int osd_find_parent_by_dnode(const struct lu_env *env,
234                                     struct dt_object *o,
235                                     struct lu_fid *fid)
236 {
237         struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
238         struct lustre_mdt_attrs *lma;
239         struct lu_buf            buf;
240         sa_handle_t             *sa_hdl;
241         nvlist_t                *nvbuf = NULL;
242         uchar_t                 *value;
243         uint64_t                 dnode;
244         int                      rc, size;
245         ENTRY;
246
247         /* first of all, get parent dnode from own attributes */
248         LASSERT(osd_dt_obj(o)->oo_db);
249         rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
250                             NULL, SA_HDL_PRIVATE, &sa_hdl);
251         if (rc)
252                 RETURN(rc);
253
254         dnode = ZFS_NO_OBJECT;
255         rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
256         sa_handle_destroy(sa_hdl);
257         if (rc)
258                 RETURN(rc);
259
260         /* now get EA buffer */
261         rc = __osd_xattr_load(osd, dnode, &nvbuf);
262         if (rc)
263                 GOTO(regular, rc);
264
265         /* XXX: if we get that far.. should we cache the result? */
266
267         /* try to find LMA attribute */
268         LASSERT(nvbuf != NULL);
269         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
270         if (rc == 0 && size >= sizeof(*lma)) {
271                 lma = (struct lustre_mdt_attrs *)value;
272                 lustre_lma_swab(lma);
273                 *fid = lma->lma_self_fid;
274                 GOTO(out, rc = 0);
275         }
276
277 regular:
278         /* no LMA attribute in SA, let's try regular EA */
279
280         /* first of all, get parent dnode storing regular EA */
281         rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
282         if (rc)
283                 GOTO(out, rc);
284
285         dnode = ZFS_NO_OBJECT;
286         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
287         sa_handle_destroy(sa_hdl);
288         if (rc)
289                 GOTO(out, rc);
290
291         CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
292         buf.lb_buf = osd_oti_get(env)->oti_buf;
293         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
294
295         /* now try to find LMA */
296         rc = __osd_xattr_get_large(env, osd, dnode, &buf,
297                                    XATTR_NAME_LMA, &size);
298         if (rc == 0 && size >= sizeof(*lma)) {
299                 lma = buf.lb_buf;
300                 lustre_lma_swab(lma);
301                 *fid = lma->lma_self_fid;
302                 GOTO(out, rc = 0);
303         } else if (rc < 0) {
304                 GOTO(out, rc);
305         } else {
306                 GOTO(out, rc = -EIO);
307         }
308
309 out:
310         if (nvbuf != NULL)
311                 nvlist_free(nvbuf);
312         RETURN(rc);
313 }
314
315 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
316                                struct lu_fid *fid)
317 {
318         struct link_ea_header  *leh;
319         struct link_ea_entry   *lee;
320         struct lu_buf           buf;
321         int                     rc;
322         ENTRY;
323
324         buf.lb_buf = osd_oti_get(env)->oti_buf;
325         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
326
327         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
328         if (rc == -ERANGE) {
329                 rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK);
330                 if (rc < 0)
331                         RETURN(rc);
332                 LASSERT(rc > 0);
333                 OBD_ALLOC(buf.lb_buf, rc);
334                 if (buf.lb_buf == NULL)
335                         RETURN(-ENOMEM);
336                 buf.lb_len = rc;
337                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
338         }
339         if (rc < 0)
340                 GOTO(out, rc);
341         if (rc < sizeof(*leh) + sizeof(*lee))
342                 GOTO(out, rc = -EINVAL);
343
344         leh = buf.lb_buf;
345         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
346                 leh->leh_magic = LINK_EA_MAGIC;
347                 leh->leh_reccount = __swab32(leh->leh_reccount);
348                 leh->leh_len = __swab64(leh->leh_len);
349         }
350         if (leh->leh_magic != LINK_EA_MAGIC)
351                 GOTO(out, rc = -EINVAL);
352         if (leh->leh_reccount == 0)
353                 GOTO(out, rc = -ENODATA);
354
355         lee = (struct link_ea_entry *)(leh + 1);
356         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
357         rc = 0;
358
359 out:
360         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
361                 OBD_FREE(buf.lb_buf, buf.lb_len);
362
363 #if 0
364         /* this block can be enabled for additional verification
365          * it's trying to match FID from LinkEA vs. FID from LMA */
366         if (rc == 0) {
367                 struct lu_fid fid2;
368                 int rc2;
369                 rc2 = osd_find_parent_by_dnode(env, o, &fid2);
370                 if (rc2 == 0)
371                         if (lu_fid_eq(fid, &fid2) == 0)
372                                 CERROR("wrong parent: "DFID" != "DFID"\n",
373                                        PFID(fid), PFID(&fid2));
374         }
375 #endif
376
377         /* no LinkEA is found, let's try to find the fid in parent's LMA */
378         if (unlikely(rc != 0))
379                 rc = osd_find_parent_by_dnode(env, o, fid);
380
381         RETURN(rc);
382 }
383
384 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
385                           struct dt_rec *rec, const struct dt_key *key)
386 {
387         struct osd_thread_info *oti = osd_oti_get(env);
388         struct osd_object  *obj = osd_dt_obj(dt);
389         struct osd_device  *osd = osd_obj2dev(obj);
390         char               *name = (char *)key;
391         int                 rc;
392         ENTRY;
393
394         LASSERT(osd_object_is_zap(obj->oo_db));
395
396         if (name[0] == '.') {
397                 if (name[1] == 0) {
398                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
399                         memcpy(rec, f, sizeof(*f));
400                         RETURN(1);
401                 } else if (name[1] == '.' && name[2] == 0) {
402                         rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
403                         RETURN(rc == 0 ? 1 : rc);
404                 }
405         }
406
407         rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
408                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
409                          (void *)&oti->oti_zde);
410         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
411
412         RETURN(rc == 0 ? 1 : rc);
413 }
414
415 static int osd_declare_dir_insert(const struct lu_env *env,
416                                   struct dt_object *dt,
417                                   const struct dt_rec *rec,
418                                   const struct dt_key *key,
419                                   struct thandle *th)
420 {
421         struct osd_object  *obj = osd_dt_obj(dt);
422         struct osd_thandle *oh;
423         uint64_t object;
424         ENTRY;
425
426         LASSERT(th != NULL);
427         oh = container_of0(th, struct osd_thandle, ot_super);
428
429         /* This is for inserting dot/dotdot for new created dir. */
430         if (obj->oo_db == NULL)
431                 object = DMU_NEW_OBJECT;
432         else
433                 object = obj->oo_db->db_object;
434
435         dmu_tx_hold_bonus(oh->ot_tx, object);
436         dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
437
438         RETURN(0);
439 }
440
441 /**
442  * Find the osd object for given fid.
443  *
444  * \param fid need to find the osd object having this fid
445  *
446  * \retval osd_object on success
447  * \retval        -ve on error
448  */
449 struct osd_object *osd_object_find(const struct lu_env *env,
450                                    struct dt_object *dt,
451                                    const struct lu_fid *fid)
452 {
453         struct lu_device         *ludev = dt->do_lu.lo_dev;
454         struct osd_object        *child = NULL;
455         struct lu_object         *luch;
456         struct lu_object         *lo;
457
458         /*
459          * at this point topdev might not exist yet
460          * (i.e. MGS is preparing profiles). so we can
461          * not rely on topdev and instead lookup with
462          * our device passed as topdev. this can't work
463          * if the object isn't cached yet (as osd doesn't
464          * allocate lu_header). IOW, the object must be
465          * in the cache, otherwise lu_object_alloc() crashes
466          * -bzzz
467          */
468         luch = lu_object_find_at(env, ludev, fid, NULL);
469         if (IS_ERR(luch))
470                 return (void *)luch;
471
472         if (lu_object_exists(luch)) {
473                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
474                 if (lo != NULL)
475                         child = osd_obj(lo);
476                 else
477                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
478                                         "%s: object can't be located "DFID"\n",
479                                         osd_dev(ludev)->od_svname, PFID(fid));
480
481                 if (child == NULL) {
482                         lu_object_put(env, luch);
483                         CERROR("%s: Unable to get osd_object "DFID"\n",
484                                osd_dev(ludev)->od_svname, PFID(fid));
485                         child = ERR_PTR(-ENOENT);
486                 }
487         } else {
488                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
489                                 "%s: lu_object does not exists "DFID"\n",
490                                 osd_dev(ludev)->od_svname, PFID(fid));
491                 lu_object_put(env, luch);
492                 child = ERR_PTR(-ENOENT);
493         }
494
495         return child;
496 }
497
498 /**
499  * Put the osd object once done with it.
500  *
501  * \param obj osd object that needs to be put
502  */
503 static inline void osd_object_put(const struct lu_env *env,
504                                   struct osd_object *obj)
505 {
506         lu_object_put(env, &obj->oo_dt.do_lu);
507 }
508
509 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
510                           u64 seq)
511 {
512         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
513         struct seq_server_site  *ss = osd_seq_site(osd);
514         int                     rc;
515         ENTRY;
516
517         LASSERT(ss != NULL);
518         LASSERT(ss->ss_server_fld != NULL);
519
520         rc = osd_fld_lookup(env, osd, seq, range);
521         if (rc != 0) {
522                 if (rc != -ENOENT)
523                         CERROR("%s: Can not lookup fld for "LPX64"\n",
524                                osd_name(osd), seq);
525                 RETURN(0);
526         }
527
528         RETURN(ss->ss_node_id == range->lsr_index);
529 }
530
531 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
532                           const struct lu_fid *fid)
533 {
534         struct seq_server_site  *ss = osd_seq_site(osd);
535         ENTRY;
536
537         /* FID seqs not in FLDB, must be local seq */
538         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
539                 RETURN(0);
540
541         /* If FLD is not being initialized yet, it only happens during the
542          * initialization, likely during mgs initialization, and we assume
543          * this is local FID. */
544         if (ss == NULL || ss->ss_server_fld == NULL)
545                 RETURN(0);
546
547         /* Only check the local FLDB here */
548         if (osd_seq_exists(env, osd, fid_seq(fid)))
549                 RETURN(0);
550
551         RETURN(1);
552 }
553
554 /**
555  *      Inserts (key, value) pair in \a directory object.
556  *
557  *      \param  dt      osd index object
558  *      \param  key     key for index
559  *      \param  rec     record reference
560  *      \param  th      transaction handler
561  *      \param  ignore_quota update should not affect quota
562  *
563  *      \retval  0  success
564  *      \retval -ve failure
565  */
566 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
567                           const struct dt_rec *rec, const struct dt_key *key,
568                           struct thandle *th, int ignore_quota)
569 {
570         struct osd_thread_info *oti = osd_oti_get(env);
571         struct osd_object   *parent = osd_dt_obj(dt);
572         struct osd_device   *osd = osd_obj2dev(parent);
573         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
574         const struct lu_fid *fid = rec1->rec_fid;
575         struct osd_thandle  *oh;
576         struct osd_object   *child = NULL;
577         __u32                attr;
578         char                *name = (char *)key;
579         int                  rc;
580         ENTRY;
581
582         LASSERT(parent->oo_db);
583         LASSERT(osd_object_is_zap(parent->oo_db));
584
585         LASSERT(dt_object_exists(dt));
586         LASSERT(osd_invariant(parent));
587
588         LASSERT(th != NULL);
589         oh = container_of0(th, struct osd_thandle, ot_super);
590
591         rc = osd_remote_fid(env, osd, fid);
592         if (rc < 0) {
593                 CERROR("%s: Can not find object "DFID": rc = %d\n",
594                        osd->od_svname, PFID(fid), rc);
595                 RETURN(rc);
596         }
597
598         if (unlikely(rc == 1)) {
599                 /* Insert remote entry */
600                 memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
601                 oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
602         } else {
603                 /*
604                  * To simulate old Orion setups with ./..  stored in the
605                  * directories
606                  */
607                 /* Insert local entry */
608                 child = osd_object_find(env, dt, fid);
609                 if (IS_ERR(child))
610                         RETURN(PTR_ERR(child));
611
612                 LASSERT(child->oo_db);
613                 if (name[0] == '.') {
614                         if (name[1] == 0) {
615                                 /* do not store ".", instead generate it
616                                  * during iteration */
617                                 GOTO(out, rc = 0);
618                         } else if (name[1] == '.' && name[2] == 0) {
619                                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
620                                         struct lu_fid tfid = *fid;
621
622                                         osd_object_put(env, child);
623                                         tfid.f_oid--;
624                                         child = osd_object_find(env, dt, &tfid);
625                                         if (IS_ERR(child))
626                                                 RETURN(PTR_ERR(child));
627
628                                         LASSERT(child->oo_db);
629                                 }
630
631                                 /* update parent dnode in the child.
632                                  * later it will be used to generate ".." */
633                                 rc = osd_object_sa_update(parent,
634                                                  SA_ZPL_PARENT(osd),
635                                                  &child->oo_db->db_object,
636                                                  8, oh);
637
638                                 GOTO(out, rc);
639                         }
640                 }
641                 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
642                 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
643                 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
644                 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
645                 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
646         }
647
648         oti->oti_zde.lzd_fid = *fid;
649         /* Insert (key,oid) into ZAP */
650         rc = -zap_add(osd->od_os, parent->oo_db->db_object,
651                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
652                       (void *)&oti->oti_zde, oh->ot_tx);
653         if (unlikely(rc == -EEXIST &&
654                      name[0] == '.' && name[1] == '.' && name[2] == 0))
655                 /* Update (key,oid) in ZAP */
656                 rc = -zap_update(osd->od_os, parent->oo_db->db_object,
657                                 (char *)key, 8, sizeof(oti->oti_zde) / 8,
658                                 (void *)&oti->oti_zde, oh->ot_tx);
659
660 out:
661         if (child != NULL)
662                 osd_object_put(env, child);
663
664         RETURN(rc);
665 }
666
667 static int osd_declare_dir_delete(const struct lu_env *env,
668                                   struct dt_object *dt,
669                                   const struct dt_key *key,
670                                   struct thandle *th)
671 {
672         struct osd_object *obj = osd_dt_obj(dt);
673         struct osd_thandle *oh;
674         ENTRY;
675
676         LASSERT(dt_object_exists(dt));
677         LASSERT(osd_invariant(obj));
678
679         LASSERT(th != NULL);
680         oh = container_of0(th, struct osd_thandle, ot_super);
681
682         LASSERT(obj->oo_db);
683         LASSERT(osd_object_is_zap(obj->oo_db));
684
685         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
686
687         RETURN(0);
688 }
689
690 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
691                           const struct dt_key *key, struct thandle *th)
692 {
693         struct osd_object *obj = osd_dt_obj(dt);
694         struct osd_device *osd = osd_obj2dev(obj);
695         struct osd_thandle *oh;
696         dmu_buf_t *zap_db = obj->oo_db;
697         char      *name = (char *)key;
698         int rc;
699         ENTRY;
700
701         LASSERT(obj->oo_db);
702         LASSERT(osd_object_is_zap(obj->oo_db));
703
704         LASSERT(th != NULL);
705         oh = container_of0(th, struct osd_thandle, ot_super);
706
707         /*
708          * In Orion . and .. were stored in the directory (not generated upon
709          * request as now). we preserve them for backward compatibility
710          */
711         if (name[0] == '.') {
712                 if (name[1] == 0) {
713                         RETURN(0);
714                 } else if (name[1] == '.' && name[2] == 0) {
715                         RETURN(0);
716                 }
717         }
718
719         /* Remove key from the ZAP */
720         rc = -zap_remove(osd->od_os, zap_db->db_object,
721                          (char *) key, oh->ot_tx);
722
723         if (unlikely(rc && rc != -ENOENT))
724                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
725
726         RETURN(rc);
727 }
728
729 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
730                                      struct dt_object *dt,
731                                      __u32 unused)
732 {
733         struct osd_zap_it *it;
734
735         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
736         if (!IS_ERR(it))
737                 it->ozi_pos = 0;
738
739         RETURN((struct dt_it *)it);
740 }
741
742 /**
743  *  Move Iterator to record specified by \a key
744  *
745  *  \param  di      osd iterator
746  *  \param  key     key for index
747  *
748  *  \retval +ve  di points to record with least key not larger than key
749  *  \retval  0   di points to exact matched key
750  *  \retval -ve  failure
751  */
752 static int osd_dir_it_get(const struct lu_env *env,
753                           struct dt_it *di, const struct dt_key *key)
754 {
755         struct osd_zap_it *it = (struct osd_zap_it *)di;
756         struct osd_object *obj = it->ozi_obj;
757         char              *name = (char *)key;
758         int                rc;
759         ENTRY;
760
761         LASSERT(it);
762         LASSERT(it->ozi_zc);
763
764         /* reset the cursor */
765         zap_cursor_fini(it->ozi_zc);
766         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
767
768         /* XXX: implementation of the API is broken at the moment */
769         LASSERT(((const char *)key)[0] == 0);
770
771         if (name[0] == 0) {
772                 it->ozi_pos = 0;
773                 RETURN(1);
774         }
775
776         if (name[0] == '.') {
777                 if (name[1] == 0) {
778                         it->ozi_pos = 1;
779                         GOTO(out, rc = 1);
780                 } else if (name[1] == '.' && name[2] == 0) {
781                         it->ozi_pos = 2;
782                         GOTO(out, rc = 1);
783                 }
784         }
785
786         /* neither . nor .. - some real record */
787         it->ozi_pos = 3;
788         rc = +1;
789
790 out:
791         RETURN(rc);
792 }
793
794 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
795 {
796         /* PBS: do nothing : ref are incremented at retrive and decreamented
797          *      next/finish. */
798 }
799
800 /*
801  * in Orion . and .. were stored in the directory, while ZPL
802  * and current osd-zfs generate them up on request. so, we
803  * need to ignore previously stored . and ..
804  */
805 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
806                                         zap_attribute_t *za)
807 {
808         int rc, isdot;
809
810         do {
811                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
812
813                 isdot = 0;
814                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
815                         if (za->za_name[1] == 0) {
816                                 isdot = 1;
817                         } else if (za->za_name[1] == '.' &&
818                                    za->za_name[2] == 0) {
819                                 isdot = 1;
820                         }
821                         if (unlikely(isdot))
822                                 zap_cursor_advance(it->ozi_zc);
823                 }
824         } while (unlikely(rc == 0 && isdot));
825
826         return rc;
827 }
828
829 /**
830  * to load a directory entry at a time and stored it in
831  * iterator's in-memory data structure.
832  *
833  * \param di, struct osd_it_ea, iterator's in memory structure
834  *
835  * \retval +ve, iterator reached to end
836  * \retval   0, iterator not reached to end
837  * \retval -ve, on error
838  */
839 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
840 {
841         struct osd_zap_it *it = (struct osd_zap_it *)di;
842         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
843         int                rc;
844
845         ENTRY;
846
847         /* temp. storage should be enough for any key supported by ZFS */
848         CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
849
850         /*
851          * the first ->next() moves the cursor to .
852          * the second ->next() moves the cursor to ..
853          * then we get to the real records and have to verify any exist
854          */
855         if (it->ozi_pos <= 2) {
856                 it->ozi_pos++;
857                 if (it->ozi_pos <=2)
858                         RETURN(0);
859
860         } else {
861                 zap_cursor_advance(it->ozi_zc);
862         }
863
864         /*
865          * According to current API we need to return error if its last entry.
866          * zap_cursor_advance() does not return any value. So we need to call
867          * retrieve to check if there is any record.  We should make
868          * changes to Iterator API to not return status for this API
869          */
870         rc = osd_index_retrieve_skip_dots(it, za);
871
872         if (rc == -ENOENT) /* end of dir */
873                 RETURN(+1);
874
875         RETURN(rc);
876 }
877
878 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
879                                      const struct dt_it *di)
880 {
881         struct osd_zap_it *it = (struct osd_zap_it *)di;
882         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
883         int                rc = 0;
884         ENTRY;
885
886         if (it->ozi_pos <= 1) {
887                 it->ozi_pos = 1;
888                 RETURN((struct dt_key *)".");
889         } else if (it->ozi_pos == 2) {
890                 RETURN((struct dt_key *)"..");
891         }
892
893         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
894                 RETURN(ERR_PTR(rc));
895
896         strcpy(it->ozi_name, za->za_name);
897
898         RETURN((struct dt_key *)it->ozi_name);
899 }
900
901 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
902 {
903         struct osd_zap_it *it = (struct osd_zap_it *)di;
904         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
905         int                rc;
906         ENTRY;
907
908         if (it->ozi_pos <= 1) {
909                 it->ozi_pos = 1;
910                 RETURN(2);
911         } else if (it->ozi_pos == 2) {
912                 RETURN(3);
913         }
914
915         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
916                 rc = strlen(za->za_name);
917
918         RETURN(rc);
919 }
920
921 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
922                           struct dt_rec *dtrec, __u32 attr)
923 {
924         struct osd_zap_it   *it = (struct osd_zap_it *)di;
925         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
926         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
927         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
928         int                  rc, namelen;
929         ENTRY;
930
931         if (it->ozi_pos <= 1) {
932                 lde->lde_hash = cpu_to_le64(1);
933                 strcpy(lde->lde_name, ".");
934                 lde->lde_namelen = cpu_to_le16(1);
935                 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
936                 lde->lde_attrs = LUDA_FID;
937                 /* append lustre attributes */
938                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
939                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
940                 it->ozi_pos = 1;
941                 GOTO(out, rc = 0);
942
943         } else if (it->ozi_pos == 2) {
944                 lde->lde_hash = cpu_to_le64(2);
945                 strcpy(lde->lde_name, "..");
946                 lde->lde_namelen = cpu_to_le16(2);
947                 lde->lde_attrs = LUDA_FID;
948                 /* append lustre attributes */
949                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
950                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
951                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
952
953                 /* ENOENT happens at the root of filesystem so ignore it */
954                 if (rc == -ENOENT)
955                         rc = 0;
956                 GOTO(out, rc);
957         }
958
959         LASSERT(lde);
960
961         rc = -zap_cursor_retrieve(it->ozi_zc, za);
962         if (unlikely(rc != 0))
963                 GOTO(out, rc);
964
965         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
966         namelen = strlen(za->za_name);
967         if (namelen > NAME_MAX)
968                 GOTO(out, rc = -EOVERFLOW);
969         strcpy(lde->lde_name, za->za_name);
970         lde->lde_namelen = cpu_to_le16(namelen);
971
972         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
973                 CERROR("%s: unsupported direntry format: %d %d\n",
974                        osd_obj2dev(it->ozi_obj)->od_svname,
975                        za->za_integer_length, (int)za->za_num_integers);
976
977                 GOTO(out, rc = -EIO);
978         }
979
980         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
981                          za->za_name, za->za_integer_length, 3, zde);
982         if (rc)
983                 GOTO(out, rc);
984
985         lde->lde_fid = zde->lzd_fid;
986         lde->lde_attrs = LUDA_FID;
987
988         /* append lustre attributes */
989         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
990
991         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
992
993 out:
994         RETURN(rc);
995 }
996
997 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
998                                __u32 attr)
999 {
1000         struct osd_zap_it   *it = (struct osd_zap_it *)di;
1001         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
1002         size_t               namelen = 0;
1003         int                  rc;
1004         ENTRY;
1005
1006         if (it->ozi_pos <= 1)
1007                 namelen = 1;
1008         else if (it->ozi_pos == 2)
1009                 namelen = 2;
1010
1011         if (namelen > 0) {
1012                 rc = lu_dirent_calc_size(namelen, attr);
1013                 RETURN(rc);
1014         }
1015
1016         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1017         if (unlikely(rc != 0))
1018                 RETURN(rc);
1019
1020         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1021                 CERROR("%s: unsupported direntry format: %d %d\n",
1022                        osd_obj2dev(it->ozi_obj)->od_svname,
1023                        za->za_integer_length, (int)za->za_num_integers);
1024                 RETURN(-EIO);
1025         }
1026
1027         namelen = strlen(za->za_name);
1028         if (namelen > NAME_MAX)
1029                 RETURN(-EOVERFLOW);
1030
1031         rc = lu_dirent_calc_size(namelen, attr);
1032
1033         RETURN(rc);
1034 }
1035
1036 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1037 {
1038         struct osd_zap_it *it = (struct osd_zap_it *)di;
1039         __u64              pos;
1040         ENTRY;
1041
1042         if (it->ozi_pos <= 2)
1043                 pos = it->ozi_pos;
1044         else
1045                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1046
1047         RETURN(pos);
1048 }
1049
1050 /*
1051  * return status :
1052  *  rc == 0 -> end of directory.
1053  *  rc >  0 -> ok, proceed.
1054  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1055  */
1056 static int osd_dir_it_load(const struct lu_env *env,
1057                         const struct dt_it *di, __u64 hash)
1058 {
1059         struct osd_zap_it *it = (struct osd_zap_it *)di;
1060         struct osd_object *obj = it->ozi_obj;
1061         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1062         int                rc;
1063         ENTRY;
1064
1065         /* reset the cursor */
1066         zap_cursor_fini(it->ozi_zc);
1067         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1068
1069         if (hash <= 2) {
1070                 it->ozi_pos = hash;
1071                 rc = +1;
1072         } else {
1073                 it->ozi_pos = 3;
1074                 /* to return whether the end has been reached */
1075                 rc = osd_index_retrieve_skip_dots(it, za);
1076                 if (rc == 0)
1077                         rc = +1;
1078                 else if (rc == -ENOENT)
1079                         rc = 0;
1080         }
1081
1082         RETURN(rc);
1083 }
1084
1085 struct dt_index_operations osd_dir_ops = {
1086         .dio_lookup         = osd_dir_lookup,
1087         .dio_declare_insert = osd_declare_dir_insert,
1088         .dio_insert         = osd_dir_insert,
1089         .dio_declare_delete = osd_declare_dir_delete,
1090         .dio_delete         = osd_dir_delete,
1091         .dio_it     = {
1092                 .init     = osd_dir_it_init,
1093                 .fini     = osd_index_it_fini,
1094                 .get      = osd_dir_it_get,
1095                 .put      = osd_dir_it_put,
1096                 .next     = osd_dir_it_next,
1097                 .key      = osd_dir_it_key,
1098                 .key_size = osd_dir_it_key_size,
1099                 .rec      = osd_dir_it_rec,
1100                 .rec_size = osd_dir_it_rec_size,
1101                 .store    = osd_dir_it_store,
1102                 .load     = osd_dir_it_load
1103         }
1104 };
1105
1106 /*
1107  * Primitives for index files using binary keys.
1108  */
1109
1110 /* key integer_size is 8 */
1111 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1112                                   const struct dt_key *src)
1113 {
1114         int size;
1115
1116         LASSERT(dst);
1117         LASSERT(src);
1118
1119         /* align keysize to 64bit */
1120         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1121         size *= sizeof(__u64);
1122
1123         LASSERT(size <= MAXNAMELEN);
1124
1125         if (unlikely(size > o->oo_keysize))
1126                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1127         memcpy(dst, (const char *)src, o->oo_keysize);
1128
1129         return (size/sizeof(__u64));
1130 }
1131
1132 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1133                         struct dt_rec *rec, const struct dt_key *key)
1134 {
1135         struct osd_object *obj = osd_dt_obj(dt);
1136         struct osd_device *osd = osd_obj2dev(obj);
1137         __u64             *k = osd_oti_get(env)->oti_key64;
1138         int                rc;
1139         ENTRY;
1140
1141         rc = osd_prepare_key_uint64(obj, k, key);
1142
1143         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1144                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1145                                 (void *)rec);
1146         RETURN(rc == 0 ? 1 : rc);
1147 }
1148
1149 static int osd_declare_index_insert(const struct lu_env *env,
1150                                     struct dt_object *dt,
1151                                     const struct dt_rec *rec,
1152                                     const struct dt_key *key,
1153                                     struct thandle *th)
1154 {
1155         struct osd_object  *obj = osd_dt_obj(dt);
1156         struct osd_thandle *oh;
1157         ENTRY;
1158
1159         LASSERT(th != NULL);
1160         oh = container_of0(th, struct osd_thandle, ot_super);
1161
1162         LASSERT(obj->oo_db);
1163
1164         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
1165
1166         /* It is not clear what API should be used for binary keys, so we pass
1167          * a null name which has the side effect of over-reserving space,
1168          * accounting for the worst case. See zap_count_write() */
1169         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1170
1171         RETURN(0);
1172 }
1173
1174 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1175                             const struct dt_rec *rec, const struct dt_key *key,
1176                             struct thandle *th, int ignore_quota)
1177 {
1178         struct osd_object  *obj = osd_dt_obj(dt);
1179         struct osd_device  *osd = osd_obj2dev(obj);
1180         struct osd_thandle *oh;
1181         __u64              *k = osd_oti_get(env)->oti_key64;
1182         int                 rc;
1183         ENTRY;
1184
1185         LASSERT(obj->oo_db);
1186         LASSERT(dt_object_exists(dt));
1187         LASSERT(osd_invariant(obj));
1188         LASSERT(th != NULL);
1189
1190         oh = container_of0(th, struct osd_thandle, ot_super);
1191
1192         rc = osd_prepare_key_uint64(obj, k, key);
1193
1194         /* Insert (key,oid) into ZAP */
1195         rc = -zap_add_uint64(osd->od_os, obj->oo_db->db_object,
1196                              k, rc, obj->oo_recusize, obj->oo_recsize,
1197                              (void *)rec, oh->ot_tx);
1198         RETURN(rc);
1199 }
1200
1201 static int osd_declare_index_delete(const struct lu_env *env,
1202                                     struct dt_object *dt,
1203                                     const struct dt_key *key,
1204                                     struct thandle *th)
1205 {
1206         struct osd_object  *obj = osd_dt_obj(dt);
1207         struct osd_thandle *oh;
1208         ENTRY;
1209
1210         LASSERT(dt_object_exists(dt));
1211         LASSERT(osd_invariant(obj));
1212         LASSERT(th != NULL);
1213         LASSERT(obj->oo_db);
1214
1215         oh = container_of0(th, struct osd_thandle, ot_super);
1216         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1217
1218         RETURN(0);
1219 }
1220
1221 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1222                             const struct dt_key *key, struct thandle *th)
1223 {
1224         struct osd_object  *obj = osd_dt_obj(dt);
1225         struct osd_device  *osd = osd_obj2dev(obj);
1226         struct osd_thandle *oh;
1227         __u64              *k = osd_oti_get(env)->oti_key64;
1228         int                 rc;
1229         ENTRY;
1230
1231         LASSERT(obj->oo_db);
1232         LASSERT(th != NULL);
1233         oh = container_of0(th, struct osd_thandle, ot_super);
1234
1235         rc = osd_prepare_key_uint64(obj, k, key);
1236
1237         /* Remove binary key from the ZAP */
1238         rc = -zap_remove_uint64(osd->od_os, obj->oo_db->db_object,
1239                                 k, rc, oh->ot_tx);
1240         RETURN(rc);
1241 }
1242
1243 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1244                             const struct dt_key *key)
1245 {
1246         struct osd_zap_it *it = (struct osd_zap_it *)di;
1247         struct osd_object *obj = it->ozi_obj;
1248         struct osd_device *osd = osd_obj2dev(obj);
1249         ENTRY;
1250
1251         LASSERT(it);
1252         LASSERT(it->ozi_zc);
1253
1254         /*
1255          * XXX: we need a binary version of zap_cursor_move_to_key()
1256          *      to implement this API */
1257         if (*((const __u64 *)key) != 0)
1258                 CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
1259                        *((__u64 *)key));
1260
1261         zap_cursor_fini(it->ozi_zc);
1262         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_db->db_object);
1263         it->ozi_reset = 1;
1264
1265         RETURN(+1);
1266 }
1267
1268 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1269 {
1270         struct osd_zap_it *it = (struct osd_zap_it *)di;
1271         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1272         int                rc;
1273         ENTRY;
1274
1275         if (it->ozi_reset == 0)
1276                 zap_cursor_advance(it->ozi_zc);
1277         it->ozi_reset = 0;
1278
1279         /*
1280          * According to current API we need to return error if it's last entry.
1281          * zap_cursor_advance() does not return any value. So we need to call
1282          * retrieve to check if there is any record.  We should make
1283          * changes to Iterator API to not return status for this API
1284          */
1285         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1286         if (rc == -ENOENT)
1287                 RETURN(+1);
1288
1289         RETURN((rc));
1290 }
1291
1292 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1293                                        const struct dt_it *di)
1294 {
1295         struct osd_zap_it *it = (struct osd_zap_it *)di;
1296         struct osd_object *obj = it->ozi_obj;
1297         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1298         int                rc = 0;
1299         ENTRY;
1300
1301         it->ozi_reset = 0;
1302         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1303         if (rc)
1304                 RETURN(ERR_PTR(rc));
1305
1306         /* the binary key is stored in the name */
1307         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1308
1309         RETURN((struct dt_key *)&it->ozi_key);
1310 }
1311
1312 static int osd_index_it_key_size(const struct lu_env *env,
1313                                 const struct dt_it *di)
1314 {
1315         struct osd_zap_it *it = (struct osd_zap_it *)di;
1316         struct osd_object *obj = it->ozi_obj;
1317         RETURN(obj->oo_keysize);
1318 }
1319
1320 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1321                             struct dt_rec *rec, __u32 attr)
1322 {
1323         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1324         struct osd_zap_it *it = (struct osd_zap_it *)di;
1325         struct osd_object *obj = it->ozi_obj;
1326         struct osd_device *osd = osd_obj2dev(obj);
1327         __u64             *k = osd_oti_get(env)->oti_key64;
1328         int                rc;
1329         ENTRY;
1330
1331         it->ozi_reset = 0;
1332         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1333         if (rc)
1334                 RETURN(rc);
1335
1336         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1337
1338         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1339                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1340                                 (void *)rec);
1341         RETURN(rc);
1342 }
1343
1344 static __u64 osd_index_it_store(const struct lu_env *env,
1345                                 const struct dt_it *di)
1346 {
1347         struct osd_zap_it *it = (struct osd_zap_it *)di;
1348
1349         it->ozi_reset = 0;
1350         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1351 }
1352
1353 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1354                              __u64 hash)
1355 {
1356         struct osd_zap_it *it = (struct osd_zap_it *)di;
1357         struct osd_object *obj = it->ozi_obj;
1358         struct osd_device *osd = osd_obj2dev(obj);
1359         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1360         int                rc;
1361         ENTRY;
1362
1363         /* reset the cursor */
1364         zap_cursor_fini(it->ozi_zc);
1365         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1366                                    obj->oo_db->db_object, hash);
1367         it->ozi_reset = 0;
1368
1369         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1370         if (rc == 0)
1371                 RETURN(+1);
1372         else if (rc == -ENOENT)
1373                 RETURN(0);
1374
1375         RETURN(rc);
1376 }
1377
1378 static struct dt_index_operations osd_index_ops = {
1379         .dio_lookup             = osd_index_lookup,
1380         .dio_declare_insert     = osd_declare_index_insert,
1381         .dio_insert             = osd_index_insert,
1382         .dio_declare_delete     = osd_declare_index_delete,
1383         .dio_delete             = osd_index_delete,
1384         .dio_it = {
1385                 .init           = osd_index_it_init,
1386                 .fini           = osd_index_it_fini,
1387                 .get            = osd_index_it_get,
1388                 .put            = osd_index_it_put,
1389                 .next           = osd_index_it_next,
1390                 .key            = osd_index_it_key,
1391                 .key_size       = osd_index_it_key_size,
1392                 .rec            = osd_index_it_rec,
1393                 .store          = osd_index_it_store,
1394                 .load           = osd_index_it_load
1395         }
1396 };
1397
1398 struct osd_metadnode_it {
1399         struct osd_device       *mit_dev;
1400         __u64                    mit_pos;
1401         struct lu_fid            mit_fid;
1402         int                      mit_prefetched;
1403         __u64                    mit_prefetched_dnode;
1404 };
1405
1406 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
1407                                             struct dt_object *dt, __u32 attr)
1408 {
1409         struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
1410         struct osd_metadnode_it *it;
1411         ENTRY;
1412
1413         OBD_ALLOC_PTR(it);
1414         if (unlikely(it == NULL))
1415                 RETURN(ERR_PTR(-ENOMEM));
1416
1417         it->mit_dev = dev;
1418
1419         /* XXX: dmu_object_next() does NOT find dnodes allocated
1420          *      in the current non-committed txg, so we force txg
1421          *      commit to find all existing dnodes ... */
1422         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1423
1424         RETURN((struct dt_it *)it);
1425 }
1426
1427 static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1428 {
1429         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1430
1431         OBD_FREE_PTR(it);
1432 }
1433
1434 static int osd_zfs_otable_it_get(const struct lu_env *env,
1435                                  struct dt_it *di, const struct dt_key *key)
1436 {
1437         return 0;
1438 }
1439
1440 static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
1441 {
1442 }
1443
1444 #define OTABLE_PREFETCH         256
1445
1446 static void osd_zfs_otable_prefetch(const struct lu_env *env,
1447                                     struct osd_metadnode_it *it)
1448 {
1449         struct osd_device       *dev = it->mit_dev;
1450         int                      rc;
1451
1452         /* can go negative on the very first access to the iterator
1453          * or if some non-Lustre objects were found */
1454         if (unlikely(it->mit_prefetched < 0))
1455                 it->mit_prefetched = 0;
1456
1457         if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
1458                 return;
1459
1460         if (it->mit_prefetched_dnode == 0)
1461                 it->mit_prefetched_dnode = it->mit_pos;
1462
1463         while (it->mit_prefetched < OTABLE_PREFETCH) {
1464                 rc = -dmu_object_next(dev->od_os, &it->mit_prefetched_dnode,
1465                                       B_FALSE, 0);
1466                 if (unlikely(rc != 0))
1467                         break;
1468
1469                 /* dmu_prefetch() was exported in 0.6.2, if you use with
1470                  * an older release, just comment it out - this is an
1471                  * optimization */
1472                 dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
1473
1474                 it->mit_prefetched++;
1475         }
1476 }
1477
1478 static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
1479 {
1480         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1481         struct lustre_mdt_attrs *lma;
1482         struct osd_device       *dev = it->mit_dev;
1483         nvlist_t                *nvbuf = NULL;
1484         uchar_t                 *v;
1485         __u64                    dnode;
1486         int                      rc, s;
1487
1488         memset(&it->mit_fid, 0, sizeof(it->mit_fid));
1489
1490         dnode = it->mit_pos;
1491         do {
1492                 rc = -dmu_object_next(dev->od_os, &it->mit_pos, B_FALSE, 0);
1493                 if (unlikely(rc != 0))
1494                         GOTO(out, rc = 1);
1495                 it->mit_prefetched--;
1496
1497                 /* LMA is required for this to be a Lustre object.
1498                  * If there is no xattr skip it. */
1499                 rc = __osd_xattr_load(dev, it->mit_pos, &nvbuf);
1500                 if (unlikely(rc != 0))
1501                         continue;
1502
1503                 LASSERT(nvbuf != NULL);
1504                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
1505                 if (likely(rc == 0)) {
1506                         /* Lustre object */
1507                         lma = (struct lustre_mdt_attrs *)v;
1508                         lustre_lma_swab(lma);
1509                         it->mit_fid = lma->lma_self_fid;
1510                         nvlist_free(nvbuf);
1511                         break;
1512                 } else {
1513                         /* not a Lustre object, try next one */
1514                         nvlist_free(nvbuf);
1515                 }
1516
1517         } while (1);
1518
1519
1520         /* we aren't prefetching in the above loop because the number of
1521          * non-Lustre objects is very small and we will be repeating very
1522          * rare. in case we want to use this to iterate over non-Lustre
1523          * objects (i.e. when we convert regular ZFS in Lustre) it makes
1524          * sense to initiate prefetching in the loop */
1525
1526         /* 0 - there are more items, +1 - the end */
1527         if (likely(rc == 0))
1528                 osd_zfs_otable_prefetch(env, it);
1529
1530         CDEBUG(D_OTHER, "advance: %llu -> %llu "DFID": %d\n", dnode,
1531                it->mit_pos, PFID(&it->mit_fid), rc);
1532
1533 out:
1534         return rc;
1535 }
1536
1537 static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
1538                                             const struct dt_it *di)
1539 {
1540         return NULL;
1541 }
1542
1543 static int osd_zfs_otable_it_key_size(const struct lu_env *env,
1544                                       const struct dt_it *di)
1545 {
1546         return sizeof(__u64);
1547 }
1548
1549 static int osd_zfs_otable_it_rec(const struct lu_env *env,
1550                                  const struct dt_it *di,
1551                                  struct dt_rec *rec, __u32 attr)
1552 {
1553         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1554         struct lu_fid *fid = (struct lu_fid *)rec;
1555         ENTRY;
1556
1557         *fid = it->mit_fid;
1558
1559         RETURN(0);
1560 }
1561
1562
1563 static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
1564                                      const struct dt_it *di)
1565 {
1566         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1567
1568         return it->mit_pos;
1569 }
1570
1571 static int osd_zfs_otable_it_load(const struct lu_env *env,
1572                                   const struct dt_it *di, __u64 hash)
1573 {
1574         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1575
1576         it->mit_pos = hash;
1577         it->mit_prefetched = 0;
1578         it->mit_prefetched_dnode = 0;
1579
1580         return osd_zfs_otable_it_next(env, (struct dt_it *)di);
1581 }
1582
1583 static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
1584                                      const struct dt_it *di, void *key_rec)
1585 {
1586         return 0;
1587 }
1588
1589 const struct dt_index_operations osd_zfs_otable_ops = {
1590         .dio_it = {
1591                 .init     = osd_zfs_otable_it_init,
1592                 .fini     = osd_zfs_otable_it_fini,
1593                 .get      = osd_zfs_otable_it_get,
1594                 .put      = osd_zfs_otable_it_put,
1595                 .next     = osd_zfs_otable_it_next,
1596                 .key      = osd_zfs_otable_it_key,
1597                 .key_size = osd_zfs_otable_it_key_size,
1598                 .rec      = osd_zfs_otable_it_rec,
1599                 .store    = osd_zfs_otable_it_store,
1600                 .load     = osd_zfs_otable_it_load,
1601                 .key_rec  = osd_zfs_otable_it_key_rec,
1602         }
1603 };
1604
1605 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1606                 const struct dt_index_features *feat)
1607 {
1608         struct osd_object *obj = osd_dt_obj(dt);
1609         ENTRY;
1610
1611         LASSERT(dt_object_exists(dt));
1612
1613         /*
1614          * XXX: implement support for fixed-size keys sorted with natural
1615          *      numerical way (not using internal hash value)
1616          */
1617         if (feat->dif_flags & DT_IND_RANGE)
1618                 RETURN(-ERANGE);
1619
1620         if (unlikely(feat == &dt_otable_features)) {
1621                 dt->do_index_ops = &osd_zfs_otable_ops;
1622                 RETURN(0);
1623         }
1624
1625         LASSERT(obj->oo_db != NULL);
1626         if (likely(feat == &dt_directory_features)) {
1627                 if (osd_object_is_zap(obj->oo_db))
1628                         dt->do_index_ops = &osd_dir_ops;
1629                 else
1630                         RETURN(-ENOTDIR);
1631         } else if (unlikely(feat == &dt_acct_features)) {
1632                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1633                 dt->do_index_ops = &osd_acct_index_ops;
1634         } else if (osd_object_is_zap(obj->oo_db) &&
1635                    dt->do_index_ops == NULL) {
1636                 /* For index file, we don't support variable key & record sizes
1637                  * and the key has to be unique */
1638                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1639                         RETURN(-EINVAL);
1640
1641                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
1642                         RETURN(-E2BIG);
1643                 if (feat->dif_keysize_max != feat->dif_keysize_min)
1644                         RETURN(-EINVAL);
1645
1646                 /* As for the record size, it should be a multiple of 8 bytes
1647                  * and smaller than the maximum value length supported by ZAP.
1648                  */
1649                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1650                         RETURN(-E2BIG);
1651                 if (feat->dif_recsize_max != feat->dif_recsize_min)
1652                         RETURN(-EINVAL);
1653
1654                 obj->oo_keysize = feat->dif_keysize_max;
1655                 obj->oo_recsize = feat->dif_recsize_max;
1656                 obj->oo_recusize = 1;
1657
1658                 /* ZFS prefers to work with array of 64bits */
1659                 if ((obj->oo_recsize & 7) == 0) {
1660                         obj->oo_recsize >>= 3;
1661                         obj->oo_recusize = 8;
1662                 }
1663                 dt->do_index_ops = &osd_index_ops;
1664         }
1665
1666         RETURN(0);
1667 }