Whamcloud - gitweb
LU-6040 lnet: remove messages from lazy portal on NI shutdown
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd-zfs/osd_index.c
37  *
38  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
39  * Author: Mike Pershin <tappro@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OSD
43
44 #include <lustre_ver.h>
45 #include <libcfs/libcfs.h>
46 #include <obd_support.h>
47 #include <lustre_net.h>
48 #include <obd.h>
49 #include <obd_class.h>
50 #include <lustre_disk.h>
51 #include <lustre_fid.h>
52
53 #include "osd_internal.h"
54
55 #include <sys/dnode.h>
56 #include <sys/dbuf.h>
57 #include <sys/spa.h>
58 #include <sys/stat.h>
59 #include <sys/zap.h>
60 #include <sys/spa_impl.h>
61 #include <sys/zfs_znode.h>
62 #include <sys/dmu_tx.h>
63 #include <sys/dmu_objset.h>
64 #include <sys/dsl_prop.h>
65 #include <sys/sa_impl.h>
66 #include <sys/txg.h>
67
68 static inline int osd_object_is_zap(dmu_buf_t *db)
69 {
70         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *) db;
71         dnode_t *dn;
72         int rc;
73
74         DB_DNODE_ENTER(dbi);
75         dn = DB_DNODE(dbi);
76         rc = (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
77                         dn->dn_type == DMU_OT_USERGROUP_USED);
78         DB_DNODE_EXIT(dbi);
79
80         return rc;
81 }
82
83 /* We don't actually have direct access to the zap_hashbits() function
84  * so just pretend like we do for now.  If this ever breaks we can look at
85  * it at that time. */
86 #define zap_hashbits(zc) 48
87 /*
88  * ZFS hash format:
89  * | cd (16 bits) | hash (48 bits) |
90  * we need it in other form:
91  * |0| hash (48 bit) | cd (15 bit) |
92  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
93  * the readdir hashes from multiple directory stripes uniformly on the client.
94  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
95  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
96  * should only be the low 15 bits.
97  */
98 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
99 {
100         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
101
102         return (zfs_hash >> zap_hashbits(zc)) |
103                 (zfs_hash << (63 - zap_hashbits(zc)));
104 }
105
106 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
107                                     uint64_t id, uint64_t dirhash)
108 {
109         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
110                 (dirhash >> (63 - zap_hashbits(zc)));
111
112         zap_cursor_init_serialized(zc, os, id, zfs_hash);
113 }
114
115 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
116                         uint64_t id, uint64_t dirhash)
117 {
118         zap_cursor_t *t;
119
120         OBD_ALLOC_PTR(t);
121         if (unlikely(t == NULL))
122                 return -ENOMEM;
123
124         osd_zap_cursor_init_serialized(t, os, id, dirhash);
125         *zc = t;
126
127         return 0;
128 }
129
130 void osd_zap_cursor_fini(zap_cursor_t *zc)
131 {
132         zap_cursor_fini(zc);
133         OBD_FREE_PTR(zc);
134 }
135
136 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
137                                                  struct osd_object *o,
138                                                  uint64_t dirhash)
139 {
140         struct osd_device *d = osd_obj2dev(o);
141         osd_zap_cursor_init_serialized(zc, d->od_os,
142                                        o->oo_db->db_object, dirhash);
143 }
144
145 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
146                         uint64_t dirhash)
147 {
148         struct osd_device *d = osd_obj2dev(o);
149         return osd_zap_cursor_init(zc, d->od_os, o->oo_db->db_object, dirhash);
150 }
151
152 static struct dt_it *osd_index_it_init(const struct lu_env *env,
153                                        struct dt_object *dt,
154                                        __u32 unused,
155                                        struct lustre_capa *capa)
156 {
157         struct osd_thread_info  *info = osd_oti_get(env);
158         struct osd_zap_it       *it;
159         struct osd_object       *obj = osd_dt_obj(dt);
160         struct lu_object        *lo  = &dt->do_lu;
161         int                      rc;
162         ENTRY;
163
164         /* XXX: check capa ? */
165
166         LASSERT(lu_object_exists(lo));
167         LASSERT(obj->oo_db);
168         LASSERT(osd_object_is_zap(obj->oo_db));
169         LASSERT(info);
170
171         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
172         if (it == NULL)
173                 RETURN(ERR_PTR(-ENOMEM));
174
175         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
176         if (rc != 0) {
177                 OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
178                 RETURN(ERR_PTR(rc));
179         }
180
181         it->ozi_obj   = obj;
182         it->ozi_capa  = capa;
183         it->ozi_reset = 1;
184         lu_object_get(lo);
185
186         RETURN((struct dt_it *)it);
187 }
188
189 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
190 {
191         struct osd_zap_it       *it     = (struct osd_zap_it *)di;
192         struct osd_object       *obj;
193         ENTRY;
194
195         LASSERT(it);
196         LASSERT(it->ozi_obj);
197
198         obj = it->ozi_obj;
199
200         osd_zap_cursor_fini(it->ozi_zc);
201         lu_object_put(env, &obj->oo_dt.do_lu);
202         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
203
204         EXIT;
205 }
206
207
208 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
209 {
210         /* PBS: do nothing : ref are incremented at retrive and decreamented
211          *      next/finish. */
212 }
213
214 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
215                                        int len, __u16 type)
216 {
217         const unsigned    align = sizeof(struct luda_type) - 1;
218         struct luda_type *lt;
219
220         /* check if file type is required */
221         if (attr & LUDA_TYPE) {
222                 len = (len + align) & ~align;
223
224                 lt = (void *)ent->lde_name + len;
225                 lt->lt_type = cpu_to_le16(DTTOIF(type));
226                 ent->lde_attrs |= LUDA_TYPE;
227         }
228
229         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
230 }
231
232 /*
233  * as we don't know FID, we can't use LU object, so this function
234  * partially duplicate __osd_xattr_get() which is built around
235  * LU-object and uses it to cache data like regular EA dnode, etc
236  */
237 static int osd_find_parent_by_dnode(const struct lu_env *env,
238                                     struct dt_object *o,
239                                     struct lu_fid *fid)
240 {
241         struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
242         struct lustre_mdt_attrs *lma;
243         struct lu_buf            buf;
244         sa_handle_t             *sa_hdl;
245         nvlist_t                *nvbuf = NULL;
246         uchar_t                 *value;
247         uint64_t                 dnode;
248         int                      rc, size;
249         ENTRY;
250
251         /* first of all, get parent dnode from own attributes */
252         LASSERT(osd_dt_obj(o)->oo_db);
253         rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
254                             NULL, SA_HDL_PRIVATE, &sa_hdl);
255         if (rc)
256                 RETURN(rc);
257
258         dnode = ZFS_NO_OBJECT;
259         rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
260         sa_handle_destroy(sa_hdl);
261         if (rc)
262                 RETURN(rc);
263
264         /* now get EA buffer */
265         rc = __osd_xattr_load(osd, dnode, &nvbuf);
266         if (rc)
267                 GOTO(regular, rc);
268
269         /* XXX: if we get that far.. should we cache the result? */
270
271         /* try to find LMA attribute */
272         LASSERT(nvbuf != NULL);
273         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
274         if (rc == 0 && size >= sizeof(*lma)) {
275                 lma = (struct lustre_mdt_attrs *)value;
276                 lustre_lma_swab(lma);
277                 *fid = lma->lma_self_fid;
278                 GOTO(out, rc = 0);
279         }
280
281 regular:
282         /* no LMA attribute in SA, let's try regular EA */
283
284         /* first of all, get parent dnode storing regular EA */
285         rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
286         if (rc)
287                 GOTO(out, rc);
288
289         dnode = ZFS_NO_OBJECT;
290         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
291         sa_handle_destroy(sa_hdl);
292         if (rc)
293                 GOTO(out, rc);
294
295         CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
296         buf.lb_buf = osd_oti_get(env)->oti_buf;
297         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
298
299         /* now try to find LMA */
300         rc = __osd_xattr_get_large(env, osd, dnode, &buf,
301                                    XATTR_NAME_LMA, &size);
302         if (rc == 0 && size >= sizeof(*lma)) {
303                 lma = buf.lb_buf;
304                 lustre_lma_swab(lma);
305                 *fid = lma->lma_self_fid;
306                 GOTO(out, rc = 0);
307         } else if (rc < 0) {
308                 GOTO(out, rc);
309         } else {
310                 GOTO(out, rc = -EIO);
311         }
312
313 out:
314         if (nvbuf != NULL)
315                 nvlist_free(nvbuf);
316         RETURN(rc);
317 }
318
319 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
320                                struct lu_fid *fid)
321 {
322         struct link_ea_header  *leh;
323         struct link_ea_entry   *lee;
324         struct lu_buf           buf;
325         int                     rc;
326         ENTRY;
327
328         buf.lb_buf = osd_oti_get(env)->oti_buf;
329         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
330
331         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
332         if (rc == -ERANGE) {
333                 rc = osd_xattr_get(env, o, &LU_BUF_NULL,
334                                    XATTR_NAME_LINK, BYPASS_CAPA);
335                 if (rc < 0)
336                         RETURN(rc);
337                 LASSERT(rc > 0);
338                 OBD_ALLOC(buf.lb_buf, rc);
339                 if (buf.lb_buf == NULL)
340                         RETURN(-ENOMEM);
341                 buf.lb_len = rc;
342                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
343         }
344         if (rc < 0)
345                 GOTO(out, rc);
346         if (rc < sizeof(*leh) + sizeof(*lee))
347                 GOTO(out, rc = -EINVAL);
348
349         leh = buf.lb_buf;
350         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
351                 leh->leh_magic = LINK_EA_MAGIC;
352                 leh->leh_reccount = __swab32(leh->leh_reccount);
353                 leh->leh_len = __swab64(leh->leh_len);
354         }
355         if (leh->leh_magic != LINK_EA_MAGIC)
356                 GOTO(out, rc = -EINVAL);
357         if (leh->leh_reccount == 0)
358                 GOTO(out, rc = -ENODATA);
359
360         lee = (struct link_ea_entry *)(leh + 1);
361         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
362         rc = 0;
363
364 out:
365         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
366                 OBD_FREE(buf.lb_buf, buf.lb_len);
367
368 #if 0
369         /* this block can be enabled for additional verification
370          * it's trying to match FID from LinkEA vs. FID from LMA */
371         if (rc == 0) {
372                 struct lu_fid fid2;
373                 int rc2;
374                 rc2 = osd_find_parent_by_dnode(env, o, &fid2);
375                 if (rc2 == 0)
376                         if (lu_fid_eq(fid, &fid2) == 0)
377                                 CERROR("wrong parent: "DFID" != "DFID"\n",
378                                        PFID(fid), PFID(&fid2));
379         }
380 #endif
381
382         /* no LinkEA is found, let's try to find the fid in parent's LMA */
383         if (unlikely(rc != 0))
384                 rc = osd_find_parent_by_dnode(env, o, fid);
385
386         RETURN(rc);
387 }
388
389 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
390                           struct dt_rec *rec, const struct dt_key *key,
391                           struct lustre_capa *capa)
392 {
393         struct osd_thread_info *oti = osd_oti_get(env);
394         struct osd_object  *obj = osd_dt_obj(dt);
395         struct osd_device  *osd = osd_obj2dev(obj);
396         char               *name = (char *)key;
397         int                 rc;
398         ENTRY;
399
400         LASSERT(osd_object_is_zap(obj->oo_db));
401
402         if (name[0] == '.') {
403                 if (name[1] == 0) {
404                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
405                         memcpy(rec, f, sizeof(*f));
406                         RETURN(1);
407                 } else if (name[1] == '.' && name[2] == 0) {
408                         rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
409                         RETURN(rc == 0 ? 1 : rc);
410                 }
411         }
412
413         rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
414                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
415                          (void *)&oti->oti_zde);
416         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
417
418         RETURN(rc == 0 ? 1 : rc);
419 }
420
421 static int osd_declare_dir_insert(const struct lu_env *env,
422                                   struct dt_object *dt,
423                                   const struct dt_rec *rec,
424                                   const struct dt_key *key,
425                                   struct thandle *th)
426 {
427         struct osd_object  *obj = osd_dt_obj(dt);
428         struct osd_thandle *oh;
429         uint64_t object;
430         ENTRY;
431
432         LASSERT(th != NULL);
433         oh = container_of0(th, struct osd_thandle, ot_super);
434
435         /* This is for inserting dot/dotdot for new created dir. */
436         if (obj->oo_db == NULL)
437                 object = DMU_NEW_OBJECT;
438         else
439                 object = obj->oo_db->db_object;
440
441         dmu_tx_hold_bonus(oh->ot_tx, object);
442         dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
443
444         RETURN(0);
445 }
446
447 /**
448  * Find the osd object for given fid.
449  *
450  * \param fid need to find the osd object having this fid
451  *
452  * \retval osd_object on success
453  * \retval        -ve on error
454  */
455 struct osd_object *osd_object_find(const struct lu_env *env,
456                                    struct dt_object *dt,
457                                    const struct lu_fid *fid)
458 {
459         struct lu_device         *ludev = dt->do_lu.lo_dev;
460         struct osd_object        *child = NULL;
461         struct lu_object         *luch;
462         struct lu_object         *lo;
463
464         /*
465          * at this point topdev might not exist yet
466          * (i.e. MGS is preparing profiles). so we can
467          * not rely on topdev and instead lookup with
468          * our device passed as topdev. this can't work
469          * if the object isn't cached yet (as osd doesn't
470          * allocate lu_header). IOW, the object must be
471          * in the cache, otherwise lu_object_alloc() crashes
472          * -bzzz
473          */
474         luch = lu_object_find_at(env, ludev, fid, NULL);
475         if (IS_ERR(luch))
476                 return (void *)luch;
477
478         if (lu_object_exists(luch)) {
479                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
480                 if (lo != NULL)
481                         child = osd_obj(lo);
482                 else
483                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
484                                         "%s: object can't be located "DFID"\n",
485                                         osd_dev(ludev)->od_svname, PFID(fid));
486
487                 if (child == NULL) {
488                         lu_object_put(env, luch);
489                         CERROR("%s: Unable to get osd_object "DFID"\n",
490                                osd_dev(ludev)->od_svname, PFID(fid));
491                         child = ERR_PTR(-ENOENT);
492                 }
493         } else {
494                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
495                                 "%s: lu_object does not exists "DFID"\n",
496                                 osd_dev(ludev)->od_svname, PFID(fid));
497                 lu_object_put(env, luch);
498                 child = ERR_PTR(-ENOENT);
499         }
500
501         return child;
502 }
503
504 /**
505  * Put the osd object once done with it.
506  *
507  * \param obj osd object that needs to be put
508  */
509 static inline void osd_object_put(const struct lu_env *env,
510                                   struct osd_object *obj)
511 {
512         lu_object_put(env, &obj->oo_dt.do_lu);
513 }
514
515 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
516                           u64 seq)
517 {
518         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
519         struct seq_server_site  *ss = osd_seq_site(osd);
520         int                     rc;
521         ENTRY;
522
523         LASSERT(ss != NULL);
524         LASSERT(ss->ss_server_fld != NULL);
525
526         rc = osd_fld_lookup(env, osd, seq, range);
527         if (rc != 0) {
528                 if (rc != -ENOENT)
529                         CERROR("%s: Can not lookup fld for "LPX64"\n",
530                                osd_name(osd), seq);
531                 RETURN(0);
532         }
533
534         RETURN(ss->ss_node_id == range->lsr_index);
535 }
536
537 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
538                           const struct lu_fid *fid)
539 {
540         struct seq_server_site  *ss = osd_seq_site(osd);
541         ENTRY;
542
543         /* FID seqs not in FLDB, must be local seq */
544         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
545                 RETURN(0);
546
547         /* If FLD is not being initialized yet, it only happens during the
548          * initialization, likely during mgs initialization, and we assume
549          * this is local FID. */
550         if (ss == NULL || ss->ss_server_fld == NULL)
551                 RETURN(0);
552
553         /* Only check the local FLDB here */
554         if (osd_seq_exists(env, osd, fid_seq(fid)))
555                 RETURN(0);
556
557         RETURN(1);
558 }
559
560 /**
561  *      Inserts (key, value) pair in \a directory object.
562  *
563  *      \param  dt      osd index object
564  *      \param  key     key for index
565  *      \param  rec     record reference
566  *      \param  th      transaction handler
567  *      \param  capa    capability descriptor
568  *      \param  ignore_quota update should not affect quota
569  *
570  *      \retval  0  success
571  *      \retval -ve failure
572  */
573 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
574                           const struct dt_rec *rec, const struct dt_key *key,
575                           struct thandle *th, struct lustre_capa *capa,
576                           int ignore_quota)
577 {
578         struct osd_thread_info *oti = osd_oti_get(env);
579         struct osd_object   *parent = osd_dt_obj(dt);
580         struct osd_device   *osd = osd_obj2dev(parent);
581         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
582         const struct lu_fid *fid = rec1->rec_fid;
583         struct osd_thandle  *oh;
584         struct osd_object   *child = NULL;
585         __u32                attr;
586         char                *name = (char *)key;
587         int                  rc;
588         ENTRY;
589
590         LASSERT(parent->oo_db);
591         LASSERT(osd_object_is_zap(parent->oo_db));
592
593         LASSERT(dt_object_exists(dt));
594         LASSERT(osd_invariant(parent));
595
596         LASSERT(th != NULL);
597         oh = container_of0(th, struct osd_thandle, ot_super);
598
599         rc = osd_remote_fid(env, osd, fid);
600         if (rc < 0) {
601                 CERROR("%s: Can not find object "DFID": rc = %d\n",
602                        osd->od_svname, PFID(fid), rc);
603                 RETURN(rc);
604         }
605
606         if (unlikely(rc == 1)) {
607                 /* Insert remote entry */
608                 memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
609                 oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
610         } else {
611                 /*
612                  * To simulate old Orion setups with ./..  stored in the
613                  * directories
614                  */
615                 /* Insert local entry */
616                 child = osd_object_find(env, dt, fid);
617                 if (IS_ERR(child))
618                         RETURN(PTR_ERR(child));
619
620                 LASSERT(child->oo_db);
621                 if (name[0] == '.') {
622                         if (name[1] == 0) {
623                                 /* do not store ".", instead generate it
624                                  * during iteration */
625                                 GOTO(out, rc = 0);
626                         } else if (name[1] == '.' && name[2] == 0) {
627                                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
628                                         struct lu_fid tfid = *fid;
629
630                                         osd_object_put(env, child);
631                                         tfid.f_oid--;
632                                         child = osd_object_find(env, dt, &tfid);
633                                         if (IS_ERR(child))
634                                                 RETURN(PTR_ERR(child));
635
636                                         LASSERT(child->oo_db);
637                                 }
638
639                                 /* update parent dnode in the child.
640                                  * later it will be used to generate ".." */
641                                 rc = osd_object_sa_update(parent,
642                                                  SA_ZPL_PARENT(osd),
643                                                  &child->oo_db->db_object,
644                                                  8, oh);
645
646                                 GOTO(out, rc);
647                         }
648                 }
649                 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
650                 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
651                 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
652                 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
653                 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
654         }
655
656         oti->oti_zde.lzd_fid = *fid;
657         /* Insert (key,oid) into ZAP */
658         rc = -zap_add(osd->od_os, parent->oo_db->db_object,
659                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
660                       (void *)&oti->oti_zde, oh->ot_tx);
661         if (unlikely(rc == -EEXIST &&
662                      name[0] == '.' && name[1] == '.' && name[2] == 0))
663                 /* Update (key,oid) in ZAP */
664                 rc = -zap_update(osd->od_os, parent->oo_db->db_object,
665                                 (char *)key, 8, sizeof(oti->oti_zde) / 8,
666                                 (void *)&oti->oti_zde, oh->ot_tx);
667
668 out:
669         if (child != NULL)
670                 osd_object_put(env, child);
671
672         RETURN(rc);
673 }
674
675 static int osd_declare_dir_delete(const struct lu_env *env,
676                                   struct dt_object *dt,
677                                   const struct dt_key *key,
678                                   struct thandle *th)
679 {
680         struct osd_object *obj = osd_dt_obj(dt);
681         struct osd_thandle *oh;
682         ENTRY;
683
684         LASSERT(dt_object_exists(dt));
685         LASSERT(osd_invariant(obj));
686
687         LASSERT(th != NULL);
688         oh = container_of0(th, struct osd_thandle, ot_super);
689
690         LASSERT(obj->oo_db);
691         LASSERT(osd_object_is_zap(obj->oo_db));
692
693         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
694
695         RETURN(0);
696 }
697
698 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
699                           const struct dt_key *key, struct thandle *th,
700                           struct lustre_capa *capa)
701 {
702         struct osd_object *obj = osd_dt_obj(dt);
703         struct osd_device *osd = osd_obj2dev(obj);
704         struct osd_thandle *oh;
705         dmu_buf_t *zap_db = obj->oo_db;
706         char      *name = (char *)key;
707         int rc;
708         ENTRY;
709
710         LASSERT(obj->oo_db);
711         LASSERT(osd_object_is_zap(obj->oo_db));
712
713         LASSERT(th != NULL);
714         oh = container_of0(th, struct osd_thandle, ot_super);
715
716         /*
717          * In Orion . and .. were stored in the directory (not generated upon
718          * request as now). we preserve them for backward compatibility
719          */
720         if (name[0] == '.') {
721                 if (name[1] == 0) {
722                         RETURN(0);
723                 } else if (name[1] == '.' && name[2] == 0) {
724                         RETURN(0);
725                 }
726         }
727
728         /* Remove key from the ZAP */
729         rc = -zap_remove(osd->od_os, zap_db->db_object,
730                          (char *) key, oh->ot_tx);
731
732         if (unlikely(rc && rc != -ENOENT))
733                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
734
735         RETURN(rc);
736 }
737
738 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
739                                      struct dt_object *dt,
740                                      __u32 unused,
741                                      struct lustre_capa *capa)
742 {
743         struct osd_zap_it *it;
744
745         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
746         if (!IS_ERR(it))
747                 it->ozi_pos = 0;
748
749         RETURN((struct dt_it *)it);
750 }
751
752 /**
753  *  Move Iterator to record specified by \a key
754  *
755  *  \param  di      osd iterator
756  *  \param  key     key for index
757  *
758  *  \retval +ve  di points to record with least key not larger than key
759  *  \retval  0   di points to exact matched key
760  *  \retval -ve  failure
761  */
762 static int osd_dir_it_get(const struct lu_env *env,
763                           struct dt_it *di, const struct dt_key *key)
764 {
765         struct osd_zap_it *it = (struct osd_zap_it *)di;
766         struct osd_object *obj = it->ozi_obj;
767         char              *name = (char *)key;
768         int                rc;
769         ENTRY;
770
771         LASSERT(it);
772         LASSERT(it->ozi_zc);
773
774         /* reset the cursor */
775         zap_cursor_fini(it->ozi_zc);
776         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
777
778         /* XXX: implementation of the API is broken at the moment */
779         LASSERT(((const char *)key)[0] == 0);
780
781         if (name[0] == 0) {
782                 it->ozi_pos = 0;
783                 RETURN(1);
784         }
785
786         if (name[0] == '.') {
787                 if (name[1] == 0) {
788                         it->ozi_pos = 1;
789                         GOTO(out, rc = 1);
790                 } else if (name[1] == '.' && name[2] == 0) {
791                         it->ozi_pos = 2;
792                         GOTO(out, rc = 1);
793                 }
794         }
795
796         /* neither . nor .. - some real record */
797         it->ozi_pos = 3;
798         rc = +1;
799
800 out:
801         RETURN(rc);
802 }
803
804 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
805 {
806         /* PBS: do nothing : ref are incremented at retrive and decreamented
807          *      next/finish. */
808 }
809
810 /*
811  * in Orion . and .. were stored in the directory, while ZPL
812  * and current osd-zfs generate them up on request. so, we
813  * need to ignore previously stored . and ..
814  */
815 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
816                                         zap_attribute_t *za)
817 {
818         int rc, isdot;
819
820         do {
821                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
822
823                 isdot = 0;
824                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
825                         if (za->za_name[1] == 0) {
826                                 isdot = 1;
827                         } else if (za->za_name[1] == '.' &&
828                                    za->za_name[2] == 0) {
829                                 isdot = 1;
830                         }
831                         if (unlikely(isdot))
832                                 zap_cursor_advance(it->ozi_zc);
833                 }
834         } while (unlikely(rc == 0 && isdot));
835
836         return rc;
837 }
838
839 /**
840  * to load a directory entry at a time and stored it in
841  * iterator's in-memory data structure.
842  *
843  * \param di, struct osd_it_ea, iterator's in memory structure
844  *
845  * \retval +ve, iterator reached to end
846  * \retval   0, iterator not reached to end
847  * \retval -ve, on error
848  */
849 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
850 {
851         struct osd_zap_it *it = (struct osd_zap_it *)di;
852         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
853         int                rc;
854
855         ENTRY;
856
857         /* temp. storage should be enough for any key supported by ZFS */
858         CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
859
860         /*
861          * the first ->next() moves the cursor to .
862          * the second ->next() moves the cursor to ..
863          * then we get to the real records and have to verify any exist
864          */
865         if (it->ozi_pos <= 2) {
866                 it->ozi_pos++;
867                 if (it->ozi_pos <=2)
868                         RETURN(0);
869
870         } else {
871                 zap_cursor_advance(it->ozi_zc);
872         }
873
874         /*
875          * According to current API we need to return error if its last entry.
876          * zap_cursor_advance() does not return any value. So we need to call
877          * retrieve to check if there is any record.  We should make
878          * changes to Iterator API to not return status for this API
879          */
880         rc = osd_index_retrieve_skip_dots(it, za);
881
882         if (rc == -ENOENT) /* end of dir */
883                 RETURN(+1);
884
885         RETURN(rc);
886 }
887
888 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
889                                      const struct dt_it *di)
890 {
891         struct osd_zap_it *it = (struct osd_zap_it *)di;
892         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
893         int                rc = 0;
894         ENTRY;
895
896         if (it->ozi_pos <= 1) {
897                 it->ozi_pos = 1;
898                 RETURN((struct dt_key *)".");
899         } else if (it->ozi_pos == 2) {
900                 RETURN((struct dt_key *)"..");
901         }
902
903         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
904                 RETURN(ERR_PTR(rc));
905
906         strcpy(it->ozi_name, za->za_name);
907
908         RETURN((struct dt_key *)it->ozi_name);
909 }
910
911 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
912 {
913         struct osd_zap_it *it = (struct osd_zap_it *)di;
914         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
915         int                rc;
916         ENTRY;
917
918         if (it->ozi_pos <= 1) {
919                 it->ozi_pos = 1;
920                 RETURN(2);
921         } else if (it->ozi_pos == 2) {
922                 RETURN(3);
923         }
924
925         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
926                 rc = strlen(za->za_name);
927
928         RETURN(rc);
929 }
930
931 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
932                           struct dt_rec *dtrec, __u32 attr)
933 {
934         struct osd_zap_it   *it = (struct osd_zap_it *)di;
935         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
936         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
937         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
938         int                  rc, namelen;
939         ENTRY;
940
941         if (it->ozi_pos <= 1) {
942                 lde->lde_hash = cpu_to_le64(1);
943                 strcpy(lde->lde_name, ".");
944                 lde->lde_namelen = cpu_to_le16(1);
945                 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
946                 lde->lde_attrs = LUDA_FID;
947                 /* append lustre attributes */
948                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
949                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
950                 it->ozi_pos = 1;
951                 GOTO(out, rc = 0);
952
953         } else if (it->ozi_pos == 2) {
954                 lde->lde_hash = cpu_to_le64(2);
955                 strcpy(lde->lde_name, "..");
956                 lde->lde_namelen = cpu_to_le16(2);
957                 lde->lde_attrs = LUDA_FID;
958                 /* append lustre attributes */
959                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
960                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
961                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
962
963                 /* ENOENT happens at the root of filesystem so ignore it */
964                 if (rc == -ENOENT)
965                         rc = 0;
966                 GOTO(out, rc);
967         }
968
969         LASSERT(lde);
970
971         rc = -zap_cursor_retrieve(it->ozi_zc, za);
972         if (unlikely(rc != 0))
973                 GOTO(out, rc);
974
975         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
976         namelen = strlen(za->za_name);
977         if (namelen > NAME_MAX)
978                 GOTO(out, rc = -EOVERFLOW);
979         strcpy(lde->lde_name, za->za_name);
980         lde->lde_namelen = cpu_to_le16(namelen);
981
982         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
983                 CERROR("%s: unsupported direntry format: %d %d\n",
984                        osd_obj2dev(it->ozi_obj)->od_svname,
985                        za->za_integer_length, (int)za->za_num_integers);
986
987                 GOTO(out, rc = -EIO);
988         }
989
990         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
991                          za->za_name, za->za_integer_length, 3, zde);
992         if (rc)
993                 GOTO(out, rc);
994
995         lde->lde_fid = zde->lzd_fid;
996         lde->lde_attrs = LUDA_FID;
997
998         /* append lustre attributes */
999         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
1000
1001         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
1002
1003 out:
1004         RETURN(rc);
1005 }
1006
1007 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
1008                                __u32 attr)
1009 {
1010         struct osd_zap_it   *it = (struct osd_zap_it *)di;
1011         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
1012         size_t               namelen = 0;
1013         int                  rc;
1014         ENTRY;
1015
1016         if (it->ozi_pos <= 1)
1017                 namelen = 1;
1018         else if (it->ozi_pos == 2)
1019                 namelen = 2;
1020
1021         if (namelen > 0) {
1022                 rc = lu_dirent_calc_size(namelen, attr);
1023                 RETURN(rc);
1024         }
1025
1026         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1027         if (unlikely(rc != 0))
1028                 RETURN(rc);
1029
1030         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1031                 CERROR("%s: unsupported direntry format: %d %d\n",
1032                        osd_obj2dev(it->ozi_obj)->od_svname,
1033                        za->za_integer_length, (int)za->za_num_integers);
1034                 RETURN(-EIO);
1035         }
1036
1037         namelen = strlen(za->za_name);
1038         if (namelen > NAME_MAX)
1039                 RETURN(-EOVERFLOW);
1040
1041         rc = lu_dirent_calc_size(namelen, attr);
1042
1043         RETURN(rc);
1044 }
1045
1046 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1047 {
1048         struct osd_zap_it *it = (struct osd_zap_it *)di;
1049         __u64              pos;
1050         ENTRY;
1051
1052         if (it->ozi_pos <= 2)
1053                 pos = it->ozi_pos;
1054         else
1055                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1056
1057         RETURN(pos);
1058 }
1059
1060 /*
1061  * return status :
1062  *  rc == 0 -> end of directory.
1063  *  rc >  0 -> ok, proceed.
1064  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1065  */
1066 static int osd_dir_it_load(const struct lu_env *env,
1067                         const struct dt_it *di, __u64 hash)
1068 {
1069         struct osd_zap_it *it = (struct osd_zap_it *)di;
1070         struct osd_object *obj = it->ozi_obj;
1071         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1072         int                rc;
1073         ENTRY;
1074
1075         /* reset the cursor */
1076         zap_cursor_fini(it->ozi_zc);
1077         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1078
1079         if (hash <= 2) {
1080                 it->ozi_pos = hash;
1081                 rc = +1;
1082         } else {
1083                 it->ozi_pos = 3;
1084                 /* to return whether the end has been reached */
1085                 rc = osd_index_retrieve_skip_dots(it, za);
1086                 if (rc == 0)
1087                         rc = +1;
1088                 else if (rc == -ENOENT)
1089                         rc = 0;
1090         }
1091
1092         RETURN(rc);
1093 }
1094
1095 struct dt_index_operations osd_dir_ops = {
1096         .dio_lookup         = osd_dir_lookup,
1097         .dio_declare_insert = osd_declare_dir_insert,
1098         .dio_insert         = osd_dir_insert,
1099         .dio_declare_delete = osd_declare_dir_delete,
1100         .dio_delete         = osd_dir_delete,
1101         .dio_it     = {
1102                 .init     = osd_dir_it_init,
1103                 .fini     = osd_index_it_fini,
1104                 .get      = osd_dir_it_get,
1105                 .put      = osd_dir_it_put,
1106                 .next     = osd_dir_it_next,
1107                 .key      = osd_dir_it_key,
1108                 .key_size = osd_dir_it_key_size,
1109                 .rec      = osd_dir_it_rec,
1110                 .rec_size = osd_dir_it_rec_size,
1111                 .store    = osd_dir_it_store,
1112                 .load     = osd_dir_it_load
1113         }
1114 };
1115
1116 /*
1117  * Primitives for index files using binary keys.
1118  */
1119
1120 /* key integer_size is 8 */
1121 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1122                                   const struct dt_key *src)
1123 {
1124         int size;
1125
1126         LASSERT(dst);
1127         LASSERT(src);
1128
1129         /* align keysize to 64bit */
1130         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1131         size *= sizeof(__u64);
1132
1133         LASSERT(size <= MAXNAMELEN);
1134
1135         if (unlikely(size > o->oo_keysize))
1136                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1137         memcpy(dst, (const char *)src, o->oo_keysize);
1138
1139         return (size/sizeof(__u64));
1140 }
1141
1142 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1143                         struct dt_rec *rec, const struct dt_key *key,
1144                         struct lustre_capa *capa)
1145 {
1146         struct osd_object *obj = osd_dt_obj(dt);
1147         struct osd_device *osd = osd_obj2dev(obj);
1148         __u64             *k = osd_oti_get(env)->oti_key64;
1149         int                rc;
1150         ENTRY;
1151
1152         rc = osd_prepare_key_uint64(obj, k, key);
1153
1154         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1155                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1156                                 (void *)rec);
1157         RETURN(rc == 0 ? 1 : rc);
1158 }
1159
1160 static int osd_declare_index_insert(const struct lu_env *env,
1161                                     struct dt_object *dt,
1162                                     const struct dt_rec *rec,
1163                                     const struct dt_key *key,
1164                                     struct thandle *th)
1165 {
1166         struct osd_object  *obj = osd_dt_obj(dt);
1167         struct osd_thandle *oh;
1168         ENTRY;
1169
1170         LASSERT(th != NULL);
1171         oh = container_of0(th, struct osd_thandle, ot_super);
1172
1173         LASSERT(obj->oo_db);
1174
1175         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
1176
1177         /* It is not clear what API should be used for binary keys, so we pass
1178          * a null name which has the side effect of over-reserving space,
1179          * accounting for the worst case. See zap_count_write() */
1180         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1181
1182         RETURN(0);
1183 }
1184
1185 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1186                             const struct dt_rec *rec, const struct dt_key *key,
1187                             struct thandle *th, struct lustre_capa *capa,
1188                             int ignore_quota)
1189 {
1190         struct osd_object  *obj = osd_dt_obj(dt);
1191         struct osd_device  *osd = osd_obj2dev(obj);
1192         struct osd_thandle *oh;
1193         __u64              *k = osd_oti_get(env)->oti_key64;
1194         int                 rc;
1195         ENTRY;
1196
1197         LASSERT(obj->oo_db);
1198         LASSERT(dt_object_exists(dt));
1199         LASSERT(osd_invariant(obj));
1200         LASSERT(th != NULL);
1201
1202         oh = container_of0(th, struct osd_thandle, ot_super);
1203
1204         rc = osd_prepare_key_uint64(obj, k, key);
1205
1206         /* Insert (key,oid) into ZAP */
1207         rc = -zap_add_uint64(osd->od_os, obj->oo_db->db_object,
1208                              k, rc, obj->oo_recusize, obj->oo_recsize,
1209                              (void *)rec, oh->ot_tx);
1210         RETURN(rc);
1211 }
1212
1213 static int osd_declare_index_delete(const struct lu_env *env,
1214                                     struct dt_object *dt,
1215                                     const struct dt_key *key,
1216                                     struct thandle *th)
1217 {
1218         struct osd_object  *obj = osd_dt_obj(dt);
1219         struct osd_thandle *oh;
1220         ENTRY;
1221
1222         LASSERT(dt_object_exists(dt));
1223         LASSERT(osd_invariant(obj));
1224         LASSERT(th != NULL);
1225         LASSERT(obj->oo_db);
1226
1227         oh = container_of0(th, struct osd_thandle, ot_super);
1228         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1229
1230         RETURN(0);
1231 }
1232
1233 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1234                             const struct dt_key *key, struct thandle *th,
1235                             struct lustre_capa *capa)
1236 {
1237         struct osd_object  *obj = osd_dt_obj(dt);
1238         struct osd_device  *osd = osd_obj2dev(obj);
1239         struct osd_thandle *oh;
1240         __u64              *k = osd_oti_get(env)->oti_key64;
1241         int                 rc;
1242         ENTRY;
1243
1244         LASSERT(obj->oo_db);
1245         LASSERT(th != NULL);
1246         oh = container_of0(th, struct osd_thandle, ot_super);
1247
1248         rc = osd_prepare_key_uint64(obj, k, key);
1249
1250         /* Remove binary key from the ZAP */
1251         rc = -zap_remove_uint64(osd->od_os, obj->oo_db->db_object,
1252                                 k, rc, oh->ot_tx);
1253         RETURN(rc);
1254 }
1255
1256 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1257                             const struct dt_key *key)
1258 {
1259         struct osd_zap_it *it = (struct osd_zap_it *)di;
1260         struct osd_object *obj = it->ozi_obj;
1261         struct osd_device *osd = osd_obj2dev(obj);
1262         ENTRY;
1263
1264         LASSERT(it);
1265         LASSERT(it->ozi_zc);
1266
1267         /*
1268          * XXX: we need a binary version of zap_cursor_move_to_key()
1269          *      to implement this API */
1270         if (*((const __u64 *)key) != 0)
1271                 CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
1272                        *((__u64 *)key));
1273
1274         zap_cursor_fini(it->ozi_zc);
1275         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_db->db_object);
1276         it->ozi_reset = 1;
1277
1278         RETURN(+1);
1279 }
1280
1281 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1282 {
1283         struct osd_zap_it *it = (struct osd_zap_it *)di;
1284         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1285         int                rc;
1286         ENTRY;
1287
1288         if (it->ozi_reset == 0)
1289                 zap_cursor_advance(it->ozi_zc);
1290         it->ozi_reset = 0;
1291
1292         /*
1293          * According to current API we need to return error if it's last entry.
1294          * zap_cursor_advance() does not return any value. So we need to call
1295          * retrieve to check if there is any record.  We should make
1296          * changes to Iterator API to not return status for this API
1297          */
1298         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1299         if (rc == -ENOENT)
1300                 RETURN(+1);
1301
1302         RETURN((rc));
1303 }
1304
1305 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1306                                        const struct dt_it *di)
1307 {
1308         struct osd_zap_it *it = (struct osd_zap_it *)di;
1309         struct osd_object *obj = it->ozi_obj;
1310         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1311         int                rc = 0;
1312         ENTRY;
1313
1314         it->ozi_reset = 0;
1315         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1316         if (rc)
1317                 RETURN(ERR_PTR(rc));
1318
1319         /* the binary key is stored in the name */
1320         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1321
1322         RETURN((struct dt_key *)&it->ozi_key);
1323 }
1324
1325 static int osd_index_it_key_size(const struct lu_env *env,
1326                                 const struct dt_it *di)
1327 {
1328         struct osd_zap_it *it = (struct osd_zap_it *)di;
1329         struct osd_object *obj = it->ozi_obj;
1330         RETURN(obj->oo_keysize);
1331 }
1332
1333 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1334                             struct dt_rec *rec, __u32 attr)
1335 {
1336         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1337         struct osd_zap_it *it = (struct osd_zap_it *)di;
1338         struct osd_object *obj = it->ozi_obj;
1339         struct osd_device *osd = osd_obj2dev(obj);
1340         __u64             *k = osd_oti_get(env)->oti_key64;
1341         int                rc;
1342         ENTRY;
1343
1344         it->ozi_reset = 0;
1345         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1346         if (rc)
1347                 RETURN(rc);
1348
1349         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1350
1351         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1352                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1353                                 (void *)rec);
1354         RETURN(rc);
1355 }
1356
1357 static __u64 osd_index_it_store(const struct lu_env *env,
1358                                 const struct dt_it *di)
1359 {
1360         struct osd_zap_it *it = (struct osd_zap_it *)di;
1361
1362         it->ozi_reset = 0;
1363         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1364 }
1365
1366 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1367                              __u64 hash)
1368 {
1369         struct osd_zap_it *it = (struct osd_zap_it *)di;
1370         struct osd_object *obj = it->ozi_obj;
1371         struct osd_device *osd = osd_obj2dev(obj);
1372         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1373         int                rc;
1374         ENTRY;
1375
1376         /* reset the cursor */
1377         zap_cursor_fini(it->ozi_zc);
1378         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1379                                    obj->oo_db->db_object, hash);
1380         it->ozi_reset = 0;
1381
1382         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1383         if (rc == 0)
1384                 RETURN(+1);
1385         else if (rc == -ENOENT)
1386                 RETURN(0);
1387
1388         RETURN(rc);
1389 }
1390
1391 static struct dt_index_operations osd_index_ops = {
1392         .dio_lookup             = osd_index_lookup,
1393         .dio_declare_insert     = osd_declare_index_insert,
1394         .dio_insert             = osd_index_insert,
1395         .dio_declare_delete     = osd_declare_index_delete,
1396         .dio_delete             = osd_index_delete,
1397         .dio_it = {
1398                 .init           = osd_index_it_init,
1399                 .fini           = osd_index_it_fini,
1400                 .get            = osd_index_it_get,
1401                 .put            = osd_index_it_put,
1402                 .next           = osd_index_it_next,
1403                 .key            = osd_index_it_key,
1404                 .key_size       = osd_index_it_key_size,
1405                 .rec            = osd_index_it_rec,
1406                 .store          = osd_index_it_store,
1407                 .load           = osd_index_it_load
1408         }
1409 };
1410
1411 struct osd_metadnode_it {
1412         struct osd_device       *mit_dev;
1413         __u64                    mit_pos;
1414         struct lu_fid            mit_fid;
1415         int                      mit_prefetched;
1416         __u64                    mit_prefetched_dnode;
1417 };
1418
1419 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
1420                                             struct dt_object *dt, __u32 attr,
1421                                             struct lustre_capa *capa)
1422 {
1423         struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
1424         struct osd_metadnode_it *it;
1425         ENTRY;
1426
1427         OBD_ALLOC_PTR(it);
1428         if (unlikely(it == NULL))
1429                 RETURN(ERR_PTR(-ENOMEM));
1430
1431         it->mit_dev = dev;
1432
1433         /* XXX: dmu_object_next() does NOT find dnodes allocated
1434          *      in the current non-committed txg, so we force txg
1435          *      commit to find all existing dnodes ... */
1436         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1437
1438         RETURN((struct dt_it *)it);
1439 }
1440
1441 static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1442 {
1443         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1444
1445         OBD_FREE_PTR(it);
1446 }
1447
1448 static int osd_zfs_otable_it_get(const struct lu_env *env,
1449                                  struct dt_it *di, const struct dt_key *key)
1450 {
1451         return 0;
1452 }
1453
1454 static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
1455 {
1456 }
1457
1458 #define OTABLE_PREFETCH         256
1459
1460 static void osd_zfs_otable_prefetch(const struct lu_env *env,
1461                                     struct osd_metadnode_it *it)
1462 {
1463         struct osd_device       *dev = it->mit_dev;
1464         int                      rc;
1465
1466         /* can go negative on the very first access to the iterator
1467          * or if some non-Lustre objects were found */
1468         if (unlikely(it->mit_prefetched < 0))
1469                 it->mit_prefetched = 0;
1470
1471         if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
1472                 return;
1473
1474         if (it->mit_prefetched_dnode == 0)
1475                 it->mit_prefetched_dnode = it->mit_pos;
1476
1477         while (it->mit_prefetched < OTABLE_PREFETCH) {
1478                 rc = -dmu_object_next(dev->od_os, &it->mit_prefetched_dnode,
1479                                       B_FALSE, 0);
1480                 if (unlikely(rc != 0))
1481                         break;
1482
1483                 /* dmu_prefetch() was exported in 0.6.2, if you use with
1484                  * an older release, just comment it out - this is an
1485                  * optimization */
1486                 dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
1487
1488                 it->mit_prefetched++;
1489         }
1490 }
1491
1492 static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
1493 {
1494         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1495         struct lustre_mdt_attrs *lma;
1496         struct osd_device       *dev = it->mit_dev;
1497         nvlist_t                *nvbuf = NULL;
1498         uchar_t                 *v;
1499         __u64                    dnode;
1500         int                      rc, s;
1501
1502         memset(&it->mit_fid, 0, sizeof(it->mit_fid));
1503
1504         dnode = it->mit_pos;
1505         do {
1506                 rc = -dmu_object_next(dev->od_os, &it->mit_pos, B_FALSE, 0);
1507                 if (unlikely(rc != 0))
1508                         GOTO(out, rc = 1);
1509                 it->mit_prefetched--;
1510
1511                 /* LMA is required for this to be a Lustre object.
1512                  * If there is no xattr skip it. */
1513                 rc = __osd_xattr_load(dev, it->mit_pos, &nvbuf);
1514                 if (unlikely(rc != 0))
1515                         continue;
1516
1517                 LASSERT(nvbuf != NULL);
1518                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
1519                 if (likely(rc == 0)) {
1520                         /* Lustre object */
1521                         lma = (struct lustre_mdt_attrs *)v;
1522                         lustre_lma_swab(lma);
1523                         it->mit_fid = lma->lma_self_fid;
1524                         nvlist_free(nvbuf);
1525                         break;
1526                 } else {
1527                         /* not a Lustre object, try next one */
1528                         nvlist_free(nvbuf);
1529                 }
1530
1531         } while (1);
1532
1533
1534         /* we aren't prefetching in the above loop because the number of
1535          * non-Lustre objects is very small and we will be repeating very
1536          * rare. in case we want to use this to iterate over non-Lustre
1537          * objects (i.e. when we convert regular ZFS in Lustre) it makes
1538          * sense to initiate prefetching in the loop */
1539
1540         /* 0 - there are more items, +1 - the end */
1541         if (likely(rc == 0))
1542                 osd_zfs_otable_prefetch(env, it);
1543
1544         CDEBUG(D_OTHER, "advance: %llu -> %llu "DFID": %d\n", dnode,
1545                it->mit_pos, PFID(&it->mit_fid), rc);
1546
1547 out:
1548         return rc;
1549 }
1550
1551 static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
1552                                             const struct dt_it *di)
1553 {
1554         return NULL;
1555 }
1556
1557 static int osd_zfs_otable_it_key_size(const struct lu_env *env,
1558                                       const struct dt_it *di)
1559 {
1560         return sizeof(__u64);
1561 }
1562
1563 static int osd_zfs_otable_it_rec(const struct lu_env *env,
1564                                  const struct dt_it *di,
1565                                  struct dt_rec *rec, __u32 attr)
1566 {
1567         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1568         struct lu_fid *fid = (struct lu_fid *)rec;
1569         ENTRY;
1570
1571         *fid = it->mit_fid;
1572
1573         RETURN(0);
1574 }
1575
1576
1577 static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
1578                                      const struct dt_it *di)
1579 {
1580         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1581
1582         return it->mit_pos;
1583 }
1584
1585 static int osd_zfs_otable_it_load(const struct lu_env *env,
1586                                   const struct dt_it *di, __u64 hash)
1587 {
1588         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1589
1590         it->mit_pos = hash;
1591         it->mit_prefetched = 0;
1592         it->mit_prefetched_dnode = 0;
1593
1594         return osd_zfs_otable_it_next(env, (struct dt_it *)di);
1595 }
1596
1597 static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
1598                                      const struct dt_it *di, void *key_rec)
1599 {
1600         return 0;
1601 }
1602
1603 const struct dt_index_operations osd_zfs_otable_ops = {
1604         .dio_it = {
1605                 .init     = osd_zfs_otable_it_init,
1606                 .fini     = osd_zfs_otable_it_fini,
1607                 .get      = osd_zfs_otable_it_get,
1608                 .put      = osd_zfs_otable_it_put,
1609                 .next     = osd_zfs_otable_it_next,
1610                 .key      = osd_zfs_otable_it_key,
1611                 .key_size = osd_zfs_otable_it_key_size,
1612                 .rec      = osd_zfs_otable_it_rec,
1613                 .store    = osd_zfs_otable_it_store,
1614                 .load     = osd_zfs_otable_it_load,
1615                 .key_rec  = osd_zfs_otable_it_key_rec,
1616         }
1617 };
1618
1619 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1620                 const struct dt_index_features *feat)
1621 {
1622         struct osd_object *obj = osd_dt_obj(dt);
1623         ENTRY;
1624
1625         LASSERT(dt_object_exists(dt));
1626
1627         /*
1628          * XXX: implement support for fixed-size keys sorted with natural
1629          *      numerical way (not using internal hash value)
1630          */
1631         if (feat->dif_flags & DT_IND_RANGE)
1632                 RETURN(-ERANGE);
1633
1634         if (unlikely(feat == &dt_otable_features)) {
1635                 dt->do_index_ops = &osd_zfs_otable_ops;
1636                 RETURN(0);
1637         }
1638
1639         LASSERT(obj->oo_db != NULL);
1640         if (likely(feat == &dt_directory_features)) {
1641                 if (osd_object_is_zap(obj->oo_db))
1642                         dt->do_index_ops = &osd_dir_ops;
1643                 else
1644                         RETURN(-ENOTDIR);
1645         } else if (unlikely(feat == &dt_acct_features)) {
1646                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1647                 dt->do_index_ops = &osd_acct_index_ops;
1648         } else if (osd_object_is_zap(obj->oo_db) &&
1649                    dt->do_index_ops == NULL) {
1650                 /* For index file, we don't support variable key & record sizes
1651                  * and the key has to be unique */
1652                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1653                         RETURN(-EINVAL);
1654
1655                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
1656                         RETURN(-E2BIG);
1657                 if (feat->dif_keysize_max != feat->dif_keysize_min)
1658                         RETURN(-EINVAL);
1659
1660                 /* As for the record size, it should be a multiple of 8 bytes
1661                  * and smaller than the maximum value length supported by ZAP.
1662                  */
1663                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1664                         RETURN(-E2BIG);
1665                 if (feat->dif_recsize_max != feat->dif_recsize_min)
1666                         RETURN(-EINVAL);
1667
1668                 obj->oo_keysize = feat->dif_keysize_max;
1669                 obj->oo_recsize = feat->dif_recsize_max;
1670                 obj->oo_recusize = 1;
1671
1672                 /* ZFS prefers to work with array of 64bits */
1673                 if ((obj->oo_recsize & 7) == 0) {
1674                         obj->oo_recsize >>= 3;
1675                         obj->oo_recusize = 8;
1676                 }
1677                 dt->do_index_ops = &osd_index_ops;
1678         }
1679
1680         RETURN(0);
1681 }