Whamcloud - gitweb
LU-6068 misc: update Intel copyright messages 2014
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd-zfs/osd_index.c
37  *
38  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
39  * Author: Mike Pershin <tappro@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OSD
43
44 #include <lustre_ver.h>
45 #include <libcfs/libcfs.h>
46 #include <obd_support.h>
47 #include <lustre_net.h>
48 #include <obd.h>
49 #include <obd_class.h>
50 #include <lustre_disk.h>
51 #include <lustre_fid.h>
52
53 #include "osd_internal.h"
54
55 #include <sys/dnode.h>
56 #include <sys/dbuf.h>
57 #include <sys/spa.h>
58 #include <sys/stat.h>
59 #include <sys/zap.h>
60 #include <sys/spa_impl.h>
61 #include <sys/zfs_znode.h>
62 #include <sys/dmu_tx.h>
63 #include <sys/dmu_objset.h>
64 #include <sys/dsl_prop.h>
65 #include <sys/sa_impl.h>
66 #include <sys/txg.h>
67
68 static inline int osd_object_is_zap(dmu_buf_t *db)
69 {
70         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *) db;
71         dnode_t *dn;
72         int rc;
73
74         DB_DNODE_ENTER(dbi);
75         dn = DB_DNODE(dbi);
76         rc = (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
77                         dn->dn_type == DMU_OT_USERGROUP_USED);
78         DB_DNODE_EXIT(dbi);
79
80         return rc;
81 }
82
83 /* We don't actually have direct access to the zap_hashbits() function
84  * so just pretend like we do for now.  If this ever breaks we can look at
85  * it at that time. */
86 #define zap_hashbits(zc) 48
87 /*
88  * ZFS hash format:
89  * | cd (16 bits) | hash (48 bits) |
90  * we need it in other form:
91  * |0| hash (48 bit) | cd (15 bit) |
92  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
93  * the readdir hashes from multiple directory stripes uniformly on the client.
94  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
95  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
96  * should only be the low 15 bits.
97  */
98 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
99 {
100         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
101
102         return (zfs_hash >> zap_hashbits(zc)) |
103                 (zfs_hash << (63 - zap_hashbits(zc)));
104 }
105
106 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
107                                     uint64_t id, uint64_t dirhash)
108 {
109         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
110                 (dirhash >> (63 - zap_hashbits(zc)));
111
112         zap_cursor_init_serialized(zc, os, id, zfs_hash);
113 }
114
115 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
116                         uint64_t id, uint64_t dirhash)
117 {
118         zap_cursor_t *t;
119
120         OBD_ALLOC_PTR(t);
121         if (unlikely(t == NULL))
122                 return -ENOMEM;
123
124         osd_zap_cursor_init_serialized(t, os, id, dirhash);
125         *zc = t;
126
127         return 0;
128 }
129
130 void osd_zap_cursor_fini(zap_cursor_t *zc)
131 {
132         zap_cursor_fini(zc);
133         OBD_FREE_PTR(zc);
134 }
135
136 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
137                                                  struct osd_object *o,
138                                                  uint64_t dirhash)
139 {
140         struct osd_device *d = osd_obj2dev(o);
141         zap_cursor_init_serialized(zc, d->od_os, o->oo_db->db_object, dirhash);
142 }
143
144 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
145                         uint64_t dirhash)
146 {
147         struct osd_device *d = osd_obj2dev(o);
148         return osd_zap_cursor_init(zc, d->od_os, o->oo_db->db_object, dirhash);
149 }
150
151 static struct dt_it *osd_index_it_init(const struct lu_env *env,
152                                        struct dt_object *dt,
153                                        __u32 unused,
154                                        struct lustre_capa *capa)
155 {
156         struct osd_thread_info  *info = osd_oti_get(env);
157         struct osd_zap_it       *it;
158         struct osd_object       *obj = osd_dt_obj(dt);
159         struct lu_object        *lo  = &dt->do_lu;
160         int                      rc;
161         ENTRY;
162
163         /* XXX: check capa ? */
164
165         LASSERT(lu_object_exists(lo));
166         LASSERT(obj->oo_db);
167         LASSERT(osd_object_is_zap(obj->oo_db));
168         LASSERT(info);
169
170         if (info->oti_it_inline) {
171                 OBD_ALLOC_PTR(it);
172                 if (it == NULL)
173                         RETURN(ERR_PTR(-ENOMEM));
174         } else {
175                 it = &info->oti_it_zap;
176                 info->oti_it_inline = 1;
177         }
178
179         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
180         if (rc != 0) {
181                 if (it != &info->oti_it_zap)
182                         OBD_FREE_PTR(it);
183                 else
184                         info->oti_it_inline = 0;
185
186                 RETURN(ERR_PTR(rc));
187         }
188
189         it->ozi_obj   = obj;
190         it->ozi_capa  = capa;
191         it->ozi_reset = 1;
192         lu_object_get(lo);
193
194         RETURN((struct dt_it *)it);
195 }
196
197 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
198 {
199         struct osd_thread_info  *info   = osd_oti_get(env);
200         struct osd_zap_it       *it     = (struct osd_zap_it *)di;
201         struct osd_object       *obj;
202         ENTRY;
203
204         LASSERT(it);
205         LASSERT(it->ozi_obj);
206
207         obj = it->ozi_obj;
208
209         osd_zap_cursor_fini(it->ozi_zc);
210         lu_object_put(env, &obj->oo_dt.do_lu);
211         if (it != &info->oti_it_zap)
212                 OBD_FREE_PTR(it);
213         else
214                 info->oti_it_inline = 0;
215
216         EXIT;
217 }
218
219
220 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
221 {
222         /* PBS: do nothing : ref are incremented at retrive and decreamented
223          *      next/finish. */
224 }
225
226 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
227                                        int len, __u16 type)
228 {
229         const unsigned    align = sizeof(struct luda_type) - 1;
230         struct luda_type *lt;
231
232         /* check if file type is required */
233         if (attr & LUDA_TYPE) {
234                 len = (len + align) & ~align;
235
236                 lt = (void *)ent->lde_name + len;
237                 lt->lt_type = cpu_to_le16(DTTOIF(type));
238                 ent->lde_attrs |= LUDA_TYPE;
239         }
240
241         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
242 }
243
244 /*
245  * as we don't know FID, we can't use LU object, so this function
246  * partially duplicate __osd_xattr_get() which is built around
247  * LU-object and uses it to cache data like regular EA dnode, etc
248  */
249 static int osd_find_parent_by_dnode(const struct lu_env *env,
250                                     struct dt_object *o,
251                                     struct lu_fid *fid)
252 {
253         struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
254         struct lustre_mdt_attrs *lma;
255         struct lu_buf            buf;
256         sa_handle_t             *sa_hdl;
257         nvlist_t                *nvbuf = NULL;
258         uchar_t                 *value;
259         uint64_t                 dnode;
260         int                      rc, size;
261         ENTRY;
262
263         /* first of all, get parent dnode from own attributes */
264         LASSERT(osd_dt_obj(o)->oo_db);
265         rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
266                             NULL, SA_HDL_PRIVATE, &sa_hdl);
267         if (rc)
268                 RETURN(rc);
269
270         dnode = ZFS_NO_OBJECT;
271         rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
272         sa_handle_destroy(sa_hdl);
273         if (rc)
274                 RETURN(rc);
275
276         /* now get EA buffer */
277         rc = __osd_xattr_load(osd, dnode, &nvbuf);
278         if (rc)
279                 GOTO(regular, rc);
280
281         /* XXX: if we get that far.. should we cache the result? */
282
283         /* try to find LMA attribute */
284         LASSERT(nvbuf != NULL);
285         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
286         if (rc == 0 && size >= sizeof(*lma)) {
287                 lma = (struct lustre_mdt_attrs *)value;
288                 lustre_lma_swab(lma);
289                 *fid = lma->lma_self_fid;
290                 GOTO(out, rc = 0);
291         }
292
293 regular:
294         /* no LMA attribute in SA, let's try regular EA */
295
296         /* first of all, get parent dnode storing regular EA */
297         rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
298         if (rc)
299                 GOTO(out, rc);
300
301         dnode = ZFS_NO_OBJECT;
302         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
303         sa_handle_destroy(sa_hdl);
304         if (rc)
305                 GOTO(out, rc);
306
307         CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
308         buf.lb_buf = osd_oti_get(env)->oti_buf;
309         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
310
311         /* now try to find LMA */
312         rc = __osd_xattr_get_large(env, osd, dnode, &buf,
313                                    XATTR_NAME_LMA, &size);
314         if (rc == 0 && size >= sizeof(*lma)) {
315                 lma = buf.lb_buf;
316                 lustre_lma_swab(lma);
317                 *fid = lma->lma_self_fid;
318                 GOTO(out, rc = 0);
319         } else if (rc < 0) {
320                 GOTO(out, rc);
321         } else {
322                 GOTO(out, rc = -EIO);
323         }
324
325 out:
326         if (nvbuf != NULL)
327                 nvlist_free(nvbuf);
328         RETURN(rc);
329 }
330
331 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
332                                struct lu_fid *fid)
333 {
334         struct link_ea_header  *leh;
335         struct link_ea_entry   *lee;
336         struct lu_buf           buf;
337         int                     rc;
338         ENTRY;
339
340         buf.lb_buf = osd_oti_get(env)->oti_buf;
341         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
342
343         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
344         if (rc == -ERANGE) {
345                 rc = osd_xattr_get(env, o, &LU_BUF_NULL,
346                                    XATTR_NAME_LINK, BYPASS_CAPA);
347                 if (rc < 0)
348                         RETURN(rc);
349                 LASSERT(rc > 0);
350                 OBD_ALLOC(buf.lb_buf, rc);
351                 if (buf.lb_buf == NULL)
352                         RETURN(-ENOMEM);
353                 buf.lb_len = rc;
354                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
355         }
356         if (rc < 0)
357                 GOTO(out, rc);
358         if (rc < sizeof(*leh) + sizeof(*lee))
359                 GOTO(out, rc = -EINVAL);
360
361         leh = buf.lb_buf;
362         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
363                 leh->leh_magic = LINK_EA_MAGIC;
364                 leh->leh_reccount = __swab32(leh->leh_reccount);
365                 leh->leh_len = __swab64(leh->leh_len);
366         }
367         if (leh->leh_magic != LINK_EA_MAGIC)
368                 GOTO(out, rc = -EINVAL);
369         if (leh->leh_reccount == 0)
370                 GOTO(out, rc = -ENODATA);
371
372         lee = (struct link_ea_entry *)(leh + 1);
373         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
374         rc = 0;
375
376 out:
377         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
378                 OBD_FREE(buf.lb_buf, buf.lb_len);
379
380 #if 0
381         /* this block can be enabled for additional verification
382          * it's trying to match FID from LinkEA vs. FID from LMA */
383         if (rc == 0) {
384                 struct lu_fid fid2;
385                 int rc2;
386                 rc2 = osd_find_parent_by_dnode(env, o, &fid2);
387                 if (rc2 == 0)
388                         if (lu_fid_eq(fid, &fid2) == 0)
389                                 CERROR("wrong parent: "DFID" != "DFID"\n",
390                                        PFID(fid), PFID(&fid2));
391         }
392 #endif
393
394         /* no LinkEA is found, let's try to find the fid in parent's LMA */
395         if (unlikely(rc != 0))
396                 rc = osd_find_parent_by_dnode(env, o, fid);
397
398         RETURN(rc);
399 }
400
401 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
402                           struct dt_rec *rec, const struct dt_key *key,
403                           struct lustre_capa *capa)
404 {
405         struct osd_thread_info *oti = osd_oti_get(env);
406         struct osd_object  *obj = osd_dt_obj(dt);
407         struct osd_device  *osd = osd_obj2dev(obj);
408         char               *name = (char *)key;
409         int                 rc;
410         ENTRY;
411
412         LASSERT(osd_object_is_zap(obj->oo_db));
413
414         if (name[0] == '.') {
415                 if (name[1] == 0) {
416                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
417                         memcpy(rec, f, sizeof(*f));
418                         RETURN(1);
419                 } else if (name[1] == '.' && name[2] == 0) {
420                         rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
421                         RETURN(rc == 0 ? 1 : rc);
422                 }
423         }
424
425         rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
426                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
427                          (void *)&oti->oti_zde);
428         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
429
430         RETURN(rc == 0 ? 1 : rc);
431 }
432
433 static int osd_declare_dir_insert(const struct lu_env *env,
434                                   struct dt_object *dt,
435                                   const struct dt_rec *rec,
436                                   const struct dt_key *key,
437                                   struct thandle *th)
438 {
439         struct osd_object  *obj = osd_dt_obj(dt);
440         struct osd_thandle *oh;
441         uint64_t object;
442         ENTRY;
443
444         LASSERT(th != NULL);
445         oh = container_of0(th, struct osd_thandle, ot_super);
446
447         /* This is for inserting dot/dotdot for new created dir. */
448         if (obj->oo_db == NULL)
449                 object = DMU_NEW_OBJECT;
450         else
451                 object = obj->oo_db->db_object;
452
453         dmu_tx_hold_bonus(oh->ot_tx, object);
454         dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
455
456         RETURN(0);
457 }
458
459 /**
460  * Find the osd object for given fid.
461  *
462  * \param fid need to find the osd object having this fid
463  *
464  * \retval osd_object on success
465  * \retval        -ve on error
466  */
467 struct osd_object *osd_object_find(const struct lu_env *env,
468                                    struct dt_object *dt,
469                                    const struct lu_fid *fid)
470 {
471         struct lu_device         *ludev = dt->do_lu.lo_dev;
472         struct osd_object        *child = NULL;
473         struct lu_object         *luch;
474         struct lu_object         *lo;
475
476         /*
477          * at this point topdev might not exist yet
478          * (i.e. MGS is preparing profiles). so we can
479          * not rely on topdev and instead lookup with
480          * our device passed as topdev. this can't work
481          * if the object isn't cached yet (as osd doesn't
482          * allocate lu_header). IOW, the object must be
483          * in the cache, otherwise lu_object_alloc() crashes
484          * -bzzz
485          */
486         luch = lu_object_find_at(env, ludev, fid, NULL);
487         if (IS_ERR(luch))
488                 return (void *)luch;
489
490         if (lu_object_exists(luch)) {
491                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
492                 if (lo != NULL)
493                         child = osd_obj(lo);
494                 else
495                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
496                                         "%s: object can't be located "DFID"\n",
497                                         osd_dev(ludev)->od_svname, PFID(fid));
498
499                 if (child == NULL) {
500                         lu_object_put(env, luch);
501                         CERROR("%s: Unable to get osd_object "DFID"\n",
502                                osd_dev(ludev)->od_svname, PFID(fid));
503                         child = ERR_PTR(-ENOENT);
504                 }
505         } else {
506                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
507                                 "%s: lu_object does not exists "DFID"\n",
508                                 osd_dev(ludev)->od_svname, PFID(fid));
509                 lu_object_put(env, luch);
510                 child = ERR_PTR(-ENOENT);
511         }
512
513         return child;
514 }
515
516 /**
517  * Put the osd object once done with it.
518  *
519  * \param obj osd object that needs to be put
520  */
521 static inline void osd_object_put(const struct lu_env *env,
522                                   struct osd_object *obj)
523 {
524         lu_object_put(env, &obj->oo_dt.do_lu);
525 }
526
527 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
528                           u64 seq)
529 {
530         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
531         struct seq_server_site  *ss = osd_seq_site(osd);
532         int                     rc;
533         ENTRY;
534
535         LASSERT(ss != NULL);
536         LASSERT(ss->ss_server_fld != NULL);
537
538         rc = osd_fld_lookup(env, osd, seq, range);
539         if (rc != 0) {
540                 CERROR("%s: Can not lookup fld for "LPX64"\n",
541                        osd_name(osd), seq);
542                 RETURN(0);
543         }
544
545         RETURN(ss->ss_node_id == range->lsr_index);
546 }
547
548 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
549                           const struct lu_fid *fid)
550 {
551         struct seq_server_site  *ss = osd_seq_site(osd);
552         ENTRY;
553
554         /* FID seqs not in FLDB, must be local seq */
555         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
556                 RETURN(0);
557
558         /* If FLD is not being initialized yet, it only happens during the
559          * initialization, likely during mgs initialization, and we assume
560          * this is local FID. */
561         if (ss == NULL || ss->ss_server_fld == NULL)
562                 RETURN(0);
563
564         /* Only check the local FLDB here */
565         if (osd_seq_exists(env, osd, fid_seq(fid)))
566                 RETURN(0);
567
568         RETURN(1);
569 }
570
571 /**
572  *      Inserts (key, value) pair in \a directory object.
573  *
574  *      \param  dt      osd index object
575  *      \param  key     key for index
576  *      \param  rec     record reference
577  *      \param  th      transaction handler
578  *      \param  capa    capability descriptor
579  *      \param  ignore_quota update should not affect quota
580  *
581  *      \retval  0  success
582  *      \retval -ve failure
583  */
584 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
585                           const struct dt_rec *rec, const struct dt_key *key,
586                           struct thandle *th, struct lustre_capa *capa,
587                           int ignore_quota)
588 {
589         struct osd_thread_info *oti = osd_oti_get(env);
590         struct osd_object   *parent = osd_dt_obj(dt);
591         struct osd_device   *osd = osd_obj2dev(parent);
592         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
593         const struct lu_fid *fid = rec1->rec_fid;
594         struct osd_thandle  *oh;
595         struct osd_object   *child = NULL;
596         __u32                attr;
597         char                *name = (char *)key;
598         int                  rc;
599         ENTRY;
600
601         LASSERT(parent->oo_db);
602         LASSERT(osd_object_is_zap(parent->oo_db));
603
604         LASSERT(dt_object_exists(dt));
605         LASSERT(osd_invariant(parent));
606
607         LASSERT(th != NULL);
608         oh = container_of0(th, struct osd_thandle, ot_super);
609
610         rc = osd_remote_fid(env, osd, fid);
611         if (rc < 0) {
612                 CERROR("%s: Can not find object "DFID": rc = %d\n",
613                        osd->od_svname, PFID(fid), rc);
614                 RETURN(rc);
615         }
616
617         if (unlikely(rc == 1)) {
618                 /* Insert remote entry */
619                 memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
620                 oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
621         } else {
622                 /*
623                  * To simulate old Orion setups with ./..  stored in the
624                  * directories
625                  */
626                 /* Insert local entry */
627                 child = osd_object_find(env, dt, fid);
628                 if (IS_ERR(child))
629                         RETURN(PTR_ERR(child));
630
631                 LASSERT(child->oo_db);
632                 if (name[0] == '.') {
633                         if (name[1] == 0) {
634                                 /* do not store ".", instead generate it
635                                  * during iteration */
636                                 GOTO(out, rc = 0);
637                         } else if (name[1] == '.' && name[2] == 0) {
638                                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
639                                         struct lu_fid tfid = *fid;
640
641                                         osd_object_put(env, child);
642                                         tfid.f_oid--;
643                                         child = osd_object_find(env, dt, &tfid);
644                                         if (IS_ERR(child))
645                                                 RETURN(PTR_ERR(child));
646
647                                         LASSERT(child->oo_db);
648                                 }
649
650                                 /* update parent dnode in the child.
651                                  * later it will be used to generate ".." */
652                                 rc = osd_object_sa_update(parent,
653                                                  SA_ZPL_PARENT(osd),
654                                                  &child->oo_db->db_object,
655                                                  8, oh);
656
657                                 GOTO(out, rc);
658                         }
659                 }
660                 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
661                 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
662                 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
663                 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
664                 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
665         }
666
667         oti->oti_zde.lzd_fid = *fid;
668         /* Insert (key,oid) into ZAP */
669         rc = -zap_add(osd->od_os, parent->oo_db->db_object,
670                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
671                       (void *)&oti->oti_zde, oh->ot_tx);
672         if (unlikely(rc == -EEXIST &&
673                      name[0] == '.' && name[1] == '.' && name[2] == 0))
674                 /* Update (key,oid) in ZAP */
675                 rc = -zap_update(osd->od_os, parent->oo_db->db_object,
676                                 (char *)key, 8, sizeof(oti->oti_zde) / 8,
677                                 (void *)&oti->oti_zde, oh->ot_tx);
678
679 out:
680         if (child != NULL)
681                 osd_object_put(env, child);
682
683         RETURN(rc);
684 }
685
686 static int osd_declare_dir_delete(const struct lu_env *env,
687                                   struct dt_object *dt,
688                                   const struct dt_key *key,
689                                   struct thandle *th)
690 {
691         struct osd_object *obj = osd_dt_obj(dt);
692         struct osd_thandle *oh;
693         ENTRY;
694
695         LASSERT(dt_object_exists(dt));
696         LASSERT(osd_invariant(obj));
697
698         LASSERT(th != NULL);
699         oh = container_of0(th, struct osd_thandle, ot_super);
700
701         LASSERT(obj->oo_db);
702         LASSERT(osd_object_is_zap(obj->oo_db));
703
704         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
705
706         RETURN(0);
707 }
708
709 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
710                           const struct dt_key *key, struct thandle *th,
711                           struct lustre_capa *capa)
712 {
713         struct osd_object *obj = osd_dt_obj(dt);
714         struct osd_device *osd = osd_obj2dev(obj);
715         struct osd_thandle *oh;
716         dmu_buf_t *zap_db = obj->oo_db;
717         char      *name = (char *)key;
718         int rc;
719         ENTRY;
720
721         LASSERT(obj->oo_db);
722         LASSERT(osd_object_is_zap(obj->oo_db));
723
724         LASSERT(th != NULL);
725         oh = container_of0(th, struct osd_thandle, ot_super);
726
727         /*
728          * In Orion . and .. were stored in the directory (not generated upon
729          * request as now). we preserve them for backward compatibility
730          */
731         if (name[0] == '.') {
732                 if (name[1] == 0) {
733                         RETURN(0);
734                 } else if (name[1] == '.' && name[2] == 0) {
735                         RETURN(0);
736                 }
737         }
738
739         /* Remove key from the ZAP */
740         rc = -zap_remove(osd->od_os, zap_db->db_object,
741                          (char *) key, oh->ot_tx);
742
743         if (unlikely(rc && rc != -ENOENT))
744                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
745
746         RETURN(rc);
747 }
748
749 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
750                                      struct dt_object *dt,
751                                      __u32 unused,
752                                      struct lustre_capa *capa)
753 {
754         struct osd_zap_it *it;
755
756         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
757         if (!IS_ERR(it))
758                 it->ozi_pos = 0;
759
760         RETURN((struct dt_it *)it);
761 }
762
763 /**
764  *  Move Iterator to record specified by \a key
765  *
766  *  \param  di      osd iterator
767  *  \param  key     key for index
768  *
769  *  \retval +ve  di points to record with least key not larger than key
770  *  \retval  0   di points to exact matched key
771  *  \retval -ve  failure
772  */
773 static int osd_dir_it_get(const struct lu_env *env,
774                           struct dt_it *di, const struct dt_key *key)
775 {
776         struct osd_zap_it *it = (struct osd_zap_it *)di;
777         struct osd_object *obj = it->ozi_obj;
778         char              *name = (char *)key;
779         int                rc;
780         ENTRY;
781
782         LASSERT(it);
783         LASSERT(it->ozi_zc);
784
785         /* reset the cursor */
786         zap_cursor_fini(it->ozi_zc);
787         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
788
789         /* XXX: implementation of the API is broken at the moment */
790         LASSERT(((const char *)key)[0] == 0);
791
792         if (name[0] == 0) {
793                 it->ozi_pos = 0;
794                 RETURN(1);
795         }
796
797         if (name[0] == '.') {
798                 if (name[1] == 0) {
799                         it->ozi_pos = 1;
800                         GOTO(out, rc = 1);
801                 } else if (name[1] == '.' && name[2] == 0) {
802                         it->ozi_pos = 2;
803                         GOTO(out, rc = 1);
804                 }
805         }
806
807         /* neither . nor .. - some real record */
808         it->ozi_pos = 3;
809         rc = +1;
810
811 out:
812         RETURN(rc);
813 }
814
815 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
816 {
817         /* PBS: do nothing : ref are incremented at retrive and decreamented
818          *      next/finish. */
819 }
820
821 /*
822  * in Orion . and .. were stored in the directory, while ZPL
823  * and current osd-zfs generate them up on request. so, we
824  * need to ignore previously stored . and ..
825  */
826 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
827                                         zap_attribute_t *za)
828 {
829         int rc, isdot;
830
831         do {
832                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
833
834                 isdot = 0;
835                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
836                         if (za->za_name[1] == 0) {
837                                 isdot = 1;
838                         } else if (za->za_name[1] == '.' &&
839                                    za->za_name[2] == 0) {
840                                 isdot = 1;
841                         }
842                         if (unlikely(isdot))
843                                 zap_cursor_advance(it->ozi_zc);
844                 }
845         } while (unlikely(rc == 0 && isdot));
846
847         return rc;
848 }
849
850 /**
851  * to load a directory entry at a time and stored it in
852  * iterator's in-memory data structure.
853  *
854  * \param di, struct osd_it_ea, iterator's in memory structure
855  *
856  * \retval +ve, iterator reached to end
857  * \retval   0, iterator not reached to end
858  * \retval -ve, on error
859  */
860 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
861 {
862         struct osd_zap_it *it = (struct osd_zap_it *)di;
863         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
864         int                rc;
865
866         ENTRY;
867
868         /* temp. storage should be enough for any key supported by ZFS */
869         CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
870
871         /*
872          * the first ->next() moves the cursor to .
873          * the second ->next() moves the cursor to ..
874          * then we get to the real records and have to verify any exist
875          */
876         if (it->ozi_pos <= 2) {
877                 it->ozi_pos++;
878                 if (it->ozi_pos <=2)
879                         RETURN(0);
880
881         } else {
882                 zap_cursor_advance(it->ozi_zc);
883         }
884
885         /*
886          * According to current API we need to return error if its last entry.
887          * zap_cursor_advance() does not return any value. So we need to call
888          * retrieve to check if there is any record.  We should make
889          * changes to Iterator API to not return status for this API
890          */
891         rc = osd_index_retrieve_skip_dots(it, za);
892
893         if (rc == -ENOENT) /* end of dir */
894                 RETURN(+1);
895
896         RETURN(rc);
897 }
898
899 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
900                                      const struct dt_it *di)
901 {
902         struct osd_zap_it *it = (struct osd_zap_it *)di;
903         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
904         int                rc = 0;
905         ENTRY;
906
907         if (it->ozi_pos <= 1) {
908                 it->ozi_pos = 1;
909                 RETURN((struct dt_key *)".");
910         } else if (it->ozi_pos == 2) {
911                 RETURN((struct dt_key *)"..");
912         }
913
914         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
915                 RETURN(ERR_PTR(rc));
916
917         strcpy(it->ozi_name, za->za_name);
918
919         RETURN((struct dt_key *)it->ozi_name);
920 }
921
922 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
923 {
924         struct osd_zap_it *it = (struct osd_zap_it *)di;
925         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
926         int                rc;
927         ENTRY;
928
929         if (it->ozi_pos <= 1) {
930                 it->ozi_pos = 1;
931                 RETURN(2);
932         } else if (it->ozi_pos == 2) {
933                 RETURN(3);
934         }
935
936         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
937                 rc = strlen(za->za_name);
938
939         RETURN(rc);
940 }
941
942 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
943                           struct dt_rec *dtrec, __u32 attr)
944 {
945         struct osd_zap_it   *it = (struct osd_zap_it *)di;
946         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
947         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
948         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
949         int                  rc, namelen;
950         ENTRY;
951
952         if (it->ozi_pos <= 1) {
953                 lde->lde_hash = cpu_to_le64(1);
954                 strcpy(lde->lde_name, ".");
955                 lde->lde_namelen = cpu_to_le16(1);
956                 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
957                 lde->lde_attrs = LUDA_FID;
958                 /* append lustre attributes */
959                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
960                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
961                 it->ozi_pos = 1;
962                 GOTO(out, rc = 0);
963
964         } else if (it->ozi_pos == 2) {
965                 lde->lde_hash = cpu_to_le64(2);
966                 strcpy(lde->lde_name, "..");
967                 lde->lde_namelen = cpu_to_le16(2);
968                 lde->lde_attrs = LUDA_FID;
969                 /* append lustre attributes */
970                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
971                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
972                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
973
974                 /* ENOENT happens at the root of filesystem so ignore it */
975                 if (rc == -ENOENT)
976                         rc = 0;
977                 GOTO(out, rc);
978         }
979
980         LASSERT(lde);
981
982         rc = -zap_cursor_retrieve(it->ozi_zc, za);
983         if (unlikely(rc != 0))
984                 GOTO(out, rc);
985
986         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
987         namelen = strlen(za->za_name);
988         if (namelen > NAME_MAX)
989                 GOTO(out, rc = -EOVERFLOW);
990         strcpy(lde->lde_name, za->za_name);
991         lde->lde_namelen = cpu_to_le16(namelen);
992
993         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
994                 CERROR("%s: unsupported direntry format: %d %d\n",
995                        osd_obj2dev(it->ozi_obj)->od_svname,
996                        za->za_integer_length, (int)za->za_num_integers);
997
998                 GOTO(out, rc = -EIO);
999         }
1000
1001         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
1002                          za->za_name, za->za_integer_length, 3, zde);
1003         if (rc)
1004                 GOTO(out, rc);
1005
1006         lde->lde_fid = zde->lzd_fid;
1007         lde->lde_attrs = LUDA_FID;
1008
1009         /* append lustre attributes */
1010         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
1011
1012         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
1013
1014 out:
1015         RETURN(rc);
1016 }
1017
1018 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
1019                                __u32 attr)
1020 {
1021         struct osd_zap_it   *it = (struct osd_zap_it *)di;
1022         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
1023         size_t               namelen = 0;
1024         int                  rc;
1025         ENTRY;
1026
1027         if (it->ozi_pos <= 1)
1028                 namelen = 1;
1029         else if (it->ozi_pos == 2)
1030                 namelen = 2;
1031
1032         if (namelen > 0) {
1033                 rc = lu_dirent_calc_size(namelen, attr);
1034                 RETURN(rc);
1035         }
1036
1037         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1038         if (unlikely(rc != 0))
1039                 RETURN(rc);
1040
1041         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1042                 CERROR("%s: unsupported direntry format: %d %d\n",
1043                        osd_obj2dev(it->ozi_obj)->od_svname,
1044                        za->za_integer_length, (int)za->za_num_integers);
1045                 RETURN(-EIO);
1046         }
1047
1048         namelen = strlen(za->za_name);
1049         if (namelen > NAME_MAX)
1050                 RETURN(-EOVERFLOW);
1051
1052         rc = lu_dirent_calc_size(namelen, attr);
1053
1054         RETURN(rc);
1055 }
1056
1057 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1058 {
1059         struct osd_zap_it *it = (struct osd_zap_it *)di;
1060         __u64              pos;
1061         ENTRY;
1062
1063         if (it->ozi_pos <= 2)
1064                 pos = it->ozi_pos;
1065         else
1066                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1067
1068         RETURN(pos);
1069 }
1070
1071 /*
1072  * return status :
1073  *  rc == 0 -> end of directory.
1074  *  rc >  0 -> ok, proceed.
1075  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1076  */
1077 static int osd_dir_it_load(const struct lu_env *env,
1078                         const struct dt_it *di, __u64 hash)
1079 {
1080         struct osd_zap_it *it = (struct osd_zap_it *)di;
1081         struct osd_object *obj = it->ozi_obj;
1082         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1083         int                rc;
1084         ENTRY;
1085
1086         /* reset the cursor */
1087         zap_cursor_fini(it->ozi_zc);
1088         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1089
1090         if (hash <= 2) {
1091                 it->ozi_pos = hash;
1092                 rc = +1;
1093         } else {
1094                 it->ozi_pos = 3;
1095                 /* to return whether the end has been reached */
1096                 rc = osd_index_retrieve_skip_dots(it, za);
1097                 if (rc == 0)
1098                         rc = +1;
1099                 else if (rc == -ENOENT)
1100                         rc = 0;
1101         }
1102
1103         RETURN(rc);
1104 }
1105
1106 struct dt_index_operations osd_dir_ops = {
1107         .dio_lookup         = osd_dir_lookup,
1108         .dio_declare_insert = osd_declare_dir_insert,
1109         .dio_insert         = osd_dir_insert,
1110         .dio_declare_delete = osd_declare_dir_delete,
1111         .dio_delete         = osd_dir_delete,
1112         .dio_it     = {
1113                 .init     = osd_dir_it_init,
1114                 .fini     = osd_index_it_fini,
1115                 .get      = osd_dir_it_get,
1116                 .put      = osd_dir_it_put,
1117                 .next     = osd_dir_it_next,
1118                 .key      = osd_dir_it_key,
1119                 .key_size = osd_dir_it_key_size,
1120                 .rec      = osd_dir_it_rec,
1121                 .rec_size = osd_dir_it_rec_size,
1122                 .store    = osd_dir_it_store,
1123                 .load     = osd_dir_it_load
1124         }
1125 };
1126
1127 /*
1128  * Primitives for index files using binary keys.
1129  */
1130
1131 /* key integer_size is 8 */
1132 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1133                                   const struct dt_key *src)
1134 {
1135         int size;
1136
1137         LASSERT(dst);
1138         LASSERT(src);
1139
1140         /* align keysize to 64bit */
1141         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1142         size *= sizeof(__u64);
1143
1144         LASSERT(size <= MAXNAMELEN);
1145
1146         if (unlikely(size > o->oo_keysize))
1147                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1148         memcpy(dst, (const char *)src, o->oo_keysize);
1149
1150         return (size/sizeof(__u64));
1151 }
1152
1153 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1154                         struct dt_rec *rec, const struct dt_key *key,
1155                         struct lustre_capa *capa)
1156 {
1157         struct osd_object *obj = osd_dt_obj(dt);
1158         struct osd_device *osd = osd_obj2dev(obj);
1159         __u64             *k = osd_oti_get(env)->oti_key64;
1160         int                rc;
1161         ENTRY;
1162
1163         rc = osd_prepare_key_uint64(obj, k, key);
1164
1165         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1166                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1167                                 (void *)rec);
1168         RETURN(rc == 0 ? 1 : rc);
1169 }
1170
1171 static int osd_declare_index_insert(const struct lu_env *env,
1172                                     struct dt_object *dt,
1173                                     const struct dt_rec *rec,
1174                                     const struct dt_key *key,
1175                                     struct thandle *th)
1176 {
1177         struct osd_object  *obj = osd_dt_obj(dt);
1178         struct osd_thandle *oh;
1179         ENTRY;
1180
1181         LASSERT(th != NULL);
1182         oh = container_of0(th, struct osd_thandle, ot_super);
1183
1184         LASSERT(obj->oo_db);
1185
1186         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
1187
1188         /* It is not clear what API should be used for binary keys, so we pass
1189          * a null name which has the side effect of over-reserving space,
1190          * accounting for the worst case. See zap_count_write() */
1191         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1192
1193         RETURN(0);
1194 }
1195
1196 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1197                             const struct dt_rec *rec, const struct dt_key *key,
1198                             struct thandle *th, struct lustre_capa *capa,
1199                             int ignore_quota)
1200 {
1201         struct osd_object  *obj = osd_dt_obj(dt);
1202         struct osd_device  *osd = osd_obj2dev(obj);
1203         struct osd_thandle *oh;
1204         __u64              *k = osd_oti_get(env)->oti_key64;
1205         int                 rc;
1206         ENTRY;
1207
1208         LASSERT(obj->oo_db);
1209         LASSERT(dt_object_exists(dt));
1210         LASSERT(osd_invariant(obj));
1211         LASSERT(th != NULL);
1212
1213         oh = container_of0(th, struct osd_thandle, ot_super);
1214
1215         rc = osd_prepare_key_uint64(obj, k, key);
1216
1217         /* Insert (key,oid) into ZAP */
1218         rc = -zap_add_uint64(osd->od_os, obj->oo_db->db_object,
1219                              k, rc, obj->oo_recusize, obj->oo_recsize,
1220                              (void *)rec, oh->ot_tx);
1221         RETURN(rc);
1222 }
1223
1224 static int osd_declare_index_delete(const struct lu_env *env,
1225                                     struct dt_object *dt,
1226                                     const struct dt_key *key,
1227                                     struct thandle *th)
1228 {
1229         struct osd_object  *obj = osd_dt_obj(dt);
1230         struct osd_thandle *oh;
1231         ENTRY;
1232
1233         LASSERT(dt_object_exists(dt));
1234         LASSERT(osd_invariant(obj));
1235         LASSERT(th != NULL);
1236         LASSERT(obj->oo_db);
1237
1238         oh = container_of0(th, struct osd_thandle, ot_super);
1239         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1240
1241         RETURN(0);
1242 }
1243
1244 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1245                             const struct dt_key *key, struct thandle *th,
1246                             struct lustre_capa *capa)
1247 {
1248         struct osd_object  *obj = osd_dt_obj(dt);
1249         struct osd_device  *osd = osd_obj2dev(obj);
1250         struct osd_thandle *oh;
1251         __u64              *k = osd_oti_get(env)->oti_key64;
1252         int                 rc;
1253         ENTRY;
1254
1255         LASSERT(obj->oo_db);
1256         LASSERT(th != NULL);
1257         oh = container_of0(th, struct osd_thandle, ot_super);
1258
1259         rc = osd_prepare_key_uint64(obj, k, key);
1260
1261         /* Remove binary key from the ZAP */
1262         rc = -zap_remove_uint64(osd->od_os, obj->oo_db->db_object,
1263                                 k, rc, oh->ot_tx);
1264         RETURN(rc);
1265 }
1266
1267 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1268                             const struct dt_key *key)
1269 {
1270         struct osd_zap_it *it = (struct osd_zap_it *)di;
1271         struct osd_object *obj = it->ozi_obj;
1272         struct osd_device *osd = osd_obj2dev(obj);
1273         ENTRY;
1274
1275         LASSERT(it);
1276         LASSERT(it->ozi_zc);
1277
1278         /*
1279          * XXX: we need a binary version of zap_cursor_move_to_key()
1280          *      to implement this API */
1281         if (*((const __u64 *)key) != 0)
1282                 CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
1283                        *((__u64 *)key));
1284
1285         zap_cursor_fini(it->ozi_zc);
1286         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_db->db_object);
1287         it->ozi_reset = 1;
1288
1289         RETURN(+1);
1290 }
1291
1292 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1293 {
1294         struct osd_zap_it *it = (struct osd_zap_it *)di;
1295         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1296         int                rc;
1297         ENTRY;
1298
1299         if (it->ozi_reset == 0)
1300                 zap_cursor_advance(it->ozi_zc);
1301         it->ozi_reset = 0;
1302
1303         /*
1304          * According to current API we need to return error if it's last entry.
1305          * zap_cursor_advance() does not return any value. So we need to call
1306          * retrieve to check if there is any record.  We should make
1307          * changes to Iterator API to not return status for this API
1308          */
1309         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1310         if (rc == -ENOENT)
1311                 RETURN(+1);
1312
1313         RETURN((rc));
1314 }
1315
1316 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1317                                        const struct dt_it *di)
1318 {
1319         struct osd_zap_it *it = (struct osd_zap_it *)di;
1320         struct osd_object *obj = it->ozi_obj;
1321         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1322         int                rc = 0;
1323         ENTRY;
1324
1325         it->ozi_reset = 0;
1326         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1327         if (rc)
1328                 RETURN(ERR_PTR(rc));
1329
1330         /* the binary key is stored in the name */
1331         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1332
1333         RETURN((struct dt_key *)&it->ozi_key);
1334 }
1335
1336 static int osd_index_it_key_size(const struct lu_env *env,
1337                                 const struct dt_it *di)
1338 {
1339         struct osd_zap_it *it = (struct osd_zap_it *)di;
1340         struct osd_object *obj = it->ozi_obj;
1341         RETURN(obj->oo_keysize);
1342 }
1343
1344 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1345                             struct dt_rec *rec, __u32 attr)
1346 {
1347         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1348         struct osd_zap_it *it = (struct osd_zap_it *)di;
1349         struct osd_object *obj = it->ozi_obj;
1350         struct osd_device *osd = osd_obj2dev(obj);
1351         __u64             *k = osd_oti_get(env)->oti_key64;
1352         int                rc;
1353         ENTRY;
1354
1355         it->ozi_reset = 0;
1356         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1357         if (rc)
1358                 RETURN(rc);
1359
1360         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1361
1362         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1363                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1364                                 (void *)rec);
1365         RETURN(rc);
1366 }
1367
1368 static __u64 osd_index_it_store(const struct lu_env *env,
1369                                 const struct dt_it *di)
1370 {
1371         struct osd_zap_it *it = (struct osd_zap_it *)di;
1372
1373         it->ozi_reset = 0;
1374         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1375 }
1376
1377 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1378                              __u64 hash)
1379 {
1380         struct osd_zap_it *it = (struct osd_zap_it *)di;
1381         struct osd_object *obj = it->ozi_obj;
1382         struct osd_device *osd = osd_obj2dev(obj);
1383         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1384         int                rc;
1385         ENTRY;
1386
1387         /* reset the cursor */
1388         zap_cursor_fini(it->ozi_zc);
1389         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1390                                    obj->oo_db->db_object, hash);
1391         it->ozi_reset = 0;
1392
1393         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1394         if (rc == 0)
1395                 RETURN(+1);
1396         else if (rc == -ENOENT)
1397                 RETURN(0);
1398
1399         RETURN(rc);
1400 }
1401
1402 static struct dt_index_operations osd_index_ops = {
1403         .dio_lookup             = osd_index_lookup,
1404         .dio_declare_insert     = osd_declare_index_insert,
1405         .dio_insert             = osd_index_insert,
1406         .dio_declare_delete     = osd_declare_index_delete,
1407         .dio_delete             = osd_index_delete,
1408         .dio_it = {
1409                 .init           = osd_index_it_init,
1410                 .fini           = osd_index_it_fini,
1411                 .get            = osd_index_it_get,
1412                 .put            = osd_index_it_put,
1413                 .next           = osd_index_it_next,
1414                 .key            = osd_index_it_key,
1415                 .key_size       = osd_index_it_key_size,
1416                 .rec            = osd_index_it_rec,
1417                 .store          = osd_index_it_store,
1418                 .load           = osd_index_it_load
1419         }
1420 };
1421
1422 struct osd_metadnode_it {
1423         struct osd_device       *mit_dev;
1424         __u64                    mit_pos;
1425         struct lu_fid            mit_fid;
1426         int                      mit_prefetched;
1427         __u64                    mit_prefetched_dnode;
1428 };
1429
1430 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
1431                                             struct dt_object *dt, __u32 attr,
1432                                             struct lustre_capa *capa)
1433 {
1434         struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
1435         struct osd_metadnode_it *it;
1436         ENTRY;
1437
1438         OBD_ALLOC_PTR(it);
1439         if (unlikely(it == NULL))
1440                 RETURN(ERR_PTR(-ENOMEM));
1441
1442         it->mit_dev = dev;
1443
1444         /* XXX: dmu_object_next() does NOT find dnodes allocated
1445          *      in the current non-committed txg, so we force txg
1446          *      commit to find all existing dnodes ... */
1447         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1448
1449         RETURN((struct dt_it *)it);
1450 }
1451
1452 static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1453 {
1454         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1455
1456         OBD_FREE_PTR(it);
1457 }
1458
1459 static int osd_zfs_otable_it_get(const struct lu_env *env,
1460                                  struct dt_it *di, const struct dt_key *key)
1461 {
1462         return 0;
1463 }
1464
1465 static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
1466 {
1467 }
1468
1469 #define OTABLE_PREFETCH         256
1470
1471 static void osd_zfs_otable_prefetch(const struct lu_env *env,
1472                                     struct osd_metadnode_it *it)
1473 {
1474         struct osd_device       *dev = it->mit_dev;
1475         int                      rc;
1476
1477         /* can go negative on the very first access to the iterator
1478          * or if some non-Lustre objects were found */
1479         if (unlikely(it->mit_prefetched < 0))
1480                 it->mit_prefetched = 0;
1481
1482         if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
1483                 return;
1484
1485         if (it->mit_prefetched_dnode == 0)
1486                 it->mit_prefetched_dnode = it->mit_pos;
1487
1488         while (it->mit_prefetched < OTABLE_PREFETCH) {
1489                 rc = -dmu_object_next(dev->od_os, &it->mit_prefetched_dnode,
1490                                       B_FALSE, 0);
1491                 if (unlikely(rc != 0))
1492                         break;
1493
1494                 /* dmu_prefetch() was exported in 0.6.2, if you use with
1495                  * an older release, just comment it out - this is an
1496                  * optimization */
1497                 dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
1498
1499                 it->mit_prefetched++;
1500         }
1501 }
1502
1503 static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
1504 {
1505         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1506         struct lustre_mdt_attrs *lma;
1507         struct osd_device       *dev = it->mit_dev;
1508         nvlist_t                *nvbuf = NULL;
1509         uchar_t                 *v;
1510         __u64                    dnode;
1511         int                      rc, s;
1512
1513         memset(&it->mit_fid, 0, sizeof(it->mit_fid));
1514
1515         dnode = it->mit_pos;
1516         do {
1517                 rc = -dmu_object_next(dev->od_os, &it->mit_pos, B_FALSE, 0);
1518                 if (unlikely(rc != 0))
1519                         GOTO(out, rc = 1);
1520                 it->mit_prefetched--;
1521
1522                 /* LMA is required for this to be a Lustre object.
1523                  * If there is no xattr skip it. */
1524                 rc = __osd_xattr_load(dev, it->mit_pos, &nvbuf);
1525                 if (unlikely(rc != 0))
1526                         continue;
1527
1528                 LASSERT(nvbuf != NULL);
1529                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
1530                 if (likely(rc == 0)) {
1531                         /* Lustre object */
1532                         lma = (struct lustre_mdt_attrs *)v;
1533                         lustre_lma_swab(lma);
1534                         it->mit_fid = lma->lma_self_fid;
1535                         nvlist_free(nvbuf);
1536                         break;
1537                 } else {
1538                         /* not a Lustre object, try next one */
1539                         nvlist_free(nvbuf);
1540                 }
1541
1542         } while (1);
1543
1544
1545         /* we aren't prefetching in the above loop because the number of
1546          * non-Lustre objects is very small and we will be repeating very
1547          * rare. in case we want to use this to iterate over non-Lustre
1548          * objects (i.e. when we convert regular ZFS in Lustre) it makes
1549          * sense to initiate prefetching in the loop */
1550
1551         /* 0 - there are more items, +1 - the end */
1552         if (likely(rc == 0))
1553                 osd_zfs_otable_prefetch(env, it);
1554
1555         CDEBUG(D_OTHER, "advance: %llu -> %llu "DFID": %d\n", dnode,
1556                it->mit_pos, PFID(&it->mit_fid), rc);
1557
1558 out:
1559         return rc;
1560 }
1561
1562 static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
1563                                             const struct dt_it *di)
1564 {
1565         return NULL;
1566 }
1567
1568 static int osd_zfs_otable_it_key_size(const struct lu_env *env,
1569                                       const struct dt_it *di)
1570 {
1571         return sizeof(__u64);
1572 }
1573
1574 static int osd_zfs_otable_it_rec(const struct lu_env *env,
1575                                  const struct dt_it *di,
1576                                  struct dt_rec *rec, __u32 attr)
1577 {
1578         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1579         struct lu_fid *fid = (struct lu_fid *)rec;
1580         ENTRY;
1581
1582         *fid = it->mit_fid;
1583
1584         RETURN(0);
1585 }
1586
1587
1588 static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
1589                                      const struct dt_it *di)
1590 {
1591         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1592
1593         return it->mit_pos;
1594 }
1595
1596 static int osd_zfs_otable_it_load(const struct lu_env *env,
1597                                   const struct dt_it *di, __u64 hash)
1598 {
1599         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1600
1601         it->mit_pos = hash;
1602         it->mit_prefetched = 0;
1603         it->mit_prefetched_dnode = 0;
1604
1605         return osd_zfs_otable_it_next(env, (struct dt_it *)di);
1606 }
1607
1608 static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
1609                                      const struct dt_it *di, void *key_rec)
1610 {
1611         return 0;
1612 }
1613
1614 const struct dt_index_operations osd_zfs_otable_ops = {
1615         .dio_it = {
1616                 .init     = osd_zfs_otable_it_init,
1617                 .fini     = osd_zfs_otable_it_fini,
1618                 .get      = osd_zfs_otable_it_get,
1619                 .put      = osd_zfs_otable_it_put,
1620                 .next     = osd_zfs_otable_it_next,
1621                 .key      = osd_zfs_otable_it_key,
1622                 .key_size = osd_zfs_otable_it_key_size,
1623                 .rec      = osd_zfs_otable_it_rec,
1624                 .store    = osd_zfs_otable_it_store,
1625                 .load     = osd_zfs_otable_it_load,
1626                 .key_rec  = osd_zfs_otable_it_key_rec,
1627         }
1628 };
1629
1630 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1631                 const struct dt_index_features *feat)
1632 {
1633         struct osd_object *obj = osd_dt_obj(dt);
1634         ENTRY;
1635
1636         LASSERT(dt_object_exists(dt));
1637
1638         /*
1639          * XXX: implement support for fixed-size keys sorted with natural
1640          *      numerical way (not using internal hash value)
1641          */
1642         if (feat->dif_flags & DT_IND_RANGE)
1643                 RETURN(-ERANGE);
1644
1645         if (unlikely(feat == &dt_otable_features)) {
1646                 dt->do_index_ops = &osd_zfs_otable_ops;
1647                 RETURN(0);
1648         }
1649
1650         LASSERT(obj->oo_db != NULL);
1651         if (likely(feat == &dt_directory_features)) {
1652                 if (osd_object_is_zap(obj->oo_db))
1653                         dt->do_index_ops = &osd_dir_ops;
1654                 else
1655                         RETURN(-ENOTDIR);
1656         } else if (unlikely(feat == &dt_acct_features)) {
1657                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1658                 dt->do_index_ops = &osd_acct_index_ops;
1659         } else if (osd_object_is_zap(obj->oo_db) &&
1660                    dt->do_index_ops == NULL) {
1661                 /* For index file, we don't support variable key & record sizes
1662                  * and the key has to be unique */
1663                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1664                         RETURN(-EINVAL);
1665
1666                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
1667                         RETURN(-E2BIG);
1668                 if (feat->dif_keysize_max != feat->dif_keysize_min)
1669                         RETURN(-EINVAL);
1670
1671                 /* As for the record size, it should be a multiple of 8 bytes
1672                  * and smaller than the maximum value length supported by ZAP.
1673                  */
1674                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1675                         RETURN(-E2BIG);
1676                 if (feat->dif_recsize_max != feat->dif_recsize_min)
1677                         RETURN(-EINVAL);
1678
1679                 obj->oo_keysize = feat->dif_keysize_max;
1680                 obj->oo_recsize = feat->dif_recsize_max;
1681                 obj->oo_recusize = 1;
1682
1683                 /* ZFS prefers to work with array of 64bits */
1684                 if ((obj->oo_recsize & 7) == 0) {
1685                         obj->oo_recsize >>= 3;
1686                         obj->oo_recusize = 8;
1687                 }
1688                 dt->do_index_ops = &osd_index_ops;
1689         }
1690
1691         RETURN(0);
1692 }