Whamcloud - gitweb
LU-4821 osd: cleanups in osd-zfs
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2012, 2013, Intel Corporation.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_index.c
39  *
40  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41  * Author: Mike Pershin <tappro@whamcloud.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_OSD
45
46 #include <lustre_ver.h>
47 #include <libcfs/libcfs.h>
48 #include <obd_support.h>
49 #include <lustre_net.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_disk.h>
53 #include <lustre_fid.h>
54
55 #include "osd_internal.h"
56
57 #include <sys/dnode.h>
58 #include <sys/dbuf.h>
59 #include <sys/spa.h>
60 #include <sys/stat.h>
61 #include <sys/zap.h>
62 #include <sys/spa_impl.h>
63 #include <sys/zfs_znode.h>
64 #include <sys/dmu_tx.h>
65 #include <sys/dmu_objset.h>
66 #include <sys/dsl_prop.h>
67 #include <sys/sa_impl.h>
68 #include <sys/txg.h>
69
70 static inline int osd_object_is_zap(dmu_buf_t *db)
71 {
72         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *) db;
73         dnode_t *dn;
74         int rc;
75
76         DB_DNODE_ENTER(dbi);
77         dn = DB_DNODE(dbi);
78         rc = (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
79                         dn->dn_type == DMU_OT_USERGROUP_USED);
80         DB_DNODE_EXIT(dbi);
81
82         return rc;
83 }
84
85 /* We don't actually have direct access to the zap_hashbits() function
86  * so just pretend like we do for now.  If this ever breaks we can look at
87  * it at that time. */
88 #define zap_hashbits(zc) 48
89 /*
90  * ZFS hash format:
91  * | cd (16 bits) | hash (48 bits) |
92  * we need it in other form:
93  * |0| hash (48 bit) | cd (15 bit) |
94  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
95  * the readdir hashes from multiple directory stripes uniformly on the client.
96  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
97  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
98  * should only be the low 15 bits.
99  */
100 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
101 {
102         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
103
104         return (zfs_hash >> zap_hashbits(zc)) |
105                 (zfs_hash << (63 - zap_hashbits(zc)));
106 }
107
108 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
109                                     uint64_t id, uint64_t dirhash)
110 {
111         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
112                 (dirhash >> (63 - zap_hashbits(zc)));
113
114         zap_cursor_init_serialized(zc, os, id, zfs_hash);
115 }
116
117 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
118                         uint64_t id, uint64_t dirhash)
119 {
120         zap_cursor_t *t;
121
122         OBD_ALLOC_PTR(t);
123         if (unlikely(t == NULL))
124                 return -ENOMEM;
125
126         osd_zap_cursor_init_serialized(t, os, id, dirhash);
127         *zc = t;
128
129         return 0;
130 }
131
132 void osd_zap_cursor_fini(zap_cursor_t *zc)
133 {
134         zap_cursor_fini(zc);
135         OBD_FREE_PTR(zc);
136 }
137
138 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
139                                                  struct osd_object *o,
140                                                  uint64_t dirhash)
141 {
142         struct osd_device *d = osd_obj2dev(o);
143         zap_cursor_init_serialized(zc, d->od_os, o->oo_db->db_object, dirhash);
144 }
145
146 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
147                         uint64_t dirhash)
148 {
149         struct osd_device *d = osd_obj2dev(o);
150         return osd_zap_cursor_init(zc, d->od_os, o->oo_db->db_object, dirhash);
151 }
152
153 static struct dt_it *osd_index_it_init(const struct lu_env *env,
154                                        struct dt_object *dt,
155                                        __u32 unused,
156                                        struct lustre_capa *capa)
157 {
158         struct osd_thread_info  *info = osd_oti_get(env);
159         struct osd_zap_it       *it;
160         struct osd_object       *obj = osd_dt_obj(dt);
161         struct lu_object        *lo  = &dt->do_lu;
162         int                      rc;
163         ENTRY;
164
165         /* XXX: check capa ? */
166
167         LASSERT(lu_object_exists(lo));
168         LASSERT(obj->oo_db);
169         LASSERT(osd_object_is_zap(obj->oo_db));
170         LASSERT(info);
171
172         it = &info->oti_it_zap;
173
174         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
175         if (rc != 0)
176                 RETURN(ERR_PTR(rc));
177
178         it->ozi_obj   = obj;
179         it->ozi_capa  = capa;
180         it->ozi_reset = 1;
181         lu_object_get(lo);
182
183         RETURN((struct dt_it *)it);
184 }
185
186 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
187 {
188         struct osd_zap_it *it = (struct osd_zap_it *)di;
189         struct osd_object *obj;
190         ENTRY;
191
192         LASSERT(it);
193         LASSERT(it->ozi_obj);
194
195         obj = it->ozi_obj;
196
197         osd_zap_cursor_fini(it->ozi_zc);
198         lu_object_put(env, &obj->oo_dt.do_lu);
199
200         EXIT;
201 }
202
203
204 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
205 {
206         /* PBS: do nothing : ref are incremented at retrive and decreamented
207          *      next/finish. */
208 }
209
210 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
211                                        int len, __u16 type)
212 {
213         const unsigned    align = sizeof(struct luda_type) - 1;
214         struct luda_type *lt;
215
216         /* check if file type is required */
217         if (attr & LUDA_TYPE) {
218                 len = (len + align) & ~align;
219
220                 lt = (void *)ent->lde_name + len;
221                 lt->lt_type = cpu_to_le16(DTTOIF(type));
222                 ent->lde_attrs |= LUDA_TYPE;
223         }
224
225         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
226 }
227
228 /*
229  * as we don't know FID, we can't use LU object, so this function
230  * partially duplicate __osd_xattr_get() which is built around
231  * LU-object and uses it to cache data like regular EA dnode, etc
232  */
233 static int osd_find_parent_by_dnode(const struct lu_env *env,
234                                     struct dt_object *o,
235                                     struct lu_fid *fid)
236 {
237         struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
238         struct lustre_mdt_attrs *lma;
239         struct lu_buf            buf;
240         sa_handle_t             *sa_hdl;
241         nvlist_t                *nvbuf = NULL;
242         uchar_t                 *value;
243         uint64_t                 dnode;
244         int                      rc, size;
245         ENTRY;
246
247         /* first of all, get parent dnode from own attributes */
248         LASSERT(osd_dt_obj(o)->oo_db);
249         rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
250                             NULL, SA_HDL_PRIVATE, &sa_hdl);
251         if (rc)
252                 RETURN(rc);
253
254         dnode = ZFS_NO_OBJECT;
255         rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
256         sa_handle_destroy(sa_hdl);
257         if (rc)
258                 RETURN(rc);
259
260         /* now get EA buffer */
261         rc = __osd_xattr_load(osd, dnode, &nvbuf);
262         if (rc)
263                 GOTO(regular, rc);
264
265         /* XXX: if we get that far.. should we cache the result? */
266
267         /* try to find LMA attribute */
268         LASSERT(nvbuf != NULL);
269         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
270         if (rc == 0 && size >= sizeof(*lma)) {
271                 lma = (struct lustre_mdt_attrs *)value;
272                 lustre_lma_swab(lma);
273                 *fid = lma->lma_self_fid;
274                 GOTO(out, rc = 0);
275         }
276
277 regular:
278         /* no LMA attribute in SA, let's try regular EA */
279
280         /* first of all, get parent dnode storing regular EA */
281         rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
282         if (rc)
283                 GOTO(out, rc);
284
285         dnode = ZFS_NO_OBJECT;
286         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
287         sa_handle_destroy(sa_hdl);
288         if (rc)
289                 GOTO(out, rc);
290
291         CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
292         buf.lb_buf = osd_oti_get(env)->oti_buf;
293         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
294
295         /* now try to find LMA */
296         rc = __osd_xattr_get_large(env, osd, dnode, &buf,
297                                    XATTR_NAME_LMA, &size);
298         if (rc == 0 && size >= sizeof(*lma)) {
299                 lma = buf.lb_buf;
300                 lustre_lma_swab(lma);
301                 *fid = lma->lma_self_fid;
302                 GOTO(out, rc = 0);
303         } else if (rc < 0) {
304                 GOTO(out, rc);
305         } else {
306                 GOTO(out, rc = -EIO);
307         }
308
309 out:
310         if (nvbuf != NULL)
311                 nvlist_free(nvbuf);
312         RETURN(rc);
313 }
314
315 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
316                                struct lu_fid *fid)
317 {
318         struct link_ea_header  *leh;
319         struct link_ea_entry   *lee;
320         struct lu_buf           buf;
321         int                     rc;
322         ENTRY;
323
324         buf.lb_buf = osd_oti_get(env)->oti_buf;
325         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
326
327         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
328         if (rc == -ERANGE) {
329                 rc = osd_xattr_get(env, o, &LU_BUF_NULL,
330                                    XATTR_NAME_LINK, BYPASS_CAPA);
331                 if (rc < 0)
332                         RETURN(rc);
333                 LASSERT(rc > 0);
334                 OBD_ALLOC(buf.lb_buf, rc);
335                 if (buf.lb_buf == NULL)
336                         RETURN(-ENOMEM);
337                 buf.lb_len = rc;
338                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
339         }
340         if (rc < 0)
341                 GOTO(out, rc);
342         if (rc < sizeof(*leh) + sizeof(*lee))
343                 GOTO(out, rc = -EINVAL);
344
345         leh = buf.lb_buf;
346         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
347                 leh->leh_magic = LINK_EA_MAGIC;
348                 leh->leh_reccount = __swab32(leh->leh_reccount);
349                 leh->leh_len = __swab64(leh->leh_len);
350         }
351         if (leh->leh_magic != LINK_EA_MAGIC)
352                 GOTO(out, rc = -EINVAL);
353         if (leh->leh_reccount == 0)
354                 GOTO(out, rc = -ENODATA);
355
356         lee = (struct link_ea_entry *)(leh + 1);
357         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
358         rc = 0;
359
360 out:
361         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
362                 OBD_FREE(buf.lb_buf, buf.lb_len);
363
364 #if 0
365         /* this block can be enabled for additional verification
366          * it's trying to match FID from LinkEA vs. FID from LMA */
367         if (rc == 0) {
368                 struct lu_fid fid2;
369                 int rc2;
370                 rc2 = osd_find_parent_by_dnode(env, o, &fid2);
371                 if (rc2 == 0)
372                         if (lu_fid_eq(fid, &fid2) == 0)
373                                 CERROR("wrong parent: "DFID" != "DFID"\n",
374                                        PFID(fid), PFID(&fid2));
375         }
376 #endif
377
378         /* no LinkEA is found, let's try to find the fid in parent's LMA */
379         if (unlikely(rc != 0))
380                 rc = osd_find_parent_by_dnode(env, o, fid);
381
382         RETURN(rc);
383 }
384
385 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
386                           struct dt_rec *rec, const struct dt_key *key,
387                           struct lustre_capa *capa)
388 {
389         struct osd_thread_info *oti = osd_oti_get(env);
390         struct osd_object  *obj = osd_dt_obj(dt);
391         struct osd_device  *osd = osd_obj2dev(obj);
392         char               *name = (char *)key;
393         int                 rc;
394         ENTRY;
395
396         LASSERT(osd_object_is_zap(obj->oo_db));
397
398         if (name[0] == '.') {
399                 if (name[1] == 0) {
400                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
401                         memcpy(rec, f, sizeof(*f));
402                         RETURN(1);
403                 } else if (name[1] == '.' && name[2] == 0) {
404                         rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
405                         RETURN(rc == 0 ? 1 : rc);
406                 }
407         }
408
409         rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
410                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
411                          (void *)&oti->oti_zde);
412         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
413
414         RETURN(rc == 0 ? 1 : rc);
415 }
416
417 static int osd_declare_dir_insert(const struct lu_env *env,
418                                   struct dt_object *dt,
419                                   const struct dt_rec *rec,
420                                   const struct dt_key *key,
421                                   struct thandle *th)
422 {
423         struct osd_object  *obj = osd_dt_obj(dt);
424         struct osd_thandle *oh;
425         ENTRY;
426
427         LASSERT(th != NULL);
428         oh = container_of0(th, struct osd_thandle, ot_super);
429
430         LASSERT(obj->oo_db);
431         LASSERT(osd_object_is_zap(obj->oo_db));
432
433         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
434         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
435
436         RETURN(0);
437 }
438
439 /**
440  * Find the osd object for given fid.
441  *
442  * \param fid need to find the osd object having this fid
443  *
444  * \retval osd_object on success
445  * \retval        -ve on error
446  */
447 struct osd_object *osd_object_find(const struct lu_env *env,
448                                    struct dt_object *dt,
449                                    const struct lu_fid *fid)
450 {
451         struct lu_device         *ludev = dt->do_lu.lo_dev;
452         struct osd_object        *child = NULL;
453         struct lu_object         *luch;
454         struct lu_object         *lo;
455
456         /*
457          * at this point topdev might not exist yet
458          * (i.e. MGS is preparing profiles). so we can
459          * not rely on topdev and instead lookup with
460          * our device passed as topdev. this can't work
461          * if the object isn't cached yet (as osd doesn't
462          * allocate lu_header). IOW, the object must be
463          * in the cache, otherwise lu_object_alloc() crashes
464          * -bzzz
465          */
466         luch = lu_object_find_at(env, ludev, fid, NULL);
467         if (IS_ERR(luch))
468                 return (void *)luch;
469
470         if (lu_object_exists(luch)) {
471                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
472                 if (lo != NULL)
473                         child = osd_obj(lo);
474                 else
475                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
476                                         "%s: object can't be located "DFID"\n",
477                                         osd_dev(ludev)->od_svname, PFID(fid));
478
479                 if (child == NULL) {
480                         lu_object_put(env, luch);
481                         CERROR("%s: Unable to get osd_object "DFID"\n",
482                                osd_dev(ludev)->od_svname, PFID(fid));
483                         child = ERR_PTR(-ENOENT);
484                 }
485         } else {
486                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
487                                 "%s: lu_object does not exists "DFID"\n",
488                                 osd_dev(ludev)->od_svname, PFID(fid));
489                 lu_object_put(env, luch);
490                 child = ERR_PTR(-ENOENT);
491         }
492
493         return child;
494 }
495
496 /**
497  * Put the osd object once done with it.
498  *
499  * \param obj osd object that needs to be put
500  */
501 static inline void osd_object_put(const struct lu_env *env,
502                                   struct osd_object *obj)
503 {
504         lu_object_put(env, &obj->oo_dt.do_lu);
505 }
506
507 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
508                           obd_seq seq)
509 {
510         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
511         struct seq_server_site  *ss = osd_seq_site(osd);
512         int                     rc;
513         ENTRY;
514
515         LASSERT(ss != NULL);
516         LASSERT(ss->ss_server_fld != NULL);
517
518         rc = osd_fld_lookup(env, osd, seq, range);
519         if (rc != 0) {
520                 CERROR("%s: Can not lookup fld for "LPX64"\n",
521                        osd_name(osd), seq);
522                 RETURN(0);
523         }
524
525         RETURN(ss->ss_node_id == range->lsr_index);
526 }
527
528 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
529                           const struct lu_fid *fid)
530 {
531         struct seq_server_site  *ss = osd_seq_site(osd);
532         ENTRY;
533
534         /* FID seqs not in FLDB, must be local seq */
535         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
536                 RETURN(0);
537
538         /* If FLD is not being initialized yet, it only happens during the
539          * initialization, likely during mgs initialization, and we assume
540          * this is local FID. */
541         if (ss == NULL || ss->ss_server_fld == NULL)
542                 RETURN(0);
543
544         /* Only check the local FLDB here */
545         if (osd_seq_exists(env, osd, fid_seq(fid)))
546                 RETURN(0);
547
548         RETURN(1);
549 }
550
551 /**
552  *      Inserts (key, value) pair in \a directory object.
553  *
554  *      \param  dt      osd index object
555  *      \param  key     key for index
556  *      \param  rec     record reference
557  *      \param  th      transaction handler
558  *      \param  capa    capability descriptor
559  *      \param  ignore_quota update should not affect quota
560  *
561  *      \retval  0  success
562  *      \retval -ve failure
563  */
564 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
565                           const struct dt_rec *rec, const struct dt_key *key,
566                           struct thandle *th, struct lustre_capa *capa,
567                           int ignore_quota)
568 {
569         struct osd_thread_info *oti = osd_oti_get(env);
570         struct osd_object   *parent = osd_dt_obj(dt);
571         struct osd_device   *osd = osd_obj2dev(parent);
572         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
573         const struct lu_fid *fid = rec1->rec_fid;
574         struct osd_thandle  *oh;
575         struct osd_object   *child = NULL;
576         __u32                attr;
577         char                *name = (char *)key;
578         int                  rc;
579         ENTRY;
580
581         LASSERT(parent->oo_db);
582         LASSERT(osd_object_is_zap(parent->oo_db));
583
584         LASSERT(dt_object_exists(dt));
585         LASSERT(osd_invariant(parent));
586
587         LASSERT(th != NULL);
588         oh = container_of0(th, struct osd_thandle, ot_super);
589
590         rc = osd_remote_fid(env, osd, fid);
591         if (rc < 0) {
592                 CERROR("%s: Can not find object "DFID": rc = %d\n",
593                        osd->od_svname, PFID(fid), rc);
594                 RETURN(rc);
595         }
596
597         if (unlikely(rc == 1)) {
598                 /* Insert remote entry */
599                 memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
600                 oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
601         } else {
602                 /*
603                  * To simulate old Orion setups with ./..  stored in the
604                  * directories
605                  */
606                 /* Insert local entry */
607                 child = osd_object_find(env, dt, fid);
608                 if (IS_ERR(child))
609                         RETURN(PTR_ERR(child));
610
611                 LASSERT(child->oo_db);
612                 if (name[0] == '.') {
613                         if (name[1] == 0) {
614                                 /* do not store ".", instead generate it
615                                  * during iteration */
616                                 GOTO(out, rc = 0);
617                         } else if (name[1] == '.' && name[2] == 0) {
618                                 /* update parent dnode in the child.
619                                  * later it will be used to generate ".." */
620                                 rc = osd_object_sa_update(parent,
621                                                  SA_ZPL_PARENT(osd),
622                                                  &child->oo_db->db_object,
623                                                  8, oh);
624                                 GOTO(out, rc);
625                         }
626                 }
627                 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
628                 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
629                 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
630                 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
631                 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
632         }
633
634         oti->oti_zde.lzd_fid = *fid;
635         /* Insert (key,oid) into ZAP */
636         rc = -zap_add(osd->od_os, parent->oo_db->db_object,
637                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
638                       (void *)&oti->oti_zde, oh->ot_tx);
639
640 out:
641         if (child != NULL)
642                 osd_object_put(env, child);
643
644         RETURN(rc);
645 }
646
647 static int osd_declare_dir_delete(const struct lu_env *env,
648                                   struct dt_object *dt,
649                                   const struct dt_key *key,
650                                   struct thandle *th)
651 {
652         struct osd_object *obj = osd_dt_obj(dt);
653         struct osd_thandle *oh;
654         ENTRY;
655
656         LASSERT(dt_object_exists(dt));
657         LASSERT(osd_invariant(obj));
658
659         LASSERT(th != NULL);
660         oh = container_of0(th, struct osd_thandle, ot_super);
661
662         LASSERT(obj->oo_db);
663         LASSERT(osd_object_is_zap(obj->oo_db));
664
665         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
666
667         RETURN(0);
668 }
669
670 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
671                           const struct dt_key *key, struct thandle *th,
672                           struct lustre_capa *capa)
673 {
674         struct osd_object *obj = osd_dt_obj(dt);
675         struct osd_device *osd = osd_obj2dev(obj);
676         struct osd_thandle *oh;
677         dmu_buf_t *zap_db = obj->oo_db;
678         char      *name = (char *)key;
679         int rc;
680         ENTRY;
681
682         LASSERT(obj->oo_db);
683         LASSERT(osd_object_is_zap(obj->oo_db));
684
685         LASSERT(th != NULL);
686         oh = container_of0(th, struct osd_thandle, ot_super);
687
688         /*
689          * In Orion . and .. were stored in the directory (not generated upon
690          * request as now). we preserve them for backward compatibility
691          */
692         if (name[0] == '.') {
693                 if (name[1] == 0) {
694                         RETURN(0);
695                 } else if (name[1] == '.' && name[2] == 0) {
696                         RETURN(0);
697                 }
698         }
699
700         /* Remove key from the ZAP */
701         rc = -zap_remove(osd->od_os, zap_db->db_object,
702                          (char *) key, oh->ot_tx);
703
704         if (unlikely(rc && rc != -ENOENT))
705                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
706
707         RETURN(rc);
708 }
709
710 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
711                                      struct dt_object *dt,
712                                      __u32 unused,
713                                      struct lustre_capa *capa)
714 {
715         struct osd_zap_it *it;
716
717         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
718         if (!IS_ERR(it))
719                 it->ozi_pos = 0;
720
721         RETURN((struct dt_it *)it);
722 }
723
724 /**
725  *  Move Iterator to record specified by \a key
726  *
727  *  \param  di      osd iterator
728  *  \param  key     key for index
729  *
730  *  \retval +ve  di points to record with least key not larger than key
731  *  \retval  0   di points to exact matched key
732  *  \retval -ve  failure
733  */
734 static int osd_dir_it_get(const struct lu_env *env,
735                           struct dt_it *di, const struct dt_key *key)
736 {
737         struct osd_zap_it *it = (struct osd_zap_it *)di;
738         struct osd_object *obj = it->ozi_obj;
739         char              *name = (char *)key;
740         int                rc;
741         ENTRY;
742
743         LASSERT(it);
744         LASSERT(it->ozi_zc);
745
746         /* reset the cursor */
747         zap_cursor_fini(it->ozi_zc);
748         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
749
750         /* XXX: implementation of the API is broken at the moment */
751         LASSERT(((const char *)key)[0] == 0);
752
753         if (name[0] == 0) {
754                 it->ozi_pos = 0;
755                 RETURN(1);
756         }
757
758         if (name[0] == '.') {
759                 if (name[1] == 0) {
760                         it->ozi_pos = 1;
761                         GOTO(out, rc = 1);
762                 } else if (name[1] == '.' && name[2] == 0) {
763                         it->ozi_pos = 2;
764                         GOTO(out, rc = 1);
765                 }
766         }
767
768         /* neither . nor .. - some real record */
769         it->ozi_pos = 3;
770         rc = +1;
771
772 out:
773         RETURN(rc);
774 }
775
776 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
777 {
778         /* PBS: do nothing : ref are incremented at retrive and decreamented
779          *      next/finish. */
780 }
781
782 /*
783  * in Orion . and .. were stored in the directory, while ZPL
784  * and current osd-zfs generate them up on request. so, we
785  * need to ignore previously stored . and ..
786  */
787 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
788                                         zap_attribute_t *za)
789 {
790         int rc, isdot;
791
792         do {
793                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
794
795                 isdot = 0;
796                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
797                         if (za->za_name[1] == 0) {
798                                 isdot = 1;
799                         } else if (za->za_name[1] == '.' &&
800                                    za->za_name[2] == 0) {
801                                 isdot = 1;
802                         }
803                         if (unlikely(isdot))
804                                 zap_cursor_advance(it->ozi_zc);
805                 }
806         } while (unlikely(rc == 0 && isdot));
807
808         return rc;
809 }
810
811 /**
812  * to load a directory entry at a time and stored it in
813  * iterator's in-memory data structure.
814  *
815  * \param di, struct osd_it_ea, iterator's in memory structure
816  *
817  * \retval +ve, iterator reached to end
818  * \retval   0, iterator not reached to end
819  * \retval -ve, on error
820  */
821 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
822 {
823         struct osd_zap_it *it = (struct osd_zap_it *)di;
824         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
825         int                rc;
826
827         /* temp. storage should be enough for any key supported by ZFS */
828         CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
829
830         /*
831          * the first ->next() moves the cursor to .
832          * the second ->next() moves the cursor to ..
833          * then we get to the real records and have to verify any exist
834          */
835         if (it->ozi_pos <= 2) {
836                 it->ozi_pos++;
837                 if (it->ozi_pos <=2)
838                         RETURN(0);
839         }
840
841         zap_cursor_advance(it->ozi_zc);
842
843         /*
844          * According to current API we need to return error if its last entry.
845          * zap_cursor_advance() does not return any value. So we need to call
846          * retrieve to check if there is any record.  We should make
847          * changes to Iterator API to not return status for this API
848          */
849         rc = osd_index_retrieve_skip_dots(it, za);
850
851         if (rc == -ENOENT) /* end of dir */
852                 RETURN(+1);
853
854         RETURN(rc);
855 }
856
857 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
858                                      const struct dt_it *di)
859 {
860         struct osd_zap_it *it = (struct osd_zap_it *)di;
861         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
862         int                rc = 0;
863         ENTRY;
864
865         if (it->ozi_pos <= 1) {
866                 it->ozi_pos = 1;
867                 RETURN((struct dt_key *)".");
868         } else if (it->ozi_pos == 2) {
869                 RETURN((struct dt_key *)"..");
870         }
871
872         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
873                 RETURN(ERR_PTR(rc));
874
875         strcpy(it->ozi_name, za->za_name);
876
877         RETURN((struct dt_key *)it->ozi_name);
878 }
879
880 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
881 {
882         struct osd_zap_it *it = (struct osd_zap_it *)di;
883         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
884         int                rc;
885         ENTRY;
886
887         if (it->ozi_pos <= 1) {
888                 it->ozi_pos = 1;
889                 RETURN(2);
890         } else if (it->ozi_pos == 2) {
891                 RETURN(3);
892         }
893
894         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
895                 rc = strlen(za->za_name);
896
897         RETURN(rc);
898 }
899
900 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
901                           struct dt_rec *dtrec, __u32 attr)
902 {
903         struct osd_zap_it   *it = (struct osd_zap_it *)di;
904         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
905         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
906         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
907         int                  rc, namelen;
908         ENTRY;
909
910         if (it->ozi_pos <= 1) {
911                 lde->lde_hash = cpu_to_le64(1);
912                 strcpy(lde->lde_name, ".");
913                 lde->lde_namelen = cpu_to_le16(1);
914                 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
915                 lde->lde_attrs = LUDA_FID;
916                 /* append lustre attributes */
917                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
918                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
919                 it->ozi_pos = 1;
920                 GOTO(out, rc = 0);
921
922         } else if (it->ozi_pos == 2) {
923                 lde->lde_hash = cpu_to_le64(2);
924                 strcpy(lde->lde_name, "..");
925                 lde->lde_namelen = cpu_to_le16(2);
926                 lde->lde_attrs = LUDA_FID;
927                 /* append lustre attributes */
928                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
929                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
930                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
931                 /*
932                  * early Orion code was not setting LinkEA, so it's possible
933                  * some setups still have objects with no LinkEA set.
934                  * but at that time .. was a real record in the directory
935                  * so we should try to lookup .. in ZAP
936                  */
937                 if (rc != -ENOENT)
938                         GOTO(out, rc);
939         }
940
941         LASSERT(lde);
942
943         rc = -zap_cursor_retrieve(it->ozi_zc, za);
944         if (unlikely(rc != 0))
945                 GOTO(out, rc);
946
947         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
948         namelen = strlen(za->za_name);
949         if (namelen > NAME_MAX)
950                 GOTO(out, rc = -EOVERFLOW);
951         strcpy(lde->lde_name, za->za_name);
952         lde->lde_namelen = cpu_to_le16(namelen);
953
954         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
955                 CERROR("%s: unsupported direntry format: %d %d\n",
956                        osd_obj2dev(it->ozi_obj)->od_svname,
957                        za->za_integer_length, (int)za->za_num_integers);
958
959                 GOTO(out, rc = -EIO);
960         }
961
962         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
963                          za->za_name, za->za_integer_length, 3, zde);
964         if (rc)
965                 GOTO(out, rc);
966
967         lde->lde_fid = zde->lzd_fid;
968         lde->lde_attrs = LUDA_FID;
969
970         /* append lustre attributes */
971         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
972
973         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
974
975 out:
976         RETURN(rc);
977 }
978
979 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
980                                __u32 attr)
981 {
982         struct osd_zap_it   *it = (struct osd_zap_it *)di;
983         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
984         int                  rc, namelen = 0;
985         ENTRY;
986
987         if (it->ozi_pos <= 1)
988                 namelen = 1;
989         else if (it->ozi_pos == 2)
990                 namelen = 2;
991
992         if (namelen > 0) {
993                 rc = lu_dirent_calc_size(namelen, attr);
994                 RETURN(rc);
995         }
996
997         rc = -zap_cursor_retrieve(it->ozi_zc, za);
998         if (unlikely(rc != 0))
999                 RETURN(rc);
1000
1001         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1002                 CERROR("%s: unsupported direntry format: %d %d\n",
1003                        osd_obj2dev(it->ozi_obj)->od_svname,
1004                        za->za_integer_length, (int)za->za_num_integers);
1005                 RETURN(-EIO);
1006         }
1007
1008         namelen = strlen(za->za_name);
1009         if (namelen > NAME_MAX)
1010                 RETURN(-EOVERFLOW);
1011
1012         rc = lu_dirent_calc_size(namelen, attr);
1013
1014         RETURN(rc);
1015 }
1016
1017 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1018 {
1019         struct osd_zap_it *it = (struct osd_zap_it *)di;
1020         __u64              pos;
1021         ENTRY;
1022
1023         if (it->ozi_pos <= 2)
1024                 pos = it->ozi_pos;
1025         else
1026                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1027
1028         RETURN(pos);
1029 }
1030
1031 /*
1032  * return status :
1033  *  rc == 0 -> end of directory.
1034  *  rc >  0 -> ok, proceed.
1035  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1036  */
1037 static int osd_dir_it_load(const struct lu_env *env,
1038                         const struct dt_it *di, __u64 hash)
1039 {
1040         struct osd_zap_it *it = (struct osd_zap_it *)di;
1041         struct osd_object *obj = it->ozi_obj;
1042         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1043         int                rc;
1044         ENTRY;
1045
1046         /* reset the cursor */
1047         zap_cursor_fini(it->ozi_zc);
1048         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1049
1050         if (hash <= 2) {
1051                 it->ozi_pos = hash;
1052                 rc = +1;
1053         } else {
1054                 it->ozi_pos = 3;
1055                 /* to return whether the end has been reached */
1056                 rc = osd_index_retrieve_skip_dots(it, za);
1057                 if (rc == 0)
1058                         rc = +1;
1059                 else if (rc == -ENOENT)
1060                         rc = 0;
1061         }
1062
1063         RETURN(rc);
1064 }
1065
1066 static struct dt_index_operations osd_dir_ops = {
1067         .dio_lookup         = osd_dir_lookup,
1068         .dio_declare_insert = osd_declare_dir_insert,
1069         .dio_insert         = osd_dir_insert,
1070         .dio_declare_delete = osd_declare_dir_delete,
1071         .dio_delete         = osd_dir_delete,
1072         .dio_it     = {
1073                 .init     = osd_dir_it_init,
1074                 .fini     = osd_index_it_fini,
1075                 .get      = osd_dir_it_get,
1076                 .put      = osd_dir_it_put,
1077                 .next     = osd_dir_it_next,
1078                 .key      = osd_dir_it_key,
1079                 .key_size = osd_dir_it_key_size,
1080                 .rec      = osd_dir_it_rec,
1081                 .rec_size = osd_dir_it_rec_size,
1082                 .store    = osd_dir_it_store,
1083                 .load     = osd_dir_it_load
1084         }
1085 };
1086
1087 /*
1088  * Primitives for index files using binary keys.
1089  */
1090
1091 /* key integer_size is 8 */
1092 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1093                                   const struct dt_key *src)
1094 {
1095         int size;
1096
1097         LASSERT(dst);
1098         LASSERT(src);
1099
1100         /* align keysize to 64bit */
1101         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1102         size *= sizeof(__u64);
1103
1104         LASSERT(size <= MAXNAMELEN);
1105
1106         if (unlikely(size > o->oo_keysize))
1107                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1108         memcpy(dst, (const char *)src, o->oo_keysize);
1109
1110         return (size/sizeof(__u64));
1111 }
1112
1113 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1114                         struct dt_rec *rec, const struct dt_key *key,
1115                         struct lustre_capa *capa)
1116 {
1117         struct osd_object *obj = osd_dt_obj(dt);
1118         struct osd_device *osd = osd_obj2dev(obj);
1119         __u64             *k = osd_oti_get(env)->oti_key64;
1120         int                rc;
1121         ENTRY;
1122
1123         rc = osd_prepare_key_uint64(obj, k, key);
1124
1125         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1126                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1127                                 (void *)rec);
1128         RETURN(rc == 0 ? 1 : rc);
1129 }
1130
1131 static int osd_declare_index_insert(const struct lu_env *env,
1132                                     struct dt_object *dt,
1133                                     const struct dt_rec *rec,
1134                                     const struct dt_key *key,
1135                                     struct thandle *th)
1136 {
1137         struct osd_object  *obj = osd_dt_obj(dt);
1138         struct osd_thandle *oh;
1139         ENTRY;
1140
1141         LASSERT(th != NULL);
1142         oh = container_of0(th, struct osd_thandle, ot_super);
1143
1144         LASSERT(obj->oo_db);
1145
1146         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
1147
1148         /* It is not clear what API should be used for binary keys, so we pass
1149          * a null name which has the side effect of over-reserving space,
1150          * accounting for the worst case. See zap_count_write() */
1151         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1152
1153         RETURN(0);
1154 }
1155
1156 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1157                             const struct dt_rec *rec, const struct dt_key *key,
1158                             struct thandle *th, struct lustre_capa *capa,
1159                             int ignore_quota)
1160 {
1161         struct osd_object  *obj = osd_dt_obj(dt);
1162         struct osd_device  *osd = osd_obj2dev(obj);
1163         struct osd_thandle *oh;
1164         __u64              *k = osd_oti_get(env)->oti_key64;
1165         int                 rc;
1166         ENTRY;
1167
1168         LASSERT(obj->oo_db);
1169         LASSERT(dt_object_exists(dt));
1170         LASSERT(osd_invariant(obj));
1171         LASSERT(th != NULL);
1172
1173         oh = container_of0(th, struct osd_thandle, ot_super);
1174
1175         rc = osd_prepare_key_uint64(obj, k, key);
1176
1177         /* Insert (key,oid) into ZAP */
1178         rc = -zap_add_uint64(osd->od_os, obj->oo_db->db_object,
1179                              k, rc, obj->oo_recusize, obj->oo_recsize,
1180                              (void *)rec, oh->ot_tx);
1181         RETURN(rc);
1182 }
1183
1184 static int osd_declare_index_delete(const struct lu_env *env,
1185                                     struct dt_object *dt,
1186                                     const struct dt_key *key,
1187                                     struct thandle *th)
1188 {
1189         struct osd_object  *obj = osd_dt_obj(dt);
1190         struct osd_thandle *oh;
1191         ENTRY;
1192
1193         LASSERT(dt_object_exists(dt));
1194         LASSERT(osd_invariant(obj));
1195         LASSERT(th != NULL);
1196         LASSERT(obj->oo_db);
1197
1198         oh = container_of0(th, struct osd_thandle, ot_super);
1199         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1200
1201         RETURN(0);
1202 }
1203
1204 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1205                             const struct dt_key *key, struct thandle *th,
1206                             struct lustre_capa *capa)
1207 {
1208         struct osd_object  *obj = osd_dt_obj(dt);
1209         struct osd_device  *osd = osd_obj2dev(obj);
1210         struct osd_thandle *oh;
1211         __u64              *k = osd_oti_get(env)->oti_key64;
1212         int                 rc;
1213         ENTRY;
1214
1215         LASSERT(obj->oo_db);
1216         LASSERT(th != NULL);
1217         oh = container_of0(th, struct osd_thandle, ot_super);
1218
1219         rc = osd_prepare_key_uint64(obj, k, key);
1220
1221         /* Remove binary key from the ZAP */
1222         rc = -zap_remove_uint64(osd->od_os, obj->oo_db->db_object,
1223                                 k, rc, oh->ot_tx);
1224         RETURN(rc);
1225 }
1226
1227 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1228                             const struct dt_key *key)
1229 {
1230         struct osd_zap_it *it = (struct osd_zap_it *)di;
1231         struct osd_object *obj = it->ozi_obj;
1232         struct osd_device *osd = osd_obj2dev(obj);
1233         ENTRY;
1234
1235         LASSERT(it);
1236         LASSERT(it->ozi_zc);
1237
1238         /*
1239          * XXX: we need a binary version of zap_cursor_move_to_key()
1240          *      to implement this API */
1241         if (*((const __u64 *)key) != 0)
1242                 CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
1243                        *((__u64 *)key));
1244
1245         zap_cursor_fini(it->ozi_zc);
1246         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_db->db_object);
1247         it->ozi_reset = 1;
1248
1249         RETURN(+1);
1250 }
1251
1252 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1253 {
1254         struct osd_zap_it *it = (struct osd_zap_it *)di;
1255         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1256         int                rc;
1257         ENTRY;
1258
1259         if (it->ozi_reset == 0)
1260                 zap_cursor_advance(it->ozi_zc);
1261         it->ozi_reset = 0;
1262
1263         /*
1264          * According to current API we need to return error if it's last entry.
1265          * zap_cursor_advance() does not return any value. So we need to call
1266          * retrieve to check if there is any record.  We should make
1267          * changes to Iterator API to not return status for this API
1268          */
1269         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1270         if (rc == -ENOENT)
1271                 RETURN(+1);
1272
1273         RETURN((rc));
1274 }
1275
1276 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1277                                        const struct dt_it *di)
1278 {
1279         struct osd_zap_it *it = (struct osd_zap_it *)di;
1280         struct osd_object *obj = it->ozi_obj;
1281         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1282         int                rc = 0;
1283         ENTRY;
1284
1285         it->ozi_reset = 0;
1286         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1287         if (rc)
1288                 RETURN(ERR_PTR(rc));
1289
1290         /* the binary key is stored in the name */
1291         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1292
1293         RETURN((struct dt_key *)&it->ozi_key);
1294 }
1295
1296 static int osd_index_it_key_size(const struct lu_env *env,
1297                                 const struct dt_it *di)
1298 {
1299         struct osd_zap_it *it = (struct osd_zap_it *)di;
1300         struct osd_object *obj = it->ozi_obj;
1301         RETURN(obj->oo_keysize);
1302 }
1303
1304 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1305                             struct dt_rec *rec, __u32 attr)
1306 {
1307         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1308         struct osd_zap_it *it = (struct osd_zap_it *)di;
1309         struct osd_object *obj = it->ozi_obj;
1310         struct osd_device *osd = osd_obj2dev(obj);
1311         __u64             *k = osd_oti_get(env)->oti_key64;
1312         int                rc;
1313         ENTRY;
1314
1315         it->ozi_reset = 0;
1316         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1317         if (rc)
1318                 RETURN(rc);
1319
1320         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1321
1322         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1323                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1324                                 (void *)rec);
1325         RETURN(rc);
1326 }
1327
1328 static __u64 osd_index_it_store(const struct lu_env *env,
1329                                 const struct dt_it *di)
1330 {
1331         struct osd_zap_it *it = (struct osd_zap_it *)di;
1332
1333         it->ozi_reset = 0;
1334         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1335 }
1336
1337 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1338                              __u64 hash)
1339 {
1340         struct osd_zap_it *it = (struct osd_zap_it *)di;
1341         struct osd_object *obj = it->ozi_obj;
1342         struct osd_device *osd = osd_obj2dev(obj);
1343         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1344         int                rc;
1345         ENTRY;
1346
1347         /* reset the cursor */
1348         zap_cursor_fini(it->ozi_zc);
1349         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1350                                    obj->oo_db->db_object, hash);
1351         it->ozi_reset = 0;
1352
1353         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1354         if (rc == 0)
1355                 RETURN(+1);
1356         else if (rc == -ENOENT)
1357                 RETURN(0);
1358
1359         RETURN(rc);
1360 }
1361
1362 static struct dt_index_operations osd_index_ops = {
1363         .dio_lookup             = osd_index_lookup,
1364         .dio_declare_insert     = osd_declare_index_insert,
1365         .dio_insert             = osd_index_insert,
1366         .dio_declare_delete     = osd_declare_index_delete,
1367         .dio_delete             = osd_index_delete,
1368         .dio_it = {
1369                 .init           = osd_index_it_init,
1370                 .fini           = osd_index_it_fini,
1371                 .get            = osd_index_it_get,
1372                 .put            = osd_index_it_put,
1373                 .next           = osd_index_it_next,
1374                 .key            = osd_index_it_key,
1375                 .key_size       = osd_index_it_key_size,
1376                 .rec            = osd_index_it_rec,
1377                 .store          = osd_index_it_store,
1378                 .load           = osd_index_it_load
1379         }
1380 };
1381
1382 struct osd_metadnode_it {
1383         struct osd_device       *mit_dev;
1384         __u64                    mit_pos;
1385         struct lu_fid            mit_fid;
1386         int                      mit_prefetched;
1387         __u64                    mit_prefetched_dnode;
1388 };
1389
1390 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
1391                                             struct dt_object *dt, __u32 attr,
1392                                             struct lustre_capa *capa)
1393 {
1394         struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
1395         struct osd_metadnode_it *it;
1396         ENTRY;
1397
1398         OBD_ALLOC_PTR(it);
1399         if (unlikely(it == NULL))
1400                 RETURN(ERR_PTR(-ENOMEM));
1401
1402         it->mit_dev = dev;
1403
1404         /* XXX: dmu_object_next() does NOT find dnodes allocated
1405          *      in the current non-committed txg, so we force txg
1406          *      commit to find all existing dnodes ... */
1407         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1408
1409         RETURN((struct dt_it *)it);
1410 }
1411
1412 static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1413 {
1414         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1415
1416         OBD_FREE_PTR(it);
1417 }
1418
1419 static int osd_zfs_otable_it_get(const struct lu_env *env,
1420                                  struct dt_it *di, const struct dt_key *key)
1421 {
1422         return 0;
1423 }
1424
1425 static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
1426 {
1427 }
1428
1429 #define OTABLE_PREFETCH         256
1430
1431 static void osd_zfs_otable_prefetch(const struct lu_env *env,
1432                                     struct osd_metadnode_it *it)
1433 {
1434         struct osd_device       *dev = it->mit_dev;
1435         int                      rc;
1436
1437         /* can go negative on the very first access to the iterator
1438          * or if some non-Lustre objects were found */
1439         if (unlikely(it->mit_prefetched < 0))
1440                 it->mit_prefetched = 0;
1441
1442         if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
1443                 return;
1444
1445         if (it->mit_prefetched_dnode == 0)
1446                 it->mit_prefetched_dnode = it->mit_pos;
1447
1448         while (it->mit_prefetched < OTABLE_PREFETCH) {
1449                 rc = -dmu_object_next(dev->od_os, &it->mit_prefetched_dnode,
1450                                       B_FALSE, 0);
1451                 if (unlikely(rc != 0))
1452                         break;
1453
1454                 /* dmu_prefetch() was exported in 0.6.2, if you use with
1455                  * an older release, just comment it out - this is an
1456                  * optimization */
1457                 dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
1458
1459                 it->mit_prefetched++;
1460         }
1461 }
1462
1463 static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
1464 {
1465         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1466         struct lustre_mdt_attrs *lma;
1467         struct osd_device       *dev = it->mit_dev;
1468         nvlist_t                *nvbuf = NULL;
1469         uchar_t                 *v;
1470         __u64                    dnode;
1471         int                      rc, s;
1472
1473         memset(&it->mit_fid, 0, sizeof(it->mit_fid));
1474
1475         dnode = it->mit_pos;
1476         do {
1477                 rc = -dmu_object_next(dev->od_os, &it->mit_pos, B_FALSE, 0);
1478                 if (unlikely(rc != 0))
1479                         GOTO(out, rc = 1);
1480                 it->mit_prefetched--;
1481
1482                 /* LMA is required for this to be a Lustre object.
1483                  * If there is no xattr skip it. */
1484                 rc = __osd_xattr_load(dev, it->mit_pos, &nvbuf);
1485                 if (unlikely(rc != 0))
1486                         continue;
1487
1488                 LASSERT(nvbuf != NULL);
1489                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
1490                 if (likely(rc == 0)) {
1491                         /* Lustre object */
1492                         lma = (struct lustre_mdt_attrs *)v;
1493                         lustre_lma_swab(lma);
1494                         it->mit_fid = lma->lma_self_fid;
1495                         nvlist_free(nvbuf);
1496                         break;
1497                 } else {
1498                         /* not a Lustre object, try next one */
1499                         nvlist_free(nvbuf);
1500                 }
1501
1502         } while (1);
1503
1504
1505         /* we aren't prefetching in the above loop because the number of
1506          * non-Lustre objects is very small and we will be repeating very
1507          * rare. in case we want to use this to iterate over non-Lustre
1508          * objects (i.e. when we convert regular ZFS in Lustre) it makes
1509          * sense to initiate prefetching in the loop */
1510
1511         /* 0 - there are more items, +1 - the end */
1512         if (likely(rc == 0))
1513                 osd_zfs_otable_prefetch(env, it);
1514
1515         CDEBUG(D_OTHER, "advance: %llu -> %llu "DFID": %d\n", dnode,
1516                it->mit_pos, PFID(&it->mit_fid), rc);
1517
1518 out:
1519         return rc;
1520 }
1521
1522 static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
1523                                             const struct dt_it *di)
1524 {
1525         return NULL;
1526 }
1527
1528 static int osd_zfs_otable_it_key_size(const struct lu_env *env,
1529                                       const struct dt_it *di)
1530 {
1531         return sizeof(__u64);
1532 }
1533
1534 static int osd_zfs_otable_it_rec(const struct lu_env *env,
1535                                  const struct dt_it *di,
1536                                  struct dt_rec *rec, __u32 attr)
1537 {
1538         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1539         struct lu_fid *fid = (struct lu_fid *)rec;
1540         ENTRY;
1541
1542         *fid = it->mit_fid;
1543
1544         RETURN(0);
1545 }
1546
1547
1548 static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
1549                                      const struct dt_it *di)
1550 {
1551         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1552
1553         return it->mit_pos;
1554 }
1555
1556 static int osd_zfs_otable_it_load(const struct lu_env *env,
1557                                   const struct dt_it *di, __u64 hash)
1558 {
1559         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1560
1561         it->mit_pos = hash;
1562         it->mit_prefetched = 0;
1563         it->mit_prefetched_dnode = 0;
1564
1565         return osd_zfs_otable_it_next(env, (struct dt_it *)di);
1566 }
1567
1568 static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
1569                                      const struct dt_it *di, void *key_rec)
1570 {
1571         return 0;
1572 }
1573
1574 const struct dt_index_operations osd_zfs_otable_ops = {
1575         .dio_it = {
1576                 .init     = osd_zfs_otable_it_init,
1577                 .fini     = osd_zfs_otable_it_fini,
1578                 .get      = osd_zfs_otable_it_get,
1579                 .put      = osd_zfs_otable_it_put,
1580                 .next     = osd_zfs_otable_it_next,
1581                 .key      = osd_zfs_otable_it_key,
1582                 .key_size = osd_zfs_otable_it_key_size,
1583                 .rec      = osd_zfs_otable_it_rec,
1584                 .store    = osd_zfs_otable_it_store,
1585                 .load     = osd_zfs_otable_it_load,
1586                 .key_rec  = osd_zfs_otable_it_key_rec,
1587         }
1588 };
1589
1590 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1591                 const struct dt_index_features *feat)
1592 {
1593         struct osd_object *obj = osd_dt_obj(dt);
1594         ENTRY;
1595
1596         LASSERT(dt_object_exists(dt));
1597
1598         /*
1599          * XXX: implement support for fixed-size keys sorted with natural
1600          *      numerical way (not using internal hash value)
1601          */
1602         if (feat->dif_flags & DT_IND_RANGE)
1603                 RETURN(-ERANGE);
1604
1605         if (unlikely(feat == &dt_otable_features)) {
1606                 dt->do_index_ops = &osd_zfs_otable_ops;
1607                 RETURN(0);
1608         }
1609
1610         LASSERT(obj->oo_db != NULL);
1611         if (likely(feat == &dt_directory_features)) {
1612                 if (osd_object_is_zap(obj->oo_db))
1613                         dt->do_index_ops = &osd_dir_ops;
1614                 else
1615                         RETURN(-ENOTDIR);
1616         } else if (unlikely(feat == &dt_acct_features)) {
1617                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1618                 dt->do_index_ops = &osd_acct_index_ops;
1619         } else if (osd_object_is_zap(obj->oo_db) &&
1620                    dt->do_index_ops == NULL) {
1621                 /* For index file, we don't support variable key & record sizes
1622                  * and the key has to be unique */
1623                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1624                         RETURN(-EINVAL);
1625
1626                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
1627                         RETURN(-E2BIG);
1628                 if (feat->dif_keysize_max != feat->dif_keysize_min)
1629                         RETURN(-EINVAL);
1630
1631                 /* As for the record size, it should be a multiple of 8 bytes
1632                  * and smaller than the maximum value length supported by ZAP.
1633                  */
1634                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1635                         RETURN(-E2BIG);
1636                 if (feat->dif_recsize_max != feat->dif_recsize_min)
1637                         RETURN(-EINVAL);
1638
1639                 obj->oo_keysize = feat->dif_keysize_max;
1640                 obj->oo_recsize = feat->dif_recsize_max;
1641                 obj->oo_recusize = 1;
1642
1643                 /* ZFS prefers to work with array of 64bits */
1644                 if ((obj->oo_recsize & 7) == 0) {
1645                         obj->oo_recsize >>= 3;
1646                         obj->oo_recusize = 8;
1647                 }
1648                 dt->do_index_ops = &osd_index_ops;
1649         }
1650
1651         RETURN(0);
1652 }