Whamcloud - gitweb
LU-6427 osd: race between destroy and load LMV
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd-zfs/osd_index.c
37  *
38  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
39  * Author: Mike Pershin <tappro@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OSD
43
44 #include <lustre_ver.h>
45 #include <libcfs/libcfs.h>
46 #include <obd_support.h>
47 #include <lustre_net.h>
48 #include <obd.h>
49 #include <obd_class.h>
50 #include <lustre_disk.h>
51 #include <lustre_fid.h>
52
53 #include "osd_internal.h"
54
55 #include <sys/dnode.h>
56 #include <sys/dbuf.h>
57 #include <sys/spa.h>
58 #include <sys/stat.h>
59 #include <sys/zap.h>
60 #include <sys/spa_impl.h>
61 #include <sys/zfs_znode.h>
62 #include <sys/dmu_tx.h>
63 #include <sys/dmu_objset.h>
64 #include <sys/dsl_prop.h>
65 #include <sys/sa_impl.h>
66 #include <sys/txg.h>
67
68 static inline int osd_object_is_zap(dmu_buf_t *db)
69 {
70         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *) db;
71         dnode_t *dn;
72         int rc;
73
74         DB_DNODE_ENTER(dbi);
75         dn = DB_DNODE(dbi);
76         rc = (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
77                         dn->dn_type == DMU_OT_USERGROUP_USED);
78         DB_DNODE_EXIT(dbi);
79
80         return rc;
81 }
82
83 /* We don't actually have direct access to the zap_hashbits() function
84  * so just pretend like we do for now.  If this ever breaks we can look at
85  * it at that time. */
86 #define zap_hashbits(zc) 48
87 /*
88  * ZFS hash format:
89  * | cd (16 bits) | hash (48 bits) |
90  * we need it in other form:
91  * |0| hash (48 bit) | cd (15 bit) |
92  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
93  * the readdir hashes from multiple directory stripes uniformly on the client.
94  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
95  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
96  * should only be the low 15 bits.
97  */
98 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
99 {
100         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
101
102         return (zfs_hash >> zap_hashbits(zc)) |
103                 (zfs_hash << (63 - zap_hashbits(zc)));
104 }
105
106 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
107                                     uint64_t id, uint64_t dirhash)
108 {
109         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
110                 (dirhash >> (63 - zap_hashbits(zc)));
111
112         zap_cursor_init_serialized(zc, os, id, zfs_hash);
113 }
114
115 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
116                         uint64_t id, uint64_t dirhash)
117 {
118         zap_cursor_t *t;
119
120         OBD_ALLOC_PTR(t);
121         if (unlikely(t == NULL))
122                 return -ENOMEM;
123
124         osd_zap_cursor_init_serialized(t, os, id, dirhash);
125         *zc = t;
126
127         return 0;
128 }
129
130 void osd_zap_cursor_fini(zap_cursor_t *zc)
131 {
132         zap_cursor_fini(zc);
133         OBD_FREE_PTR(zc);
134 }
135
136 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
137                                                  struct osd_object *o,
138                                                  uint64_t dirhash)
139 {
140         struct osd_device *d = osd_obj2dev(o);
141         osd_zap_cursor_init_serialized(zc, d->od_os,
142                                        o->oo_db->db_object, dirhash);
143 }
144
145 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
146                         uint64_t dirhash)
147 {
148         struct osd_device *d = osd_obj2dev(o);
149         return osd_zap_cursor_init(zc, d->od_os, o->oo_db->db_object, dirhash);
150 }
151
152 static struct dt_it *osd_index_it_init(const struct lu_env *env,
153                                        struct dt_object *dt,
154                                        __u32 unused)
155 {
156         struct osd_thread_info  *info = osd_oti_get(env);
157         struct osd_zap_it       *it;
158         struct osd_object       *obj = osd_dt_obj(dt);
159         struct lu_object        *lo  = &dt->do_lu;
160         int                      rc;
161         ENTRY;
162
163         if (obj->oo_destroyed)
164                 RETURN(ERR_PTR(-ENOENT));
165
166         LASSERT(lu_object_exists(lo));
167         LASSERT(obj->oo_db);
168         LASSERT(osd_object_is_zap(obj->oo_db));
169         LASSERT(info);
170
171         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
172         if (it == NULL)
173                 RETURN(ERR_PTR(-ENOMEM));
174
175         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
176         if (rc != 0) {
177                 OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
178                 RETURN(ERR_PTR(rc));
179         }
180
181         it->ozi_obj   = obj;
182         it->ozi_reset = 1;
183         lu_object_get(lo);
184
185         RETURN((struct dt_it *)it);
186 }
187
188 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
189 {
190         struct osd_zap_it       *it     = (struct osd_zap_it *)di;
191         struct osd_object       *obj;
192         ENTRY;
193
194         LASSERT(it);
195         LASSERT(it->ozi_obj);
196
197         obj = it->ozi_obj;
198
199         osd_zap_cursor_fini(it->ozi_zc);
200         lu_object_put(env, &obj->oo_dt.do_lu);
201         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
202
203         EXIT;
204 }
205
206
207 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
208 {
209         /* PBS: do nothing : ref are incremented at retrive and decreamented
210          *      next/finish. */
211 }
212
213 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
214                                        int len, __u16 type)
215 {
216         const unsigned    align = sizeof(struct luda_type) - 1;
217         struct luda_type *lt;
218
219         /* check if file type is required */
220         if (attr & LUDA_TYPE) {
221                 len = (len + align) & ~align;
222
223                 lt = (void *)ent->lde_name + len;
224                 lt->lt_type = cpu_to_le16(DTTOIF(type));
225                 ent->lde_attrs |= LUDA_TYPE;
226         }
227
228         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
229 }
230
231 /*
232  * as we don't know FID, we can't use LU object, so this function
233  * partially duplicate __osd_xattr_get() which is built around
234  * LU-object and uses it to cache data like regular EA dnode, etc
235  */
236 static int osd_find_parent_by_dnode(const struct lu_env *env,
237                                     struct dt_object *o,
238                                     struct lu_fid *fid)
239 {
240         struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
241         struct lustre_mdt_attrs *lma;
242         struct lu_buf            buf;
243         sa_handle_t             *sa_hdl;
244         nvlist_t                *nvbuf = NULL;
245         uchar_t                 *value;
246         uint64_t                 dnode;
247         int                      rc, size;
248         ENTRY;
249
250         /* first of all, get parent dnode from own attributes */
251         LASSERT(osd_dt_obj(o)->oo_db);
252         rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
253                             NULL, SA_HDL_PRIVATE, &sa_hdl);
254         if (rc)
255                 RETURN(rc);
256
257         dnode = ZFS_NO_OBJECT;
258         rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
259         sa_handle_destroy(sa_hdl);
260         if (rc)
261                 RETURN(rc);
262
263         /* now get EA buffer */
264         rc = __osd_xattr_load(osd, dnode, &nvbuf);
265         if (rc)
266                 GOTO(regular, rc);
267
268         /* XXX: if we get that far.. should we cache the result? */
269
270         /* try to find LMA attribute */
271         LASSERT(nvbuf != NULL);
272         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
273         if (rc == 0 && size >= sizeof(*lma)) {
274                 lma = (struct lustre_mdt_attrs *)value;
275                 lustre_lma_swab(lma);
276                 *fid = lma->lma_self_fid;
277                 GOTO(out, rc = 0);
278         }
279
280 regular:
281         /* no LMA attribute in SA, let's try regular EA */
282
283         /* first of all, get parent dnode storing regular EA */
284         rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
285         if (rc)
286                 GOTO(out, rc);
287
288         dnode = ZFS_NO_OBJECT;
289         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
290         sa_handle_destroy(sa_hdl);
291         if (rc)
292                 GOTO(out, rc);
293
294         CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
295         buf.lb_buf = osd_oti_get(env)->oti_buf;
296         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
297
298         /* now try to find LMA */
299         rc = __osd_xattr_get_large(env, osd, dnode, &buf,
300                                    XATTR_NAME_LMA, &size);
301         if (rc == 0 && size >= sizeof(*lma)) {
302                 lma = buf.lb_buf;
303                 lustre_lma_swab(lma);
304                 *fid = lma->lma_self_fid;
305                 GOTO(out, rc = 0);
306         } else if (rc < 0) {
307                 GOTO(out, rc);
308         } else {
309                 GOTO(out, rc = -EIO);
310         }
311
312 out:
313         if (nvbuf != NULL)
314                 nvlist_free(nvbuf);
315         RETURN(rc);
316 }
317
318 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
319                                struct lu_fid *fid)
320 {
321         struct link_ea_header  *leh;
322         struct link_ea_entry   *lee;
323         struct lu_buf           buf;
324         int                     rc;
325         ENTRY;
326
327         buf.lb_buf = osd_oti_get(env)->oti_buf;
328         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
329
330         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
331         if (rc == -ERANGE) {
332                 rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK);
333                 if (rc < 0)
334                         RETURN(rc);
335                 LASSERT(rc > 0);
336                 OBD_ALLOC(buf.lb_buf, rc);
337                 if (buf.lb_buf == NULL)
338                         RETURN(-ENOMEM);
339                 buf.lb_len = rc;
340                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
341         }
342         if (rc < 0)
343                 GOTO(out, rc);
344         if (rc < sizeof(*leh) + sizeof(*lee))
345                 GOTO(out, rc = -EINVAL);
346
347         leh = buf.lb_buf;
348         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
349                 leh->leh_magic = LINK_EA_MAGIC;
350                 leh->leh_reccount = __swab32(leh->leh_reccount);
351                 leh->leh_len = __swab64(leh->leh_len);
352         }
353         if (leh->leh_magic != LINK_EA_MAGIC)
354                 GOTO(out, rc = -EINVAL);
355         if (leh->leh_reccount == 0)
356                 GOTO(out, rc = -ENODATA);
357
358         lee = (struct link_ea_entry *)(leh + 1);
359         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
360         rc = 0;
361
362 out:
363         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
364                 OBD_FREE(buf.lb_buf, buf.lb_len);
365
366 #if 0
367         /* this block can be enabled for additional verification
368          * it's trying to match FID from LinkEA vs. FID from LMA */
369         if (rc == 0) {
370                 struct lu_fid fid2;
371                 int rc2;
372                 rc2 = osd_find_parent_by_dnode(env, o, &fid2);
373                 if (rc2 == 0)
374                         if (lu_fid_eq(fid, &fid2) == 0)
375                                 CERROR("wrong parent: "DFID" != "DFID"\n",
376                                        PFID(fid), PFID(&fid2));
377         }
378 #endif
379
380         /* no LinkEA is found, let's try to find the fid in parent's LMA */
381         if (unlikely(rc != 0))
382                 rc = osd_find_parent_by_dnode(env, o, fid);
383
384         RETURN(rc);
385 }
386
387 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
388                           struct dt_rec *rec, const struct dt_key *key)
389 {
390         struct osd_thread_info *oti = osd_oti_get(env);
391         struct osd_object  *obj = osd_dt_obj(dt);
392         struct osd_device  *osd = osd_obj2dev(obj);
393         char               *name = (char *)key;
394         int                 rc;
395         ENTRY;
396
397         LASSERT(osd_object_is_zap(obj->oo_db));
398
399         if (name[0] == '.') {
400                 if (name[1] == 0) {
401                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
402                         memcpy(rec, f, sizeof(*f));
403                         RETURN(1);
404                 } else if (name[1] == '.' && name[2] == 0) {
405                         rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
406                         RETURN(rc == 0 ? 1 : rc);
407                 }
408         }
409
410         rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
411                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
412                          (void *)&oti->oti_zde);
413         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
414
415         RETURN(rc == 0 ? 1 : rc);
416 }
417
418 static int osd_declare_dir_insert(const struct lu_env *env,
419                                   struct dt_object *dt,
420                                   const struct dt_rec *rec,
421                                   const struct dt_key *key,
422                                   struct thandle *th)
423 {
424         struct osd_object  *obj = osd_dt_obj(dt);
425         struct osd_thandle *oh;
426         uint64_t object;
427         ENTRY;
428
429         LASSERT(th != NULL);
430         oh = container_of0(th, struct osd_thandle, ot_super);
431
432         /* This is for inserting dot/dotdot for new created dir. */
433         if (obj->oo_db == NULL)
434                 object = DMU_NEW_OBJECT;
435         else
436                 object = obj->oo_db->db_object;
437
438         dmu_tx_hold_bonus(oh->ot_tx, object);
439         dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
440
441         RETURN(0);
442 }
443
444 /**
445  * Find the osd object for given fid.
446  *
447  * \param fid need to find the osd object having this fid
448  *
449  * \retval osd_object on success
450  * \retval        -ve on error
451  */
452 struct osd_object *osd_object_find(const struct lu_env *env,
453                                    struct dt_object *dt,
454                                    const struct lu_fid *fid)
455 {
456         struct lu_device         *ludev = dt->do_lu.lo_dev;
457         struct osd_object        *child = NULL;
458         struct lu_object         *luch;
459         struct lu_object         *lo;
460
461         /*
462          * at this point topdev might not exist yet
463          * (i.e. MGS is preparing profiles). so we can
464          * not rely on topdev and instead lookup with
465          * our device passed as topdev. this can't work
466          * if the object isn't cached yet (as osd doesn't
467          * allocate lu_header). IOW, the object must be
468          * in the cache, otherwise lu_object_alloc() crashes
469          * -bzzz
470          */
471         luch = lu_object_find_at(env, ludev, fid, NULL);
472         if (IS_ERR(luch))
473                 return (void *)luch;
474
475         if (lu_object_exists(luch)) {
476                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
477                 if (lo != NULL)
478                         child = osd_obj(lo);
479                 else
480                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
481                                         "%s: object can't be located "DFID"\n",
482                                         osd_dev(ludev)->od_svname, PFID(fid));
483
484                 if (child == NULL) {
485                         lu_object_put(env, luch);
486                         CERROR("%s: Unable to get osd_object "DFID"\n",
487                                osd_dev(ludev)->od_svname, PFID(fid));
488                         child = ERR_PTR(-ENOENT);
489                 }
490         } else {
491                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
492                                 "%s: lu_object does not exists "DFID"\n",
493                                 osd_dev(ludev)->od_svname, PFID(fid));
494                 lu_object_put(env, luch);
495                 child = ERR_PTR(-ENOENT);
496         }
497
498         return child;
499 }
500
501 /**
502  * Put the osd object once done with it.
503  *
504  * \param obj osd object that needs to be put
505  */
506 static inline void osd_object_put(const struct lu_env *env,
507                                   struct osd_object *obj)
508 {
509         lu_object_put(env, &obj->oo_dt.do_lu);
510 }
511
512 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
513                           u64 seq)
514 {
515         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
516         struct seq_server_site  *ss = osd_seq_site(osd);
517         int                     rc;
518         ENTRY;
519
520         LASSERT(ss != NULL);
521         LASSERT(ss->ss_server_fld != NULL);
522
523         rc = osd_fld_lookup(env, osd, seq, range);
524         if (rc != 0) {
525                 if (rc != -ENOENT)
526                         CERROR("%s: Can not lookup fld for "LPX64"\n",
527                                osd_name(osd), seq);
528                 RETURN(0);
529         }
530
531         RETURN(ss->ss_node_id == range->lsr_index);
532 }
533
534 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
535                           const struct lu_fid *fid)
536 {
537         struct seq_server_site  *ss = osd_seq_site(osd);
538         ENTRY;
539
540         /* FID seqs not in FLDB, must be local seq */
541         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
542                 RETURN(0);
543
544         /* If FLD is not being initialized yet, it only happens during the
545          * initialization, likely during mgs initialization, and we assume
546          * this is local FID. */
547         if (ss == NULL || ss->ss_server_fld == NULL)
548                 RETURN(0);
549
550         /* Only check the local FLDB here */
551         if (osd_seq_exists(env, osd, fid_seq(fid)))
552                 RETURN(0);
553
554         RETURN(1);
555 }
556
557 /**
558  *      Inserts (key, value) pair in \a directory object.
559  *
560  *      \param  dt      osd index object
561  *      \param  key     key for index
562  *      \param  rec     record reference
563  *      \param  th      transaction handler
564  *      \param  ignore_quota update should not affect quota
565  *
566  *      \retval  0  success
567  *      \retval -ve failure
568  */
569 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
570                           const struct dt_rec *rec, const struct dt_key *key,
571                           struct thandle *th, int ignore_quota)
572 {
573         struct osd_thread_info *oti = osd_oti_get(env);
574         struct osd_object   *parent = osd_dt_obj(dt);
575         struct osd_device   *osd = osd_obj2dev(parent);
576         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
577         const struct lu_fid *fid = rec1->rec_fid;
578         struct osd_thandle  *oh;
579         struct osd_object   *child = NULL;
580         __u32                attr;
581         char                *name = (char *)key;
582         int                  rc;
583         ENTRY;
584
585         LASSERT(parent->oo_db);
586         LASSERT(osd_object_is_zap(parent->oo_db));
587
588         LASSERT(dt_object_exists(dt));
589         LASSERT(osd_invariant(parent));
590
591         LASSERT(th != NULL);
592         oh = container_of0(th, struct osd_thandle, ot_super);
593
594         rc = osd_remote_fid(env, osd, fid);
595         if (rc < 0) {
596                 CERROR("%s: Can not find object "DFID": rc = %d\n",
597                        osd->od_svname, PFID(fid), rc);
598                 RETURN(rc);
599         }
600
601         if (unlikely(rc == 1)) {
602                 /* Insert remote entry */
603                 memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
604                 oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
605         } else {
606                 /*
607                  * To simulate old Orion setups with ./..  stored in the
608                  * directories
609                  */
610                 /* Insert local entry */
611                 child = osd_object_find(env, dt, fid);
612                 if (IS_ERR(child))
613                         RETURN(PTR_ERR(child));
614
615                 LASSERT(child->oo_db);
616                 if (name[0] == '.') {
617                         if (name[1] == 0) {
618                                 /* do not store ".", instead generate it
619                                  * during iteration */
620                                 GOTO(out, rc = 0);
621                         } else if (name[1] == '.' && name[2] == 0) {
622                                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
623                                         struct lu_fid tfid = *fid;
624
625                                         osd_object_put(env, child);
626                                         tfid.f_oid--;
627                                         child = osd_object_find(env, dt, &tfid);
628                                         if (IS_ERR(child))
629                                                 RETURN(PTR_ERR(child));
630
631                                         LASSERT(child->oo_db);
632                                 }
633
634                                 /* update parent dnode in the child.
635                                  * later it will be used to generate ".." */
636                                 rc = osd_object_sa_update(parent,
637                                                  SA_ZPL_PARENT(osd),
638                                                  &child->oo_db->db_object,
639                                                  8, oh);
640
641                                 GOTO(out, rc);
642                         }
643                 }
644                 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
645                 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
646                 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
647                 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
648                 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
649         }
650
651         oti->oti_zde.lzd_fid = *fid;
652         /* Insert (key,oid) into ZAP */
653         rc = -zap_add(osd->od_os, parent->oo_db->db_object,
654                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
655                       (void *)&oti->oti_zde, oh->ot_tx);
656         if (unlikely(rc == -EEXIST &&
657                      name[0] == '.' && name[1] == '.' && name[2] == 0))
658                 /* Update (key,oid) in ZAP */
659                 rc = -zap_update(osd->od_os, parent->oo_db->db_object,
660                                 (char *)key, 8, sizeof(oti->oti_zde) / 8,
661                                 (void *)&oti->oti_zde, oh->ot_tx);
662
663 out:
664         if (child != NULL)
665                 osd_object_put(env, child);
666
667         RETURN(rc);
668 }
669
670 static int osd_declare_dir_delete(const struct lu_env *env,
671                                   struct dt_object *dt,
672                                   const struct dt_key *key,
673                                   struct thandle *th)
674 {
675         struct osd_object  *obj = osd_dt_obj(dt);
676         struct osd_thandle *oh;
677         uint64_t            dnode;
678         ENTRY;
679
680         LASSERT(dt_object_exists(dt));
681         LASSERT(osd_invariant(obj));
682
683         LASSERT(th != NULL);
684         oh = container_of0(th, struct osd_thandle, ot_super);
685
686         if (dt_object_exists(dt)) {
687                 LASSERT(obj->oo_db);
688                 LASSERT(osd_object_is_zap(obj->oo_db));
689                 dnode = obj->oo_db->db_object;
690         } else {
691                 dnode = DMU_NEW_OBJECT;
692         }
693         dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key);
694
695         RETURN(0);
696 }
697
698 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
699                           const struct dt_key *key, struct thandle *th)
700 {
701         struct osd_object *obj = osd_dt_obj(dt);
702         struct osd_device *osd = osd_obj2dev(obj);
703         struct osd_thandle *oh;
704         dmu_buf_t *zap_db = obj->oo_db;
705         char      *name = (char *)key;
706         int rc;
707         ENTRY;
708
709         LASSERT(zap_db);
710         LASSERT(osd_object_is_zap(zap_db));
711
712         LASSERT(th != NULL);
713         oh = container_of0(th, struct osd_thandle, ot_super);
714
715         /*
716          * In Orion . and .. were stored in the directory (not generated upon
717          * request as now). we preserve them for backward compatibility
718          */
719         if (name[0] == '.') {
720                 if (name[1] == 0) {
721                         RETURN(0);
722                 } else if (name[1] == '.' && name[2] == 0) {
723                         RETURN(0);
724                 }
725         }
726
727         /* Remove key from the ZAP */
728         rc = -zap_remove(osd->od_os, zap_db->db_object,
729                          (char *) key, oh->ot_tx);
730
731         if (unlikely(rc && rc != -ENOENT))
732                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
733
734         RETURN(rc);
735 }
736
737 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
738                                      struct dt_object *dt,
739                                      __u32 unused)
740 {
741         struct osd_zap_it *it;
742
743         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
744         if (!IS_ERR(it))
745                 it->ozi_pos = 0;
746
747         RETURN((struct dt_it *)it);
748 }
749
750 /**
751  *  Move Iterator to record specified by \a key
752  *
753  *  \param  di      osd iterator
754  *  \param  key     key for index
755  *
756  *  \retval +ve  di points to record with least key not larger than key
757  *  \retval  0   di points to exact matched key
758  *  \retval -ve  failure
759  */
760 static int osd_dir_it_get(const struct lu_env *env,
761                           struct dt_it *di, const struct dt_key *key)
762 {
763         struct osd_zap_it *it = (struct osd_zap_it *)di;
764         struct osd_object *obj = it->ozi_obj;
765         char              *name = (char *)key;
766         int                rc;
767         ENTRY;
768
769         LASSERT(it);
770         LASSERT(it->ozi_zc);
771
772         /* reset the cursor */
773         zap_cursor_fini(it->ozi_zc);
774         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
775
776         /* XXX: implementation of the API is broken at the moment */
777         LASSERT(((const char *)key)[0] == 0);
778
779         if (name[0] == 0) {
780                 it->ozi_pos = 0;
781                 RETURN(1);
782         }
783
784         if (name[0] == '.') {
785                 if (name[1] == 0) {
786                         it->ozi_pos = 1;
787                         GOTO(out, rc = 1);
788                 } else if (name[1] == '.' && name[2] == 0) {
789                         it->ozi_pos = 2;
790                         GOTO(out, rc = 1);
791                 }
792         }
793
794         /* neither . nor .. - some real record */
795         it->ozi_pos = 3;
796         rc = +1;
797
798 out:
799         RETURN(rc);
800 }
801
802 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
803 {
804         /* PBS: do nothing : ref are incremented at retrive and decreamented
805          *      next/finish. */
806 }
807
808 /*
809  * in Orion . and .. were stored in the directory, while ZPL
810  * and current osd-zfs generate them up on request. so, we
811  * need to ignore previously stored . and ..
812  */
813 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
814                                         zap_attribute_t *za)
815 {
816         int rc, isdot;
817
818         do {
819                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
820
821                 isdot = 0;
822                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
823                         if (za->za_name[1] == 0) {
824                                 isdot = 1;
825                         } else if (za->za_name[1] == '.' &&
826                                    za->za_name[2] == 0) {
827                                 isdot = 1;
828                         }
829                         if (unlikely(isdot))
830                                 zap_cursor_advance(it->ozi_zc);
831                 }
832         } while (unlikely(rc == 0 && isdot));
833
834         return rc;
835 }
836
837 /**
838  * to load a directory entry at a time and stored it in
839  * iterator's in-memory data structure.
840  *
841  * \param di, struct osd_it_ea, iterator's in memory structure
842  *
843  * \retval +ve, iterator reached to end
844  * \retval   0, iterator not reached to end
845  * \retval -ve, on error
846  */
847 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
848 {
849         struct osd_zap_it *it = (struct osd_zap_it *)di;
850         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
851         int                rc;
852
853         ENTRY;
854
855         /* temp. storage should be enough for any key supported by ZFS */
856         CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
857
858         /*
859          * the first ->next() moves the cursor to .
860          * the second ->next() moves the cursor to ..
861          * then we get to the real records and have to verify any exist
862          */
863         if (it->ozi_pos <= 2) {
864                 it->ozi_pos++;
865                 if (it->ozi_pos <=2)
866                         RETURN(0);
867
868         } else {
869                 zap_cursor_advance(it->ozi_zc);
870         }
871
872         /*
873          * According to current API we need to return error if its last entry.
874          * zap_cursor_advance() does not return any value. So we need to call
875          * retrieve to check if there is any record.  We should make
876          * changes to Iterator API to not return status for this API
877          */
878         rc = osd_index_retrieve_skip_dots(it, za);
879
880         if (rc == -ENOENT) /* end of dir */
881                 RETURN(+1);
882
883         RETURN(rc);
884 }
885
886 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
887                                      const struct dt_it *di)
888 {
889         struct osd_zap_it *it = (struct osd_zap_it *)di;
890         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
891         int                rc = 0;
892         ENTRY;
893
894         if (it->ozi_pos <= 1) {
895                 it->ozi_pos = 1;
896                 RETURN((struct dt_key *)".");
897         } else if (it->ozi_pos == 2) {
898                 RETURN((struct dt_key *)"..");
899         }
900
901         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
902                 RETURN(ERR_PTR(rc));
903
904         strcpy(it->ozi_name, za->za_name);
905
906         RETURN((struct dt_key *)it->ozi_name);
907 }
908
909 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
910 {
911         struct osd_zap_it *it = (struct osd_zap_it *)di;
912         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
913         int                rc;
914         ENTRY;
915
916         if (it->ozi_pos <= 1) {
917                 it->ozi_pos = 1;
918                 RETURN(2);
919         } else if (it->ozi_pos == 2) {
920                 RETURN(3);
921         }
922
923         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
924                 rc = strlen(za->za_name);
925
926         RETURN(rc);
927 }
928
929 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
930                           struct dt_rec *dtrec, __u32 attr)
931 {
932         struct osd_zap_it   *it = (struct osd_zap_it *)di;
933         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
934         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
935         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
936         int                  rc, namelen;
937         ENTRY;
938
939         if (it->ozi_pos <= 1) {
940                 lde->lde_hash = cpu_to_le64(1);
941                 strcpy(lde->lde_name, ".");
942                 lde->lde_namelen = cpu_to_le16(1);
943                 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
944                 lde->lde_attrs = LUDA_FID;
945                 /* append lustre attributes */
946                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
947                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
948                 it->ozi_pos = 1;
949                 GOTO(out, rc = 0);
950
951         } else if (it->ozi_pos == 2) {
952                 lde->lde_hash = cpu_to_le64(2);
953                 strcpy(lde->lde_name, "..");
954                 lde->lde_namelen = cpu_to_le16(2);
955                 lde->lde_attrs = LUDA_FID;
956                 /* append lustre attributes */
957                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
958                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
959                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
960
961                 /* ENOENT happens at the root of filesystem so ignore it */
962                 if (rc == -ENOENT)
963                         rc = 0;
964                 GOTO(out, rc);
965         }
966
967         LASSERT(lde);
968
969         rc = -zap_cursor_retrieve(it->ozi_zc, za);
970         if (unlikely(rc != 0))
971                 GOTO(out, rc);
972
973         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
974         namelen = strlen(za->za_name);
975         if (namelen > NAME_MAX)
976                 GOTO(out, rc = -EOVERFLOW);
977         strcpy(lde->lde_name, za->za_name);
978         lde->lde_namelen = cpu_to_le16(namelen);
979
980         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
981                 CERROR("%s: unsupported direntry format: %d %d\n",
982                        osd_obj2dev(it->ozi_obj)->od_svname,
983                        za->za_integer_length, (int)za->za_num_integers);
984
985                 GOTO(out, rc = -EIO);
986         }
987
988         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
989                          za->za_name, za->za_integer_length, 3, zde);
990         if (rc)
991                 GOTO(out, rc);
992
993         lde->lde_fid = zde->lzd_fid;
994         lde->lde_attrs = LUDA_FID;
995
996         /* append lustre attributes */
997         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
998
999         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
1000
1001 out:
1002         RETURN(rc);
1003 }
1004
1005 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
1006                                __u32 attr)
1007 {
1008         struct osd_zap_it   *it = (struct osd_zap_it *)di;
1009         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
1010         size_t               namelen = 0;
1011         int                  rc;
1012         ENTRY;
1013
1014         if (it->ozi_pos <= 1)
1015                 namelen = 1;
1016         else if (it->ozi_pos == 2)
1017                 namelen = 2;
1018
1019         if (namelen > 0) {
1020                 rc = lu_dirent_calc_size(namelen, attr);
1021                 RETURN(rc);
1022         }
1023
1024         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1025         if (unlikely(rc != 0))
1026                 RETURN(rc);
1027
1028         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1029                 CERROR("%s: unsupported direntry format: %d %d\n",
1030                        osd_obj2dev(it->ozi_obj)->od_svname,
1031                        za->za_integer_length, (int)za->za_num_integers);
1032                 RETURN(-EIO);
1033         }
1034
1035         namelen = strlen(za->za_name);
1036         if (namelen > NAME_MAX)
1037                 RETURN(-EOVERFLOW);
1038
1039         rc = lu_dirent_calc_size(namelen, attr);
1040
1041         RETURN(rc);
1042 }
1043
1044 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1045 {
1046         struct osd_zap_it *it = (struct osd_zap_it *)di;
1047         __u64              pos;
1048         ENTRY;
1049
1050         if (it->ozi_pos <= 2)
1051                 pos = it->ozi_pos;
1052         else
1053                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1054
1055         RETURN(pos);
1056 }
1057
1058 /*
1059  * return status :
1060  *  rc == 0 -> end of directory.
1061  *  rc >  0 -> ok, proceed.
1062  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1063  */
1064 static int osd_dir_it_load(const struct lu_env *env,
1065                         const struct dt_it *di, __u64 hash)
1066 {
1067         struct osd_zap_it *it = (struct osd_zap_it *)di;
1068         struct osd_object *obj = it->ozi_obj;
1069         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1070         int                rc;
1071         ENTRY;
1072
1073         /* reset the cursor */
1074         zap_cursor_fini(it->ozi_zc);
1075         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1076
1077         if (hash <= 2) {
1078                 it->ozi_pos = hash;
1079                 rc = +1;
1080         } else {
1081                 it->ozi_pos = 3;
1082                 /* to return whether the end has been reached */
1083                 rc = osd_index_retrieve_skip_dots(it, za);
1084                 if (rc == 0)
1085                         rc = +1;
1086                 else if (rc == -ENOENT)
1087                         rc = 0;
1088         }
1089
1090         RETURN(rc);
1091 }
1092
1093 struct dt_index_operations osd_dir_ops = {
1094         .dio_lookup         = osd_dir_lookup,
1095         .dio_declare_insert = osd_declare_dir_insert,
1096         .dio_insert         = osd_dir_insert,
1097         .dio_declare_delete = osd_declare_dir_delete,
1098         .dio_delete         = osd_dir_delete,
1099         .dio_it     = {
1100                 .init     = osd_dir_it_init,
1101                 .fini     = osd_index_it_fini,
1102                 .get      = osd_dir_it_get,
1103                 .put      = osd_dir_it_put,
1104                 .next     = osd_dir_it_next,
1105                 .key      = osd_dir_it_key,
1106                 .key_size = osd_dir_it_key_size,
1107                 .rec      = osd_dir_it_rec,
1108                 .rec_size = osd_dir_it_rec_size,
1109                 .store    = osd_dir_it_store,
1110                 .load     = osd_dir_it_load
1111         }
1112 };
1113
1114 /*
1115  * Primitives for index files using binary keys.
1116  */
1117
1118 /* key integer_size is 8 */
1119 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1120                                   const struct dt_key *src)
1121 {
1122         int size;
1123
1124         LASSERT(dst);
1125         LASSERT(src);
1126
1127         /* align keysize to 64bit */
1128         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1129         size *= sizeof(__u64);
1130
1131         LASSERT(size <= MAXNAMELEN);
1132
1133         if (unlikely(size > o->oo_keysize))
1134                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1135         memcpy(dst, (const char *)src, o->oo_keysize);
1136
1137         return (size/sizeof(__u64));
1138 }
1139
1140 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1141                         struct dt_rec *rec, const struct dt_key *key)
1142 {
1143         struct osd_object *obj = osd_dt_obj(dt);
1144         struct osd_device *osd = osd_obj2dev(obj);
1145         __u64             *k = osd_oti_get(env)->oti_key64;
1146         int                rc;
1147         ENTRY;
1148
1149         rc = osd_prepare_key_uint64(obj, k, key);
1150
1151         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1152                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1153                                 (void *)rec);
1154         RETURN(rc == 0 ? 1 : rc);
1155 }
1156
1157 static int osd_declare_index_insert(const struct lu_env *env,
1158                                     struct dt_object *dt,
1159                                     const struct dt_rec *rec,
1160                                     const struct dt_key *key,
1161                                     struct thandle *th)
1162 {
1163         struct osd_object  *obj = osd_dt_obj(dt);
1164         struct osd_thandle *oh;
1165         ENTRY;
1166
1167         LASSERT(th != NULL);
1168         oh = container_of0(th, struct osd_thandle, ot_super);
1169
1170         LASSERT(obj->oo_db);
1171
1172         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
1173
1174         /* It is not clear what API should be used for binary keys, so we pass
1175          * a null name which has the side effect of over-reserving space,
1176          * accounting for the worst case. See zap_count_write() */
1177         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1178
1179         RETURN(0);
1180 }
1181
1182 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1183                             const struct dt_rec *rec, const struct dt_key *key,
1184                             struct thandle *th, int ignore_quota)
1185 {
1186         struct osd_object  *obj = osd_dt_obj(dt);
1187         struct osd_device  *osd = osd_obj2dev(obj);
1188         struct osd_thandle *oh;
1189         __u64              *k = osd_oti_get(env)->oti_key64;
1190         int                 rc;
1191         ENTRY;
1192
1193         LASSERT(obj->oo_db);
1194         LASSERT(dt_object_exists(dt));
1195         LASSERT(osd_invariant(obj));
1196         LASSERT(th != NULL);
1197
1198         oh = container_of0(th, struct osd_thandle, ot_super);
1199
1200         rc = osd_prepare_key_uint64(obj, k, key);
1201
1202         /* Insert (key,oid) into ZAP */
1203         rc = -zap_add_uint64(osd->od_os, obj->oo_db->db_object,
1204                              k, rc, obj->oo_recusize, obj->oo_recsize,
1205                              (void *)rec, oh->ot_tx);
1206         RETURN(rc);
1207 }
1208
1209 static int osd_declare_index_delete(const struct lu_env *env,
1210                                     struct dt_object *dt,
1211                                     const struct dt_key *key,
1212                                     struct thandle *th)
1213 {
1214         struct osd_object  *obj = osd_dt_obj(dt);
1215         struct osd_thandle *oh;
1216         ENTRY;
1217
1218         LASSERT(dt_object_exists(dt));
1219         LASSERT(osd_invariant(obj));
1220         LASSERT(th != NULL);
1221         LASSERT(obj->oo_db);
1222
1223         oh = container_of0(th, struct osd_thandle, ot_super);
1224         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1225
1226         RETURN(0);
1227 }
1228
1229 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1230                             const struct dt_key *key, struct thandle *th)
1231 {
1232         struct osd_object  *obj = osd_dt_obj(dt);
1233         struct osd_device  *osd = osd_obj2dev(obj);
1234         struct osd_thandle *oh;
1235         __u64              *k = osd_oti_get(env)->oti_key64;
1236         int                 rc;
1237         ENTRY;
1238
1239         LASSERT(obj->oo_db);
1240         LASSERT(th != NULL);
1241         oh = container_of0(th, struct osd_thandle, ot_super);
1242
1243         rc = osd_prepare_key_uint64(obj, k, key);
1244
1245         /* Remove binary key from the ZAP */
1246         rc = -zap_remove_uint64(osd->od_os, obj->oo_db->db_object,
1247                                 k, rc, oh->ot_tx);
1248         RETURN(rc);
1249 }
1250
1251 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1252                             const struct dt_key *key)
1253 {
1254         struct osd_zap_it *it = (struct osd_zap_it *)di;
1255         struct osd_object *obj = it->ozi_obj;
1256         struct osd_device *osd = osd_obj2dev(obj);
1257         ENTRY;
1258
1259         LASSERT(it);
1260         LASSERT(it->ozi_zc);
1261
1262         /*
1263          * XXX: we need a binary version of zap_cursor_move_to_key()
1264          *      to implement this API */
1265         if (*((const __u64 *)key) != 0)
1266                 CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
1267                        *((__u64 *)key));
1268
1269         zap_cursor_fini(it->ozi_zc);
1270         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_db->db_object);
1271         it->ozi_reset = 1;
1272
1273         RETURN(+1);
1274 }
1275
1276 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1277 {
1278         struct osd_zap_it *it = (struct osd_zap_it *)di;
1279         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1280         int                rc;
1281         ENTRY;
1282
1283         if (it->ozi_reset == 0)
1284                 zap_cursor_advance(it->ozi_zc);
1285         it->ozi_reset = 0;
1286
1287         /*
1288          * According to current API we need to return error if it's last entry.
1289          * zap_cursor_advance() does not return any value. So we need to call
1290          * retrieve to check if there is any record.  We should make
1291          * changes to Iterator API to not return status for this API
1292          */
1293         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1294         if (rc == -ENOENT)
1295                 RETURN(+1);
1296
1297         RETURN((rc));
1298 }
1299
1300 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1301                                        const struct dt_it *di)
1302 {
1303         struct osd_zap_it *it = (struct osd_zap_it *)di;
1304         struct osd_object *obj = it->ozi_obj;
1305         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1306         int                rc = 0;
1307         ENTRY;
1308
1309         it->ozi_reset = 0;
1310         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1311         if (rc)
1312                 RETURN(ERR_PTR(rc));
1313
1314         /* the binary key is stored in the name */
1315         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1316
1317         RETURN((struct dt_key *)&it->ozi_key);
1318 }
1319
1320 static int osd_index_it_key_size(const struct lu_env *env,
1321                                 const struct dt_it *di)
1322 {
1323         struct osd_zap_it *it = (struct osd_zap_it *)di;
1324         struct osd_object *obj = it->ozi_obj;
1325         RETURN(obj->oo_keysize);
1326 }
1327
1328 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1329                             struct dt_rec *rec, __u32 attr)
1330 {
1331         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1332         struct osd_zap_it *it = (struct osd_zap_it *)di;
1333         struct osd_object *obj = it->ozi_obj;
1334         struct osd_device *osd = osd_obj2dev(obj);
1335         __u64             *k = osd_oti_get(env)->oti_key64;
1336         int                rc;
1337         ENTRY;
1338
1339         it->ozi_reset = 0;
1340         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1341         if (rc)
1342                 RETURN(rc);
1343
1344         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1345
1346         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1347                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1348                                 (void *)rec);
1349         RETURN(rc);
1350 }
1351
1352 static __u64 osd_index_it_store(const struct lu_env *env,
1353                                 const struct dt_it *di)
1354 {
1355         struct osd_zap_it *it = (struct osd_zap_it *)di;
1356
1357         it->ozi_reset = 0;
1358         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1359 }
1360
1361 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1362                              __u64 hash)
1363 {
1364         struct osd_zap_it *it = (struct osd_zap_it *)di;
1365         struct osd_object *obj = it->ozi_obj;
1366         struct osd_device *osd = osd_obj2dev(obj);
1367         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1368         int                rc;
1369         ENTRY;
1370
1371         /* reset the cursor */
1372         zap_cursor_fini(it->ozi_zc);
1373         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1374                                    obj->oo_db->db_object, hash);
1375         it->ozi_reset = 0;
1376
1377         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1378         if (rc == 0)
1379                 RETURN(+1);
1380         else if (rc == -ENOENT)
1381                 RETURN(0);
1382
1383         RETURN(rc);
1384 }
1385
1386 static struct dt_index_operations osd_index_ops = {
1387         .dio_lookup             = osd_index_lookup,
1388         .dio_declare_insert     = osd_declare_index_insert,
1389         .dio_insert             = osd_index_insert,
1390         .dio_declare_delete     = osd_declare_index_delete,
1391         .dio_delete             = osd_index_delete,
1392         .dio_it = {
1393                 .init           = osd_index_it_init,
1394                 .fini           = osd_index_it_fini,
1395                 .get            = osd_index_it_get,
1396                 .put            = osd_index_it_put,
1397                 .next           = osd_index_it_next,
1398                 .key            = osd_index_it_key,
1399                 .key_size       = osd_index_it_key_size,
1400                 .rec            = osd_index_it_rec,
1401                 .store          = osd_index_it_store,
1402                 .load           = osd_index_it_load
1403         }
1404 };
1405
1406 struct osd_metadnode_it {
1407         struct osd_device       *mit_dev;
1408         __u64                    mit_pos;
1409         struct lu_fid            mit_fid;
1410         int                      mit_prefetched;
1411         __u64                    mit_prefetched_dnode;
1412 };
1413
1414 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
1415                                             struct dt_object *dt, __u32 attr)
1416 {
1417         struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
1418         struct osd_metadnode_it *it;
1419         ENTRY;
1420
1421         OBD_ALLOC_PTR(it);
1422         if (unlikely(it == NULL))
1423                 RETURN(ERR_PTR(-ENOMEM));
1424
1425         it->mit_dev = dev;
1426
1427         /* XXX: dmu_object_next() does NOT find dnodes allocated
1428          *      in the current non-committed txg, so we force txg
1429          *      commit to find all existing dnodes ... */
1430         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1431
1432         RETURN((struct dt_it *)it);
1433 }
1434
1435 static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1436 {
1437         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1438
1439         OBD_FREE_PTR(it);
1440 }
1441
1442 static int osd_zfs_otable_it_get(const struct lu_env *env,
1443                                  struct dt_it *di, const struct dt_key *key)
1444 {
1445         return 0;
1446 }
1447
1448 static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
1449 {
1450 }
1451
1452 #define OTABLE_PREFETCH         256
1453
1454 static void osd_zfs_otable_prefetch(const struct lu_env *env,
1455                                     struct osd_metadnode_it *it)
1456 {
1457         struct osd_device       *dev = it->mit_dev;
1458         int                      rc;
1459
1460         /* can go negative on the very first access to the iterator
1461          * or if some non-Lustre objects were found */
1462         if (unlikely(it->mit_prefetched < 0))
1463                 it->mit_prefetched = 0;
1464
1465         if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
1466                 return;
1467
1468         if (it->mit_prefetched_dnode == 0)
1469                 it->mit_prefetched_dnode = it->mit_pos;
1470
1471         while (it->mit_prefetched < OTABLE_PREFETCH) {
1472                 rc = -dmu_object_next(dev->od_os, &it->mit_prefetched_dnode,
1473                                       B_FALSE, 0);
1474                 if (unlikely(rc != 0))
1475                         break;
1476
1477                 /* dmu_prefetch() was exported in 0.6.2, if you use with
1478                  * an older release, just comment it out - this is an
1479                  * optimization */
1480                 dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
1481
1482                 it->mit_prefetched++;
1483         }
1484 }
1485
1486 static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
1487 {
1488         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1489         struct lustre_mdt_attrs *lma;
1490         struct osd_device       *dev = it->mit_dev;
1491         nvlist_t                *nvbuf = NULL;
1492         uchar_t                 *v;
1493         __u64                    dnode;
1494         int                      rc, s;
1495
1496         memset(&it->mit_fid, 0, sizeof(it->mit_fid));
1497
1498         dnode = it->mit_pos;
1499         do {
1500                 rc = -dmu_object_next(dev->od_os, &it->mit_pos, B_FALSE, 0);
1501                 if (unlikely(rc != 0))
1502                         GOTO(out, rc = 1);
1503                 it->mit_prefetched--;
1504
1505                 /* LMA is required for this to be a Lustre object.
1506                  * If there is no xattr skip it. */
1507                 rc = __osd_xattr_load(dev, it->mit_pos, &nvbuf);
1508                 if (unlikely(rc != 0))
1509                         continue;
1510
1511                 LASSERT(nvbuf != NULL);
1512                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
1513                 if (likely(rc == 0)) {
1514                         /* Lustre object */
1515                         lma = (struct lustre_mdt_attrs *)v;
1516                         lustre_lma_swab(lma);
1517                         it->mit_fid = lma->lma_self_fid;
1518                         nvlist_free(nvbuf);
1519                         break;
1520                 } else {
1521                         /* not a Lustre object, try next one */
1522                         nvlist_free(nvbuf);
1523                 }
1524
1525         } while (1);
1526
1527
1528         /* we aren't prefetching in the above loop because the number of
1529          * non-Lustre objects is very small and we will be repeating very
1530          * rare. in case we want to use this to iterate over non-Lustre
1531          * objects (i.e. when we convert regular ZFS in Lustre) it makes
1532          * sense to initiate prefetching in the loop */
1533
1534         /* 0 - there are more items, +1 - the end */
1535         if (likely(rc == 0))
1536                 osd_zfs_otable_prefetch(env, it);
1537
1538         CDEBUG(D_OTHER, "advance: %llu -> %llu "DFID": %d\n", dnode,
1539                it->mit_pos, PFID(&it->mit_fid), rc);
1540
1541 out:
1542         return rc;
1543 }
1544
1545 static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
1546                                             const struct dt_it *di)
1547 {
1548         return NULL;
1549 }
1550
1551 static int osd_zfs_otable_it_key_size(const struct lu_env *env,
1552                                       const struct dt_it *di)
1553 {
1554         return sizeof(__u64);
1555 }
1556
1557 static int osd_zfs_otable_it_rec(const struct lu_env *env,
1558                                  const struct dt_it *di,
1559                                  struct dt_rec *rec, __u32 attr)
1560 {
1561         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1562         struct lu_fid *fid = (struct lu_fid *)rec;
1563         ENTRY;
1564
1565         *fid = it->mit_fid;
1566
1567         RETURN(0);
1568 }
1569
1570
1571 static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
1572                                      const struct dt_it *di)
1573 {
1574         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1575
1576         return it->mit_pos;
1577 }
1578
1579 static int osd_zfs_otable_it_load(const struct lu_env *env,
1580                                   const struct dt_it *di, __u64 hash)
1581 {
1582         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1583
1584         it->mit_pos = hash;
1585         it->mit_prefetched = 0;
1586         it->mit_prefetched_dnode = 0;
1587
1588         return osd_zfs_otable_it_next(env, (struct dt_it *)di);
1589 }
1590
1591 static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
1592                                      const struct dt_it *di, void *key_rec)
1593 {
1594         return 0;
1595 }
1596
1597 const struct dt_index_operations osd_zfs_otable_ops = {
1598         .dio_it = {
1599                 .init     = osd_zfs_otable_it_init,
1600                 .fini     = osd_zfs_otable_it_fini,
1601                 .get      = osd_zfs_otable_it_get,
1602                 .put      = osd_zfs_otable_it_put,
1603                 .next     = osd_zfs_otable_it_next,
1604                 .key      = osd_zfs_otable_it_key,
1605                 .key_size = osd_zfs_otable_it_key_size,
1606                 .rec      = osd_zfs_otable_it_rec,
1607                 .store    = osd_zfs_otable_it_store,
1608                 .load     = osd_zfs_otable_it_load,
1609                 .key_rec  = osd_zfs_otable_it_key_rec,
1610         }
1611 };
1612
1613 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1614                 const struct dt_index_features *feat)
1615 {
1616         struct osd_object *obj = osd_dt_obj(dt);
1617         ENTRY;
1618
1619         /*
1620          * XXX: implement support for fixed-size keys sorted with natural
1621          *      numerical way (not using internal hash value)
1622          */
1623         if (feat->dif_flags & DT_IND_RANGE)
1624                 RETURN(-ERANGE);
1625
1626         if (unlikely(feat == &dt_otable_features)) {
1627                 dt->do_index_ops = &osd_zfs_otable_ops;
1628                 RETURN(0);
1629         }
1630
1631         LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL);
1632         if (likely(feat == &dt_directory_features)) {
1633                 if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db))
1634                         dt->do_index_ops = &osd_dir_ops;
1635                 else
1636                         RETURN(-ENOTDIR);
1637         } else if (unlikely(feat == &dt_acct_features)) {
1638                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1639                 dt->do_index_ops = &osd_acct_index_ops;
1640         } else if (dt->do_index_ops == NULL) {
1641                 /* For index file, we don't support variable key & record sizes
1642                  * and the key has to be unique */
1643                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1644                         RETURN(-EINVAL);
1645
1646                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
1647                         RETURN(-E2BIG);
1648                 if (feat->dif_keysize_max != feat->dif_keysize_min)
1649                         RETURN(-EINVAL);
1650
1651                 /* As for the record size, it should be a multiple of 8 bytes
1652                  * and smaller than the maximum value length supported by ZAP.
1653                  */
1654                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1655                         RETURN(-E2BIG);
1656                 if (feat->dif_recsize_max != feat->dif_recsize_min)
1657                         RETURN(-EINVAL);
1658
1659                 obj->oo_keysize = feat->dif_keysize_max;
1660                 obj->oo_recsize = feat->dif_recsize_max;
1661                 obj->oo_recusize = 1;
1662
1663                 /* ZFS prefers to work with array of 64bits */
1664                 if ((obj->oo_recsize & 7) == 0) {
1665                         obj->oo_recsize >>= 3;
1666                         obj->oo_recusize = 8;
1667                 }
1668                 dt->do_index_ops = &osd_index_ops;
1669         }
1670
1671         RETURN(0);
1672 }