Whamcloud - gitweb
LU-5794 osd: additional checks to verify credits calculation
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd-zfs/osd_index.c
37  *
38  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
39  * Author: Mike Pershin <tappro@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OSD
43
44 #include <lustre_ver.h>
45 #include <libcfs/libcfs.h>
46 #include <obd_support.h>
47 #include <lustre_net.h>
48 #include <obd.h>
49 #include <obd_class.h>
50 #include <lustre_disk.h>
51 #include <lustre_fid.h>
52
53 #include "osd_internal.h"
54
55 #include <sys/dnode.h>
56 #include <sys/dbuf.h>
57 #include <sys/spa.h>
58 #include <sys/stat.h>
59 #include <sys/zap.h>
60 #include <sys/spa_impl.h>
61 #include <sys/zfs_znode.h>
62 #include <sys/dmu_tx.h>
63 #include <sys/dmu_objset.h>
64 #include <sys/dsl_prop.h>
65 #include <sys/sa_impl.h>
66 #include <sys/txg.h>
67
68 static inline int osd_object_is_zap(dmu_buf_t *db)
69 {
70         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *) db;
71         dnode_t *dn;
72         int rc;
73
74         DB_DNODE_ENTER(dbi);
75         dn = DB_DNODE(dbi);
76         rc = (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
77                         dn->dn_type == DMU_OT_USERGROUP_USED);
78         DB_DNODE_EXIT(dbi);
79
80         return rc;
81 }
82
83 /* We don't actually have direct access to the zap_hashbits() function
84  * so just pretend like we do for now.  If this ever breaks we can look at
85  * it at that time. */
86 #define zap_hashbits(zc) 48
87 /*
88  * ZFS hash format:
89  * | cd (16 bits) | hash (48 bits) |
90  * we need it in other form:
91  * |0| hash (48 bit) | cd (15 bit) |
92  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
93  * the readdir hashes from multiple directory stripes uniformly on the client.
94  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
95  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
96  * should only be the low 15 bits.
97  */
98 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
99 {
100         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
101
102         return (zfs_hash >> zap_hashbits(zc)) |
103                 (zfs_hash << (63 - zap_hashbits(zc)));
104 }
105
106 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
107                                     uint64_t id, uint64_t dirhash)
108 {
109         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
110                 (dirhash >> (63 - zap_hashbits(zc)));
111
112         zap_cursor_init_serialized(zc, os, id, zfs_hash);
113 }
114
115 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
116                         uint64_t id, uint64_t dirhash)
117 {
118         zap_cursor_t *t;
119
120         OBD_ALLOC_PTR(t);
121         if (unlikely(t == NULL))
122                 return -ENOMEM;
123
124         osd_zap_cursor_init_serialized(t, os, id, dirhash);
125         *zc = t;
126
127         return 0;
128 }
129
130 void osd_zap_cursor_fini(zap_cursor_t *zc)
131 {
132         zap_cursor_fini(zc);
133         OBD_FREE_PTR(zc);
134 }
135
136 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
137                                                  struct osd_object *o,
138                                                  uint64_t dirhash)
139 {
140         struct osd_device *d = osd_obj2dev(o);
141         osd_zap_cursor_init_serialized(zc, d->od_os,
142                                        o->oo_db->db_object, dirhash);
143 }
144
145 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
146                         uint64_t dirhash)
147 {
148         struct osd_device *d = osd_obj2dev(o);
149         return osd_zap_cursor_init(zc, d->od_os, o->oo_db->db_object, dirhash);
150 }
151
152 static struct dt_it *osd_index_it_init(const struct lu_env *env,
153                                        struct dt_object *dt,
154                                        __u32 unused)
155 {
156         struct osd_thread_info  *info = osd_oti_get(env);
157         struct osd_zap_it       *it;
158         struct osd_object       *obj = osd_dt_obj(dt);
159         struct lu_object        *lo  = &dt->do_lu;
160         int                      rc;
161         ENTRY;
162
163         LASSERT(lu_object_exists(lo));
164         LASSERT(obj->oo_db);
165         LASSERT(osd_object_is_zap(obj->oo_db));
166         LASSERT(info);
167
168         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
169         if (it == NULL)
170                 RETURN(ERR_PTR(-ENOMEM));
171
172         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
173         if (rc != 0) {
174                 OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
175                 RETURN(ERR_PTR(rc));
176         }
177
178         it->ozi_obj   = obj;
179         it->ozi_reset = 1;
180         lu_object_get(lo);
181
182         RETURN((struct dt_it *)it);
183 }
184
185 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
186 {
187         struct osd_zap_it       *it     = (struct osd_zap_it *)di;
188         struct osd_object       *obj;
189         ENTRY;
190
191         LASSERT(it);
192         LASSERT(it->ozi_obj);
193
194         obj = it->ozi_obj;
195
196         osd_zap_cursor_fini(it->ozi_zc);
197         lu_object_put(env, &obj->oo_dt.do_lu);
198         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
199
200         EXIT;
201 }
202
203
204 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
205 {
206         /* PBS: do nothing : ref are incremented at retrive and decreamented
207          *      next/finish. */
208 }
209
210 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
211                                        int len, __u16 type)
212 {
213         const unsigned    align = sizeof(struct luda_type) - 1;
214         struct luda_type *lt;
215
216         /* check if file type is required */
217         if (attr & LUDA_TYPE) {
218                 len = (len + align) & ~align;
219
220                 lt = (void *)ent->lde_name + len;
221                 lt->lt_type = cpu_to_le16(DTTOIF(type));
222                 ent->lde_attrs |= LUDA_TYPE;
223         }
224
225         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
226 }
227
228 /*
229  * as we don't know FID, we can't use LU object, so this function
230  * partially duplicate __osd_xattr_get() which is built around
231  * LU-object and uses it to cache data like regular EA dnode, etc
232  */
233 static int osd_find_parent_by_dnode(const struct lu_env *env,
234                                     struct dt_object *o,
235                                     struct lu_fid *fid)
236 {
237         struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
238         struct lustre_mdt_attrs *lma;
239         struct lu_buf            buf;
240         sa_handle_t             *sa_hdl;
241         nvlist_t                *nvbuf = NULL;
242         uchar_t                 *value;
243         uint64_t                 dnode;
244         int                      rc, size;
245         ENTRY;
246
247         /* first of all, get parent dnode from own attributes */
248         LASSERT(osd_dt_obj(o)->oo_db);
249         rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
250                             NULL, SA_HDL_PRIVATE, &sa_hdl);
251         if (rc)
252                 RETURN(rc);
253
254         dnode = ZFS_NO_OBJECT;
255         rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
256         sa_handle_destroy(sa_hdl);
257         if (rc)
258                 RETURN(rc);
259
260         /* now get EA buffer */
261         rc = __osd_xattr_load(osd, dnode, &nvbuf);
262         if (rc)
263                 GOTO(regular, rc);
264
265         /* XXX: if we get that far.. should we cache the result? */
266
267         /* try to find LMA attribute */
268         LASSERT(nvbuf != NULL);
269         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
270         if (rc == 0 && size >= sizeof(*lma)) {
271                 lma = (struct lustre_mdt_attrs *)value;
272                 lustre_lma_swab(lma);
273                 *fid = lma->lma_self_fid;
274                 GOTO(out, rc = 0);
275         }
276
277 regular:
278         /* no LMA attribute in SA, let's try regular EA */
279
280         /* first of all, get parent dnode storing regular EA */
281         rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
282         if (rc)
283                 GOTO(out, rc);
284
285         dnode = ZFS_NO_OBJECT;
286         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
287         sa_handle_destroy(sa_hdl);
288         if (rc)
289                 GOTO(out, rc);
290
291         CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
292         buf.lb_buf = osd_oti_get(env)->oti_buf;
293         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
294
295         /* now try to find LMA */
296         rc = __osd_xattr_get_large(env, osd, dnode, &buf,
297                                    XATTR_NAME_LMA, &size);
298         if (rc == 0 && size >= sizeof(*lma)) {
299                 lma = buf.lb_buf;
300                 lustre_lma_swab(lma);
301                 *fid = lma->lma_self_fid;
302                 GOTO(out, rc = 0);
303         } else if (rc < 0) {
304                 GOTO(out, rc);
305         } else {
306                 GOTO(out, rc = -EIO);
307         }
308
309 out:
310         if (nvbuf != NULL)
311                 nvlist_free(nvbuf);
312         RETURN(rc);
313 }
314
315 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
316                                struct lu_fid *fid)
317 {
318         struct link_ea_header  *leh;
319         struct link_ea_entry   *lee;
320         struct lu_buf           buf;
321         int                     rc;
322         ENTRY;
323
324         buf.lb_buf = osd_oti_get(env)->oti_buf;
325         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
326
327         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
328         if (rc == -ERANGE) {
329                 rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK);
330                 if (rc < 0)
331                         RETURN(rc);
332                 LASSERT(rc > 0);
333                 OBD_ALLOC(buf.lb_buf, rc);
334                 if (buf.lb_buf == NULL)
335                         RETURN(-ENOMEM);
336                 buf.lb_len = rc;
337                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
338         }
339         if (rc < 0)
340                 GOTO(out, rc);
341         if (rc < sizeof(*leh) + sizeof(*lee))
342                 GOTO(out, rc = -EINVAL);
343
344         leh = buf.lb_buf;
345         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
346                 leh->leh_magic = LINK_EA_MAGIC;
347                 leh->leh_reccount = __swab32(leh->leh_reccount);
348                 leh->leh_len = __swab64(leh->leh_len);
349         }
350         if (leh->leh_magic != LINK_EA_MAGIC)
351                 GOTO(out, rc = -EINVAL);
352         if (leh->leh_reccount == 0)
353                 GOTO(out, rc = -ENODATA);
354
355         lee = (struct link_ea_entry *)(leh + 1);
356         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
357         rc = 0;
358
359 out:
360         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
361                 OBD_FREE(buf.lb_buf, buf.lb_len);
362
363 #if 0
364         /* this block can be enabled for additional verification
365          * it's trying to match FID from LinkEA vs. FID from LMA */
366         if (rc == 0) {
367                 struct lu_fid fid2;
368                 int rc2;
369                 rc2 = osd_find_parent_by_dnode(env, o, &fid2);
370                 if (rc2 == 0)
371                         if (lu_fid_eq(fid, &fid2) == 0)
372                                 CERROR("wrong parent: "DFID" != "DFID"\n",
373                                        PFID(fid), PFID(&fid2));
374         }
375 #endif
376
377         /* no LinkEA is found, let's try to find the fid in parent's LMA */
378         if (unlikely(rc != 0))
379                 rc = osd_find_parent_by_dnode(env, o, fid);
380
381         RETURN(rc);
382 }
383
384 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
385                           struct dt_rec *rec, const struct dt_key *key)
386 {
387         struct osd_thread_info *oti = osd_oti_get(env);
388         struct osd_object  *obj = osd_dt_obj(dt);
389         struct osd_device  *osd = osd_obj2dev(obj);
390         char               *name = (char *)key;
391         int                 rc;
392         ENTRY;
393
394         LASSERT(osd_object_is_zap(obj->oo_db));
395
396         if (name[0] == '.') {
397                 if (name[1] == 0) {
398                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
399                         memcpy(rec, f, sizeof(*f));
400                         RETURN(1);
401                 } else if (name[1] == '.' && name[2] == 0) {
402                         rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
403                         RETURN(rc == 0 ? 1 : rc);
404                 }
405         }
406
407         rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
408                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
409                          (void *)&oti->oti_zde);
410         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
411
412         RETURN(rc == 0 ? 1 : rc);
413 }
414
415 static int osd_declare_dir_insert(const struct lu_env *env,
416                                   struct dt_object *dt,
417                                   const struct dt_rec *rec,
418                                   const struct dt_key *key,
419                                   struct thandle *th)
420 {
421         struct osd_object  *obj = osd_dt_obj(dt);
422         struct osd_thandle *oh;
423         uint64_t object;
424         ENTRY;
425
426         LASSERT(th != NULL);
427         oh = container_of0(th, struct osd_thandle, ot_super);
428
429         /* This is for inserting dot/dotdot for new created dir. */
430         if (obj->oo_db == NULL)
431                 object = DMU_NEW_OBJECT;
432         else
433                 object = obj->oo_db->db_object;
434
435         dmu_tx_hold_bonus(oh->ot_tx, object);
436         dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
437
438         RETURN(0);
439 }
440
441 /**
442  * Find the osd object for given fid.
443  *
444  * \param fid need to find the osd object having this fid
445  *
446  * \retval osd_object on success
447  * \retval        -ve on error
448  */
449 struct osd_object *osd_object_find(const struct lu_env *env,
450                                    struct dt_object *dt,
451                                    const struct lu_fid *fid)
452 {
453         struct lu_device         *ludev = dt->do_lu.lo_dev;
454         struct osd_object        *child = NULL;
455         struct lu_object         *luch;
456         struct lu_object         *lo;
457
458         /*
459          * at this point topdev might not exist yet
460          * (i.e. MGS is preparing profiles). so we can
461          * not rely on topdev and instead lookup with
462          * our device passed as topdev. this can't work
463          * if the object isn't cached yet (as osd doesn't
464          * allocate lu_header). IOW, the object must be
465          * in the cache, otherwise lu_object_alloc() crashes
466          * -bzzz
467          */
468         luch = lu_object_find_at(env, ludev, fid, NULL);
469         if (IS_ERR(luch))
470                 return (void *)luch;
471
472         if (lu_object_exists(luch)) {
473                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
474                 if (lo != NULL)
475                         child = osd_obj(lo);
476                 else
477                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
478                                         "%s: object can't be located "DFID"\n",
479                                         osd_dev(ludev)->od_svname, PFID(fid));
480
481                 if (child == NULL) {
482                         lu_object_put(env, luch);
483                         CERROR("%s: Unable to get osd_object "DFID"\n",
484                                osd_dev(ludev)->od_svname, PFID(fid));
485                         child = ERR_PTR(-ENOENT);
486                 }
487         } else {
488                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
489                                 "%s: lu_object does not exists "DFID"\n",
490                                 osd_dev(ludev)->od_svname, PFID(fid));
491                 lu_object_put(env, luch);
492                 child = ERR_PTR(-ENOENT);
493         }
494
495         return child;
496 }
497
498 /**
499  * Put the osd object once done with it.
500  *
501  * \param obj osd object that needs to be put
502  */
503 static inline void osd_object_put(const struct lu_env *env,
504                                   struct osd_object *obj)
505 {
506         lu_object_put(env, &obj->oo_dt.do_lu);
507 }
508
509 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
510                           u64 seq)
511 {
512         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
513         struct seq_server_site  *ss = osd_seq_site(osd);
514         int                     rc;
515         ENTRY;
516
517         LASSERT(ss != NULL);
518         LASSERT(ss->ss_server_fld != NULL);
519
520         rc = osd_fld_lookup(env, osd, seq, range);
521         if (rc != 0) {
522                 if (rc != -ENOENT)
523                         CERROR("%s: Can not lookup fld for "LPX64"\n",
524                                osd_name(osd), seq);
525                 RETURN(0);
526         }
527
528         RETURN(ss->ss_node_id == range->lsr_index);
529 }
530
531 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
532                           const struct lu_fid *fid)
533 {
534         struct seq_server_site  *ss = osd_seq_site(osd);
535         ENTRY;
536
537         /* FID seqs not in FLDB, must be local seq */
538         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
539                 RETURN(0);
540
541         /* If FLD is not being initialized yet, it only happens during the
542          * initialization, likely during mgs initialization, and we assume
543          * this is local FID. */
544         if (ss == NULL || ss->ss_server_fld == NULL)
545                 RETURN(0);
546
547         /* Only check the local FLDB here */
548         if (osd_seq_exists(env, osd, fid_seq(fid)))
549                 RETURN(0);
550
551         RETURN(1);
552 }
553
554 /**
555  *      Inserts (key, value) pair in \a directory object.
556  *
557  *      \param  dt      osd index object
558  *      \param  key     key for index
559  *      \param  rec     record reference
560  *      \param  th      transaction handler
561  *      \param  ignore_quota update should not affect quota
562  *
563  *      \retval  0  success
564  *      \retval -ve failure
565  */
566 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
567                           const struct dt_rec *rec, const struct dt_key *key,
568                           struct thandle *th, int ignore_quota)
569 {
570         struct osd_thread_info *oti = osd_oti_get(env);
571         struct osd_object   *parent = osd_dt_obj(dt);
572         struct osd_device   *osd = osd_obj2dev(parent);
573         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
574         const struct lu_fid *fid = rec1->rec_fid;
575         struct osd_thandle  *oh;
576         struct osd_object   *child = NULL;
577         __u32                attr;
578         char                *name = (char *)key;
579         int                  rc;
580         ENTRY;
581
582         LASSERT(parent->oo_db);
583         LASSERT(osd_object_is_zap(parent->oo_db));
584
585         LASSERT(dt_object_exists(dt));
586         LASSERT(osd_invariant(parent));
587
588         LASSERT(th != NULL);
589         oh = container_of0(th, struct osd_thandle, ot_super);
590
591         rc = osd_remote_fid(env, osd, fid);
592         if (rc < 0) {
593                 CERROR("%s: Can not find object "DFID": rc = %d\n",
594                        osd->od_svname, PFID(fid), rc);
595                 RETURN(rc);
596         }
597
598         if (unlikely(rc == 1)) {
599                 /* Insert remote entry */
600                 memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
601                 oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
602         } else {
603                 /*
604                  * To simulate old Orion setups with ./..  stored in the
605                  * directories
606                  */
607                 /* Insert local entry */
608                 child = osd_object_find(env, dt, fid);
609                 if (IS_ERR(child))
610                         RETURN(PTR_ERR(child));
611
612                 LASSERT(child->oo_db);
613                 if (name[0] == '.') {
614                         if (name[1] == 0) {
615                                 /* do not store ".", instead generate it
616                                  * during iteration */
617                                 GOTO(out, rc = 0);
618                         } else if (name[1] == '.' && name[2] == 0) {
619                                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
620                                         struct lu_fid tfid = *fid;
621
622                                         osd_object_put(env, child);
623                                         tfid.f_oid--;
624                                         child = osd_object_find(env, dt, &tfid);
625                                         if (IS_ERR(child))
626                                                 RETURN(PTR_ERR(child));
627
628                                         LASSERT(child->oo_db);
629                                 }
630
631                                 /* update parent dnode in the child.
632                                  * later it will be used to generate ".." */
633                                 rc = osd_object_sa_update(parent,
634                                                  SA_ZPL_PARENT(osd),
635                                                  &child->oo_db->db_object,
636                                                  8, oh);
637
638                                 GOTO(out, rc);
639                         }
640                 }
641                 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
642                 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
643                 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
644                 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
645                 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
646         }
647
648         oti->oti_zde.lzd_fid = *fid;
649         /* Insert (key,oid) into ZAP */
650         rc = -zap_add(osd->od_os, parent->oo_db->db_object,
651                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
652                       (void *)&oti->oti_zde, oh->ot_tx);
653         if (unlikely(rc == -EEXIST &&
654                      name[0] == '.' && name[1] == '.' && name[2] == 0))
655                 /* Update (key,oid) in ZAP */
656                 rc = -zap_update(osd->od_os, parent->oo_db->db_object,
657                                 (char *)key, 8, sizeof(oti->oti_zde) / 8,
658                                 (void *)&oti->oti_zde, oh->ot_tx);
659
660 out:
661         if (child != NULL)
662                 osd_object_put(env, child);
663
664         RETURN(rc);
665 }
666
667 static int osd_declare_dir_delete(const struct lu_env *env,
668                                   struct dt_object *dt,
669                                   const struct dt_key *key,
670                                   struct thandle *th)
671 {
672         struct osd_object  *obj = osd_dt_obj(dt);
673         struct osd_thandle *oh;
674         uint64_t            dnode;
675         ENTRY;
676
677         LASSERT(dt_object_exists(dt));
678         LASSERT(osd_invariant(obj));
679
680         LASSERT(th != NULL);
681         oh = container_of0(th, struct osd_thandle, ot_super);
682
683         if (dt_object_exists(dt)) {
684                 LASSERT(obj->oo_db);
685                 LASSERT(osd_object_is_zap(obj->oo_db));
686                 dnode = obj->oo_db->db_object;
687         } else {
688                 dnode = DMU_NEW_OBJECT;
689         }
690         dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key);
691
692         RETURN(0);
693 }
694
695 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
696                           const struct dt_key *key, struct thandle *th)
697 {
698         struct osd_object *obj = osd_dt_obj(dt);
699         struct osd_device *osd = osd_obj2dev(obj);
700         struct osd_thandle *oh;
701         dmu_buf_t *zap_db = obj->oo_db;
702         char      *name = (char *)key;
703         int rc;
704         ENTRY;
705
706         LASSERT(zap_db);
707         LASSERT(osd_object_is_zap(zap_db));
708
709         LASSERT(th != NULL);
710         oh = container_of0(th, struct osd_thandle, ot_super);
711
712         /*
713          * In Orion . and .. were stored in the directory (not generated upon
714          * request as now). we preserve them for backward compatibility
715          */
716         if (name[0] == '.') {
717                 if (name[1] == 0) {
718                         RETURN(0);
719                 } else if (name[1] == '.' && name[2] == 0) {
720                         RETURN(0);
721                 }
722         }
723
724         /* Remove key from the ZAP */
725         rc = -zap_remove(osd->od_os, zap_db->db_object,
726                          (char *) key, oh->ot_tx);
727
728         if (unlikely(rc && rc != -ENOENT))
729                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
730
731         RETURN(rc);
732 }
733
734 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
735                                      struct dt_object *dt,
736                                      __u32 unused)
737 {
738         struct osd_zap_it *it;
739
740         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
741         if (!IS_ERR(it))
742                 it->ozi_pos = 0;
743
744         RETURN((struct dt_it *)it);
745 }
746
747 /**
748  *  Move Iterator to record specified by \a key
749  *
750  *  \param  di      osd iterator
751  *  \param  key     key for index
752  *
753  *  \retval +ve  di points to record with least key not larger than key
754  *  \retval  0   di points to exact matched key
755  *  \retval -ve  failure
756  */
757 static int osd_dir_it_get(const struct lu_env *env,
758                           struct dt_it *di, const struct dt_key *key)
759 {
760         struct osd_zap_it *it = (struct osd_zap_it *)di;
761         struct osd_object *obj = it->ozi_obj;
762         char              *name = (char *)key;
763         int                rc;
764         ENTRY;
765
766         LASSERT(it);
767         LASSERT(it->ozi_zc);
768
769         /* reset the cursor */
770         zap_cursor_fini(it->ozi_zc);
771         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
772
773         /* XXX: implementation of the API is broken at the moment */
774         LASSERT(((const char *)key)[0] == 0);
775
776         if (name[0] == 0) {
777                 it->ozi_pos = 0;
778                 RETURN(1);
779         }
780
781         if (name[0] == '.') {
782                 if (name[1] == 0) {
783                         it->ozi_pos = 1;
784                         GOTO(out, rc = 1);
785                 } else if (name[1] == '.' && name[2] == 0) {
786                         it->ozi_pos = 2;
787                         GOTO(out, rc = 1);
788                 }
789         }
790
791         /* neither . nor .. - some real record */
792         it->ozi_pos = 3;
793         rc = +1;
794
795 out:
796         RETURN(rc);
797 }
798
799 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
800 {
801         /* PBS: do nothing : ref are incremented at retrive and decreamented
802          *      next/finish. */
803 }
804
805 /*
806  * in Orion . and .. were stored in the directory, while ZPL
807  * and current osd-zfs generate them up on request. so, we
808  * need to ignore previously stored . and ..
809  */
810 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
811                                         zap_attribute_t *za)
812 {
813         int rc, isdot;
814
815         do {
816                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
817
818                 isdot = 0;
819                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
820                         if (za->za_name[1] == 0) {
821                                 isdot = 1;
822                         } else if (za->za_name[1] == '.' &&
823                                    za->za_name[2] == 0) {
824                                 isdot = 1;
825                         }
826                         if (unlikely(isdot))
827                                 zap_cursor_advance(it->ozi_zc);
828                 }
829         } while (unlikely(rc == 0 && isdot));
830
831         return rc;
832 }
833
834 /**
835  * to load a directory entry at a time and stored it in
836  * iterator's in-memory data structure.
837  *
838  * \param di, struct osd_it_ea, iterator's in memory structure
839  *
840  * \retval +ve, iterator reached to end
841  * \retval   0, iterator not reached to end
842  * \retval -ve, on error
843  */
844 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
845 {
846         struct osd_zap_it *it = (struct osd_zap_it *)di;
847         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
848         int                rc;
849
850         ENTRY;
851
852         /* temp. storage should be enough for any key supported by ZFS */
853         CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
854
855         /*
856          * the first ->next() moves the cursor to .
857          * the second ->next() moves the cursor to ..
858          * then we get to the real records and have to verify any exist
859          */
860         if (it->ozi_pos <= 2) {
861                 it->ozi_pos++;
862                 if (it->ozi_pos <=2)
863                         RETURN(0);
864
865         } else {
866                 zap_cursor_advance(it->ozi_zc);
867         }
868
869         /*
870          * According to current API we need to return error if its last entry.
871          * zap_cursor_advance() does not return any value. So we need to call
872          * retrieve to check if there is any record.  We should make
873          * changes to Iterator API to not return status for this API
874          */
875         rc = osd_index_retrieve_skip_dots(it, za);
876
877         if (rc == -ENOENT) /* end of dir */
878                 RETURN(+1);
879
880         RETURN(rc);
881 }
882
883 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
884                                      const struct dt_it *di)
885 {
886         struct osd_zap_it *it = (struct osd_zap_it *)di;
887         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
888         int                rc = 0;
889         ENTRY;
890
891         if (it->ozi_pos <= 1) {
892                 it->ozi_pos = 1;
893                 RETURN((struct dt_key *)".");
894         } else if (it->ozi_pos == 2) {
895                 RETURN((struct dt_key *)"..");
896         }
897
898         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
899                 RETURN(ERR_PTR(rc));
900
901         strcpy(it->ozi_name, za->za_name);
902
903         RETURN((struct dt_key *)it->ozi_name);
904 }
905
906 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
907 {
908         struct osd_zap_it *it = (struct osd_zap_it *)di;
909         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
910         int                rc;
911         ENTRY;
912
913         if (it->ozi_pos <= 1) {
914                 it->ozi_pos = 1;
915                 RETURN(2);
916         } else if (it->ozi_pos == 2) {
917                 RETURN(3);
918         }
919
920         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
921                 rc = strlen(za->za_name);
922
923         RETURN(rc);
924 }
925
926 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
927                           struct dt_rec *dtrec, __u32 attr)
928 {
929         struct osd_zap_it   *it = (struct osd_zap_it *)di;
930         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
931         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
932         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
933         int                  rc, namelen;
934         ENTRY;
935
936         if (it->ozi_pos <= 1) {
937                 lde->lde_hash = cpu_to_le64(1);
938                 strcpy(lde->lde_name, ".");
939                 lde->lde_namelen = cpu_to_le16(1);
940                 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
941                 lde->lde_attrs = LUDA_FID;
942                 /* append lustre attributes */
943                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
944                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
945                 it->ozi_pos = 1;
946                 GOTO(out, rc = 0);
947
948         } else if (it->ozi_pos == 2) {
949                 lde->lde_hash = cpu_to_le64(2);
950                 strcpy(lde->lde_name, "..");
951                 lde->lde_namelen = cpu_to_le16(2);
952                 lde->lde_attrs = LUDA_FID;
953                 /* append lustre attributes */
954                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
955                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
956                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
957
958                 /* ENOENT happens at the root of filesystem so ignore it */
959                 if (rc == -ENOENT)
960                         rc = 0;
961                 GOTO(out, rc);
962         }
963
964         LASSERT(lde);
965
966         rc = -zap_cursor_retrieve(it->ozi_zc, za);
967         if (unlikely(rc != 0))
968                 GOTO(out, rc);
969
970         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
971         namelen = strlen(za->za_name);
972         if (namelen > NAME_MAX)
973                 GOTO(out, rc = -EOVERFLOW);
974         strcpy(lde->lde_name, za->za_name);
975         lde->lde_namelen = cpu_to_le16(namelen);
976
977         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
978                 CERROR("%s: unsupported direntry format: %d %d\n",
979                        osd_obj2dev(it->ozi_obj)->od_svname,
980                        za->za_integer_length, (int)za->za_num_integers);
981
982                 GOTO(out, rc = -EIO);
983         }
984
985         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
986                          za->za_name, za->za_integer_length, 3, zde);
987         if (rc)
988                 GOTO(out, rc);
989
990         lde->lde_fid = zde->lzd_fid;
991         lde->lde_attrs = LUDA_FID;
992
993         /* append lustre attributes */
994         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
995
996         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
997
998 out:
999         RETURN(rc);
1000 }
1001
1002 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
1003                                __u32 attr)
1004 {
1005         struct osd_zap_it   *it = (struct osd_zap_it *)di;
1006         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
1007         size_t               namelen = 0;
1008         int                  rc;
1009         ENTRY;
1010
1011         if (it->ozi_pos <= 1)
1012                 namelen = 1;
1013         else if (it->ozi_pos == 2)
1014                 namelen = 2;
1015
1016         if (namelen > 0) {
1017                 rc = lu_dirent_calc_size(namelen, attr);
1018                 RETURN(rc);
1019         }
1020
1021         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1022         if (unlikely(rc != 0))
1023                 RETURN(rc);
1024
1025         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1026                 CERROR("%s: unsupported direntry format: %d %d\n",
1027                        osd_obj2dev(it->ozi_obj)->od_svname,
1028                        za->za_integer_length, (int)za->za_num_integers);
1029                 RETURN(-EIO);
1030         }
1031
1032         namelen = strlen(za->za_name);
1033         if (namelen > NAME_MAX)
1034                 RETURN(-EOVERFLOW);
1035
1036         rc = lu_dirent_calc_size(namelen, attr);
1037
1038         RETURN(rc);
1039 }
1040
1041 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1042 {
1043         struct osd_zap_it *it = (struct osd_zap_it *)di;
1044         __u64              pos;
1045         ENTRY;
1046
1047         if (it->ozi_pos <= 2)
1048                 pos = it->ozi_pos;
1049         else
1050                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1051
1052         RETURN(pos);
1053 }
1054
1055 /*
1056  * return status :
1057  *  rc == 0 -> end of directory.
1058  *  rc >  0 -> ok, proceed.
1059  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1060  */
1061 static int osd_dir_it_load(const struct lu_env *env,
1062                         const struct dt_it *di, __u64 hash)
1063 {
1064         struct osd_zap_it *it = (struct osd_zap_it *)di;
1065         struct osd_object *obj = it->ozi_obj;
1066         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1067         int                rc;
1068         ENTRY;
1069
1070         /* reset the cursor */
1071         zap_cursor_fini(it->ozi_zc);
1072         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1073
1074         if (hash <= 2) {
1075                 it->ozi_pos = hash;
1076                 rc = +1;
1077         } else {
1078                 it->ozi_pos = 3;
1079                 /* to return whether the end has been reached */
1080                 rc = osd_index_retrieve_skip_dots(it, za);
1081                 if (rc == 0)
1082                         rc = +1;
1083                 else if (rc == -ENOENT)
1084                         rc = 0;
1085         }
1086
1087         RETURN(rc);
1088 }
1089
1090 struct dt_index_operations osd_dir_ops = {
1091         .dio_lookup         = osd_dir_lookup,
1092         .dio_declare_insert = osd_declare_dir_insert,
1093         .dio_insert         = osd_dir_insert,
1094         .dio_declare_delete = osd_declare_dir_delete,
1095         .dio_delete         = osd_dir_delete,
1096         .dio_it     = {
1097                 .init     = osd_dir_it_init,
1098                 .fini     = osd_index_it_fini,
1099                 .get      = osd_dir_it_get,
1100                 .put      = osd_dir_it_put,
1101                 .next     = osd_dir_it_next,
1102                 .key      = osd_dir_it_key,
1103                 .key_size = osd_dir_it_key_size,
1104                 .rec      = osd_dir_it_rec,
1105                 .rec_size = osd_dir_it_rec_size,
1106                 .store    = osd_dir_it_store,
1107                 .load     = osd_dir_it_load
1108         }
1109 };
1110
1111 /*
1112  * Primitives for index files using binary keys.
1113  */
1114
1115 /* key integer_size is 8 */
1116 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1117                                   const struct dt_key *src)
1118 {
1119         int size;
1120
1121         LASSERT(dst);
1122         LASSERT(src);
1123
1124         /* align keysize to 64bit */
1125         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1126         size *= sizeof(__u64);
1127
1128         LASSERT(size <= MAXNAMELEN);
1129
1130         if (unlikely(size > o->oo_keysize))
1131                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1132         memcpy(dst, (const char *)src, o->oo_keysize);
1133
1134         return (size/sizeof(__u64));
1135 }
1136
1137 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1138                         struct dt_rec *rec, const struct dt_key *key)
1139 {
1140         struct osd_object *obj = osd_dt_obj(dt);
1141         struct osd_device *osd = osd_obj2dev(obj);
1142         __u64             *k = osd_oti_get(env)->oti_key64;
1143         int                rc;
1144         ENTRY;
1145
1146         rc = osd_prepare_key_uint64(obj, k, key);
1147
1148         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1149                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1150                                 (void *)rec);
1151         RETURN(rc == 0 ? 1 : rc);
1152 }
1153
1154 static int osd_declare_index_insert(const struct lu_env *env,
1155                                     struct dt_object *dt,
1156                                     const struct dt_rec *rec,
1157                                     const struct dt_key *key,
1158                                     struct thandle *th)
1159 {
1160         struct osd_object  *obj = osd_dt_obj(dt);
1161         struct osd_thandle *oh;
1162         ENTRY;
1163
1164         LASSERT(th != NULL);
1165         oh = container_of0(th, struct osd_thandle, ot_super);
1166
1167         LASSERT(obj->oo_db);
1168
1169         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
1170
1171         /* It is not clear what API should be used for binary keys, so we pass
1172          * a null name which has the side effect of over-reserving space,
1173          * accounting for the worst case. See zap_count_write() */
1174         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1175
1176         RETURN(0);
1177 }
1178
1179 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1180                             const struct dt_rec *rec, const struct dt_key *key,
1181                             struct thandle *th, int ignore_quota)
1182 {
1183         struct osd_object  *obj = osd_dt_obj(dt);
1184         struct osd_device  *osd = osd_obj2dev(obj);
1185         struct osd_thandle *oh;
1186         __u64              *k = osd_oti_get(env)->oti_key64;
1187         int                 rc;
1188         ENTRY;
1189
1190         LASSERT(obj->oo_db);
1191         LASSERT(dt_object_exists(dt));
1192         LASSERT(osd_invariant(obj));
1193         LASSERT(th != NULL);
1194
1195         oh = container_of0(th, struct osd_thandle, ot_super);
1196
1197         rc = osd_prepare_key_uint64(obj, k, key);
1198
1199         /* Insert (key,oid) into ZAP */
1200         rc = -zap_add_uint64(osd->od_os, obj->oo_db->db_object,
1201                              k, rc, obj->oo_recusize, obj->oo_recsize,
1202                              (void *)rec, oh->ot_tx);
1203         RETURN(rc);
1204 }
1205
1206 static int osd_declare_index_delete(const struct lu_env *env,
1207                                     struct dt_object *dt,
1208                                     const struct dt_key *key,
1209                                     struct thandle *th)
1210 {
1211         struct osd_object  *obj = osd_dt_obj(dt);
1212         struct osd_thandle *oh;
1213         ENTRY;
1214
1215         LASSERT(dt_object_exists(dt));
1216         LASSERT(osd_invariant(obj));
1217         LASSERT(th != NULL);
1218         LASSERT(obj->oo_db);
1219
1220         oh = container_of0(th, struct osd_thandle, ot_super);
1221         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1222
1223         RETURN(0);
1224 }
1225
1226 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1227                             const struct dt_key *key, struct thandle *th)
1228 {
1229         struct osd_object  *obj = osd_dt_obj(dt);
1230         struct osd_device  *osd = osd_obj2dev(obj);
1231         struct osd_thandle *oh;
1232         __u64              *k = osd_oti_get(env)->oti_key64;
1233         int                 rc;
1234         ENTRY;
1235
1236         LASSERT(obj->oo_db);
1237         LASSERT(th != NULL);
1238         oh = container_of0(th, struct osd_thandle, ot_super);
1239
1240         rc = osd_prepare_key_uint64(obj, k, key);
1241
1242         /* Remove binary key from the ZAP */
1243         rc = -zap_remove_uint64(osd->od_os, obj->oo_db->db_object,
1244                                 k, rc, oh->ot_tx);
1245         RETURN(rc);
1246 }
1247
1248 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1249                             const struct dt_key *key)
1250 {
1251         struct osd_zap_it *it = (struct osd_zap_it *)di;
1252         struct osd_object *obj = it->ozi_obj;
1253         struct osd_device *osd = osd_obj2dev(obj);
1254         ENTRY;
1255
1256         LASSERT(it);
1257         LASSERT(it->ozi_zc);
1258
1259         /*
1260          * XXX: we need a binary version of zap_cursor_move_to_key()
1261          *      to implement this API */
1262         if (*((const __u64 *)key) != 0)
1263                 CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
1264                        *((__u64 *)key));
1265
1266         zap_cursor_fini(it->ozi_zc);
1267         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_db->db_object);
1268         it->ozi_reset = 1;
1269
1270         RETURN(+1);
1271 }
1272
1273 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1274 {
1275         struct osd_zap_it *it = (struct osd_zap_it *)di;
1276         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1277         int                rc;
1278         ENTRY;
1279
1280         if (it->ozi_reset == 0)
1281                 zap_cursor_advance(it->ozi_zc);
1282         it->ozi_reset = 0;
1283
1284         /*
1285          * According to current API we need to return error if it's last entry.
1286          * zap_cursor_advance() does not return any value. So we need to call
1287          * retrieve to check if there is any record.  We should make
1288          * changes to Iterator API to not return status for this API
1289          */
1290         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1291         if (rc == -ENOENT)
1292                 RETURN(+1);
1293
1294         RETURN((rc));
1295 }
1296
1297 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1298                                        const struct dt_it *di)
1299 {
1300         struct osd_zap_it *it = (struct osd_zap_it *)di;
1301         struct osd_object *obj = it->ozi_obj;
1302         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1303         int                rc = 0;
1304         ENTRY;
1305
1306         it->ozi_reset = 0;
1307         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1308         if (rc)
1309                 RETURN(ERR_PTR(rc));
1310
1311         /* the binary key is stored in the name */
1312         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1313
1314         RETURN((struct dt_key *)&it->ozi_key);
1315 }
1316
1317 static int osd_index_it_key_size(const struct lu_env *env,
1318                                 const struct dt_it *di)
1319 {
1320         struct osd_zap_it *it = (struct osd_zap_it *)di;
1321         struct osd_object *obj = it->ozi_obj;
1322         RETURN(obj->oo_keysize);
1323 }
1324
1325 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1326                             struct dt_rec *rec, __u32 attr)
1327 {
1328         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1329         struct osd_zap_it *it = (struct osd_zap_it *)di;
1330         struct osd_object *obj = it->ozi_obj;
1331         struct osd_device *osd = osd_obj2dev(obj);
1332         __u64             *k = osd_oti_get(env)->oti_key64;
1333         int                rc;
1334         ENTRY;
1335
1336         it->ozi_reset = 0;
1337         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1338         if (rc)
1339                 RETURN(rc);
1340
1341         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1342
1343         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1344                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1345                                 (void *)rec);
1346         RETURN(rc);
1347 }
1348
1349 static __u64 osd_index_it_store(const struct lu_env *env,
1350                                 const struct dt_it *di)
1351 {
1352         struct osd_zap_it *it = (struct osd_zap_it *)di;
1353
1354         it->ozi_reset = 0;
1355         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1356 }
1357
1358 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1359                              __u64 hash)
1360 {
1361         struct osd_zap_it *it = (struct osd_zap_it *)di;
1362         struct osd_object *obj = it->ozi_obj;
1363         struct osd_device *osd = osd_obj2dev(obj);
1364         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1365         int                rc;
1366         ENTRY;
1367
1368         /* reset the cursor */
1369         zap_cursor_fini(it->ozi_zc);
1370         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1371                                    obj->oo_db->db_object, hash);
1372         it->ozi_reset = 0;
1373
1374         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1375         if (rc == 0)
1376                 RETURN(+1);
1377         else if (rc == -ENOENT)
1378                 RETURN(0);
1379
1380         RETURN(rc);
1381 }
1382
1383 static struct dt_index_operations osd_index_ops = {
1384         .dio_lookup             = osd_index_lookup,
1385         .dio_declare_insert     = osd_declare_index_insert,
1386         .dio_insert             = osd_index_insert,
1387         .dio_declare_delete     = osd_declare_index_delete,
1388         .dio_delete             = osd_index_delete,
1389         .dio_it = {
1390                 .init           = osd_index_it_init,
1391                 .fini           = osd_index_it_fini,
1392                 .get            = osd_index_it_get,
1393                 .put            = osd_index_it_put,
1394                 .next           = osd_index_it_next,
1395                 .key            = osd_index_it_key,
1396                 .key_size       = osd_index_it_key_size,
1397                 .rec            = osd_index_it_rec,
1398                 .store          = osd_index_it_store,
1399                 .load           = osd_index_it_load
1400         }
1401 };
1402
1403 struct osd_metadnode_it {
1404         struct osd_device       *mit_dev;
1405         __u64                    mit_pos;
1406         struct lu_fid            mit_fid;
1407         int                      mit_prefetched;
1408         __u64                    mit_prefetched_dnode;
1409 };
1410
1411 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
1412                                             struct dt_object *dt, __u32 attr)
1413 {
1414         struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
1415         struct osd_metadnode_it *it;
1416         ENTRY;
1417
1418         OBD_ALLOC_PTR(it);
1419         if (unlikely(it == NULL))
1420                 RETURN(ERR_PTR(-ENOMEM));
1421
1422         it->mit_dev = dev;
1423
1424         /* XXX: dmu_object_next() does NOT find dnodes allocated
1425          *      in the current non-committed txg, so we force txg
1426          *      commit to find all existing dnodes ... */
1427         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1428
1429         RETURN((struct dt_it *)it);
1430 }
1431
1432 static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1433 {
1434         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1435
1436         OBD_FREE_PTR(it);
1437 }
1438
1439 static int osd_zfs_otable_it_get(const struct lu_env *env,
1440                                  struct dt_it *di, const struct dt_key *key)
1441 {
1442         return 0;
1443 }
1444
1445 static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
1446 {
1447 }
1448
1449 #define OTABLE_PREFETCH         256
1450
1451 static void osd_zfs_otable_prefetch(const struct lu_env *env,
1452                                     struct osd_metadnode_it *it)
1453 {
1454         struct osd_device       *dev = it->mit_dev;
1455         int                      rc;
1456
1457         /* can go negative on the very first access to the iterator
1458          * or if some non-Lustre objects were found */
1459         if (unlikely(it->mit_prefetched < 0))
1460                 it->mit_prefetched = 0;
1461
1462         if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
1463                 return;
1464
1465         if (it->mit_prefetched_dnode == 0)
1466                 it->mit_prefetched_dnode = it->mit_pos;
1467
1468         while (it->mit_prefetched < OTABLE_PREFETCH) {
1469                 rc = -dmu_object_next(dev->od_os, &it->mit_prefetched_dnode,
1470                                       B_FALSE, 0);
1471                 if (unlikely(rc != 0))
1472                         break;
1473
1474                 /* dmu_prefetch() was exported in 0.6.2, if you use with
1475                  * an older release, just comment it out - this is an
1476                  * optimization */
1477                 dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
1478
1479                 it->mit_prefetched++;
1480         }
1481 }
1482
1483 static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
1484 {
1485         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1486         struct lustre_mdt_attrs *lma;
1487         struct osd_device       *dev = it->mit_dev;
1488         nvlist_t                *nvbuf = NULL;
1489         uchar_t                 *v;
1490         __u64                    dnode;
1491         int                      rc, s;
1492
1493         memset(&it->mit_fid, 0, sizeof(it->mit_fid));
1494
1495         dnode = it->mit_pos;
1496         do {
1497                 rc = -dmu_object_next(dev->od_os, &it->mit_pos, B_FALSE, 0);
1498                 if (unlikely(rc != 0))
1499                         GOTO(out, rc = 1);
1500                 it->mit_prefetched--;
1501
1502                 /* LMA is required for this to be a Lustre object.
1503                  * If there is no xattr skip it. */
1504                 rc = __osd_xattr_load(dev, it->mit_pos, &nvbuf);
1505                 if (unlikely(rc != 0))
1506                         continue;
1507
1508                 LASSERT(nvbuf != NULL);
1509                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
1510                 if (likely(rc == 0)) {
1511                         /* Lustre object */
1512                         lma = (struct lustre_mdt_attrs *)v;
1513                         lustre_lma_swab(lma);
1514                         it->mit_fid = lma->lma_self_fid;
1515                         nvlist_free(nvbuf);
1516                         break;
1517                 } else {
1518                         /* not a Lustre object, try next one */
1519                         nvlist_free(nvbuf);
1520                 }
1521
1522         } while (1);
1523
1524
1525         /* we aren't prefetching in the above loop because the number of
1526          * non-Lustre objects is very small and we will be repeating very
1527          * rare. in case we want to use this to iterate over non-Lustre
1528          * objects (i.e. when we convert regular ZFS in Lustre) it makes
1529          * sense to initiate prefetching in the loop */
1530
1531         /* 0 - there are more items, +1 - the end */
1532         if (likely(rc == 0))
1533                 osd_zfs_otable_prefetch(env, it);
1534
1535         CDEBUG(D_OTHER, "advance: %llu -> %llu "DFID": %d\n", dnode,
1536                it->mit_pos, PFID(&it->mit_fid), rc);
1537
1538 out:
1539         return rc;
1540 }
1541
1542 static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
1543                                             const struct dt_it *di)
1544 {
1545         return NULL;
1546 }
1547
1548 static int osd_zfs_otable_it_key_size(const struct lu_env *env,
1549                                       const struct dt_it *di)
1550 {
1551         return sizeof(__u64);
1552 }
1553
1554 static int osd_zfs_otable_it_rec(const struct lu_env *env,
1555                                  const struct dt_it *di,
1556                                  struct dt_rec *rec, __u32 attr)
1557 {
1558         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1559         struct lu_fid *fid = (struct lu_fid *)rec;
1560         ENTRY;
1561
1562         *fid = it->mit_fid;
1563
1564         RETURN(0);
1565 }
1566
1567
1568 static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
1569                                      const struct dt_it *di)
1570 {
1571         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1572
1573         return it->mit_pos;
1574 }
1575
1576 static int osd_zfs_otable_it_load(const struct lu_env *env,
1577                                   const struct dt_it *di, __u64 hash)
1578 {
1579         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1580
1581         it->mit_pos = hash;
1582         it->mit_prefetched = 0;
1583         it->mit_prefetched_dnode = 0;
1584
1585         return osd_zfs_otable_it_next(env, (struct dt_it *)di);
1586 }
1587
1588 static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
1589                                      const struct dt_it *di, void *key_rec)
1590 {
1591         return 0;
1592 }
1593
1594 const struct dt_index_operations osd_zfs_otable_ops = {
1595         .dio_it = {
1596                 .init     = osd_zfs_otable_it_init,
1597                 .fini     = osd_zfs_otable_it_fini,
1598                 .get      = osd_zfs_otable_it_get,
1599                 .put      = osd_zfs_otable_it_put,
1600                 .next     = osd_zfs_otable_it_next,
1601                 .key      = osd_zfs_otable_it_key,
1602                 .key_size = osd_zfs_otable_it_key_size,
1603                 .rec      = osd_zfs_otable_it_rec,
1604                 .store    = osd_zfs_otable_it_store,
1605                 .load     = osd_zfs_otable_it_load,
1606                 .key_rec  = osd_zfs_otable_it_key_rec,
1607         }
1608 };
1609
1610 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1611                 const struct dt_index_features *feat)
1612 {
1613         struct osd_object *obj = osd_dt_obj(dt);
1614         ENTRY;
1615
1616         /*
1617          * XXX: implement support for fixed-size keys sorted with natural
1618          *      numerical way (not using internal hash value)
1619          */
1620         if (feat->dif_flags & DT_IND_RANGE)
1621                 RETURN(-ERANGE);
1622
1623         if (unlikely(feat == &dt_otable_features)) {
1624                 dt->do_index_ops = &osd_zfs_otable_ops;
1625                 RETURN(0);
1626         }
1627
1628         LASSERT(!dt_object_exists(dt) || obj->oo_db != NULL);
1629         if (likely(feat == &dt_directory_features)) {
1630                 if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_db))
1631                         dt->do_index_ops = &osd_dir_ops;
1632                 else
1633                         RETURN(-ENOTDIR);
1634         } else if (unlikely(feat == &dt_acct_features)) {
1635                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1636                 dt->do_index_ops = &osd_acct_index_ops;
1637         } else if (dt->do_index_ops == NULL) {
1638                 /* For index file, we don't support variable key & record sizes
1639                  * and the key has to be unique */
1640                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1641                         RETURN(-EINVAL);
1642
1643                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
1644                         RETURN(-E2BIG);
1645                 if (feat->dif_keysize_max != feat->dif_keysize_min)
1646                         RETURN(-EINVAL);
1647
1648                 /* As for the record size, it should be a multiple of 8 bytes
1649                  * and smaller than the maximum value length supported by ZAP.
1650                  */
1651                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1652                         RETURN(-E2BIG);
1653                 if (feat->dif_recsize_max != feat->dif_recsize_min)
1654                         RETURN(-EINVAL);
1655
1656                 obj->oo_keysize = feat->dif_keysize_max;
1657                 obj->oo_recsize = feat->dif_recsize_max;
1658                 obj->oo_recusize = 1;
1659
1660                 /* ZFS prefers to work with array of 64bits */
1661                 if ((obj->oo_recsize & 7) == 0) {
1662                         obj->oo_recsize >>= 3;
1663                         obj->oo_recusize = 8;
1664                 }
1665                 dt->do_index_ops = &osd_index_ops;
1666         }
1667
1668         RETURN(0);
1669 }