Whamcloud - gitweb
LU-3536 osd: allocate it for each iteration.
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd-zfs/osd_index.c
37  *
38  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
39  * Author: Mike Pershin <tappro@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OSD
43
44 #include <lustre_ver.h>
45 #include <libcfs/libcfs.h>
46 #include <obd_support.h>
47 #include <lustre_net.h>
48 #include <obd.h>
49 #include <obd_class.h>
50 #include <lustre_disk.h>
51 #include <lustre_fid.h>
52
53 #include "osd_internal.h"
54
55 #include <sys/dnode.h>
56 #include <sys/dbuf.h>
57 #include <sys/spa.h>
58 #include <sys/stat.h>
59 #include <sys/zap.h>
60 #include <sys/spa_impl.h>
61 #include <sys/zfs_znode.h>
62 #include <sys/dmu_tx.h>
63 #include <sys/dmu_objset.h>
64 #include <sys/dsl_prop.h>
65 #include <sys/sa_impl.h>
66 #include <sys/txg.h>
67
68 static inline int osd_object_is_zap(dmu_buf_t *db)
69 {
70         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *) db;
71         dnode_t *dn;
72         int rc;
73
74         DB_DNODE_ENTER(dbi);
75         dn = DB_DNODE(dbi);
76         rc = (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
77                         dn->dn_type == DMU_OT_USERGROUP_USED);
78         DB_DNODE_EXIT(dbi);
79
80         return rc;
81 }
82
83 /* We don't actually have direct access to the zap_hashbits() function
84  * so just pretend like we do for now.  If this ever breaks we can look at
85  * it at that time. */
86 #define zap_hashbits(zc) 48
87 /*
88  * ZFS hash format:
89  * | cd (16 bits) | hash (48 bits) |
90  * we need it in other form:
91  * |0| hash (48 bit) | cd (15 bit) |
92  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
93  * the readdir hashes from multiple directory stripes uniformly on the client.
94  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
95  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
96  * should only be the low 15 bits.
97  */
98 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
99 {
100         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
101
102         return (zfs_hash >> zap_hashbits(zc)) |
103                 (zfs_hash << (63 - zap_hashbits(zc)));
104 }
105
106 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
107                                     uint64_t id, uint64_t dirhash)
108 {
109         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
110                 (dirhash >> (63 - zap_hashbits(zc)));
111
112         zap_cursor_init_serialized(zc, os, id, zfs_hash);
113 }
114
115 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
116                         uint64_t id, uint64_t dirhash)
117 {
118         zap_cursor_t *t;
119
120         OBD_ALLOC_PTR(t);
121         if (unlikely(t == NULL))
122                 return -ENOMEM;
123
124         osd_zap_cursor_init_serialized(t, os, id, dirhash);
125         *zc = t;
126
127         return 0;
128 }
129
130 void osd_zap_cursor_fini(zap_cursor_t *zc)
131 {
132         zap_cursor_fini(zc);
133         OBD_FREE_PTR(zc);
134 }
135
136 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
137                                                  struct osd_object *o,
138                                                  uint64_t dirhash)
139 {
140         struct osd_device *d = osd_obj2dev(o);
141         osd_zap_cursor_init_serialized(zc, d->od_os,
142                                        o->oo_db->db_object, dirhash);
143 }
144
145 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
146                         uint64_t dirhash)
147 {
148         struct osd_device *d = osd_obj2dev(o);
149         return osd_zap_cursor_init(zc, d->od_os, o->oo_db->db_object, dirhash);
150 }
151
152 static struct dt_it *osd_index_it_init(const struct lu_env *env,
153                                        struct dt_object *dt,
154                                        __u32 unused,
155                                        struct lustre_capa *capa)
156 {
157         struct osd_thread_info  *info = osd_oti_get(env);
158         struct osd_zap_it       *it;
159         struct osd_object       *obj = osd_dt_obj(dt);
160         struct lu_object        *lo  = &dt->do_lu;
161         int                      rc;
162         ENTRY;
163
164         /* XXX: check capa ? */
165
166         LASSERT(lu_object_exists(lo));
167         LASSERT(obj->oo_db);
168         LASSERT(osd_object_is_zap(obj->oo_db));
169         LASSERT(info);
170
171         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
172         if (it == NULL)
173                 RETURN(ERR_PTR(-ENOMEM));
174
175         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
176         if (rc != 0) {
177                 OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
178                 RETURN(ERR_PTR(rc));
179         }
180
181         it->ozi_obj   = obj;
182         it->ozi_capa  = capa;
183         it->ozi_reset = 1;
184         lu_object_get(lo);
185
186         RETURN((struct dt_it *)it);
187 }
188
189 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
190 {
191         struct osd_zap_it       *it     = (struct osd_zap_it *)di;
192         struct osd_object       *obj;
193         ENTRY;
194
195         LASSERT(it);
196         LASSERT(it->ozi_obj);
197
198         obj = it->ozi_obj;
199
200         osd_zap_cursor_fini(it->ozi_zc);
201         lu_object_put(env, &obj->oo_dt.do_lu);
202         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
203
204         EXIT;
205 }
206
207
208 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
209 {
210         /* PBS: do nothing : ref are incremented at retrive and decreamented
211          *      next/finish. */
212 }
213
214 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
215                                        int len, __u16 type)
216 {
217         const unsigned    align = sizeof(struct luda_type) - 1;
218         struct luda_type *lt;
219
220         /* check if file type is required */
221         if (attr & LUDA_TYPE) {
222                 len = (len + align) & ~align;
223
224                 lt = (void *)ent->lde_name + len;
225                 lt->lt_type = cpu_to_le16(DTTOIF(type));
226                 ent->lde_attrs |= LUDA_TYPE;
227         }
228
229         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
230 }
231
232 /*
233  * as we don't know FID, we can't use LU object, so this function
234  * partially duplicate __osd_xattr_get() which is built around
235  * LU-object and uses it to cache data like regular EA dnode, etc
236  */
237 static int osd_find_parent_by_dnode(const struct lu_env *env,
238                                     struct dt_object *o,
239                                     struct lu_fid *fid)
240 {
241         struct osd_device       *osd = osd_obj2dev(osd_dt_obj(o));
242         struct lustre_mdt_attrs *lma;
243         struct lu_buf            buf;
244         sa_handle_t             *sa_hdl;
245         nvlist_t                *nvbuf = NULL;
246         uchar_t                 *value;
247         uint64_t                 dnode;
248         int                      rc, size;
249         ENTRY;
250
251         /* first of all, get parent dnode from own attributes */
252         LASSERT(osd_dt_obj(o)->oo_db);
253         rc = -sa_handle_get(osd->od_os, osd_dt_obj(o)->oo_db->db_object,
254                             NULL, SA_HDL_PRIVATE, &sa_hdl);
255         if (rc)
256                 RETURN(rc);
257
258         dnode = ZFS_NO_OBJECT;
259         rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
260         sa_handle_destroy(sa_hdl);
261         if (rc)
262                 RETURN(rc);
263
264         /* now get EA buffer */
265         rc = __osd_xattr_load(osd, dnode, &nvbuf);
266         if (rc)
267                 GOTO(regular, rc);
268
269         /* XXX: if we get that far.. should we cache the result? */
270
271         /* try to find LMA attribute */
272         LASSERT(nvbuf != NULL);
273         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
274         if (rc == 0 && size >= sizeof(*lma)) {
275                 lma = (struct lustre_mdt_attrs *)value;
276                 lustre_lma_swab(lma);
277                 *fid = lma->lma_self_fid;
278                 GOTO(out, rc = 0);
279         }
280
281 regular:
282         /* no LMA attribute in SA, let's try regular EA */
283
284         /* first of all, get parent dnode storing regular EA */
285         rc = -sa_handle_get(osd->od_os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
286         if (rc)
287                 GOTO(out, rc);
288
289         dnode = ZFS_NO_OBJECT;
290         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &dnode, 8);
291         sa_handle_destroy(sa_hdl);
292         if (rc)
293                 GOTO(out, rc);
294
295         CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
296         buf.lb_buf = osd_oti_get(env)->oti_buf;
297         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
298
299         /* now try to find LMA */
300         rc = __osd_xattr_get_large(env, osd, dnode, &buf,
301                                    XATTR_NAME_LMA, &size);
302         if (rc == 0 && size >= sizeof(*lma)) {
303                 lma = buf.lb_buf;
304                 lustre_lma_swab(lma);
305                 *fid = lma->lma_self_fid;
306                 GOTO(out, rc = 0);
307         } else if (rc < 0) {
308                 GOTO(out, rc);
309         } else {
310                 GOTO(out, rc = -EIO);
311         }
312
313 out:
314         if (nvbuf != NULL)
315                 nvlist_free(nvbuf);
316         RETURN(rc);
317 }
318
319 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
320                                struct lu_fid *fid)
321 {
322         struct link_ea_header  *leh;
323         struct link_ea_entry   *lee;
324         struct lu_buf           buf;
325         int                     rc;
326         ENTRY;
327
328         buf.lb_buf = osd_oti_get(env)->oti_buf;
329         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
330
331         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
332         if (rc == -ERANGE) {
333                 rc = osd_xattr_get(env, o, &LU_BUF_NULL,
334                                    XATTR_NAME_LINK, BYPASS_CAPA);
335                 if (rc < 0)
336                         RETURN(rc);
337                 LASSERT(rc > 0);
338                 OBD_ALLOC(buf.lb_buf, rc);
339                 if (buf.lb_buf == NULL)
340                         RETURN(-ENOMEM);
341                 buf.lb_len = rc;
342                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
343         }
344         if (rc < 0)
345                 GOTO(out, rc);
346         if (rc < sizeof(*leh) + sizeof(*lee))
347                 GOTO(out, rc = -EINVAL);
348
349         leh = buf.lb_buf;
350         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
351                 leh->leh_magic = LINK_EA_MAGIC;
352                 leh->leh_reccount = __swab32(leh->leh_reccount);
353                 leh->leh_len = __swab64(leh->leh_len);
354         }
355         if (leh->leh_magic != LINK_EA_MAGIC)
356                 GOTO(out, rc = -EINVAL);
357         if (leh->leh_reccount == 0)
358                 GOTO(out, rc = -ENODATA);
359
360         lee = (struct link_ea_entry *)(leh + 1);
361         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
362         rc = 0;
363
364 out:
365         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
366                 OBD_FREE(buf.lb_buf, buf.lb_len);
367
368 #if 0
369         /* this block can be enabled for additional verification
370          * it's trying to match FID from LinkEA vs. FID from LMA */
371         if (rc == 0) {
372                 struct lu_fid fid2;
373                 int rc2;
374                 rc2 = osd_find_parent_by_dnode(env, o, &fid2);
375                 if (rc2 == 0)
376                         if (lu_fid_eq(fid, &fid2) == 0)
377                                 CERROR("wrong parent: "DFID" != "DFID"\n",
378                                        PFID(fid), PFID(&fid2));
379         }
380 #endif
381
382         /* no LinkEA is found, let's try to find the fid in parent's LMA */
383         if (unlikely(rc != 0))
384                 rc = osd_find_parent_by_dnode(env, o, fid);
385
386         RETURN(rc);
387 }
388
389 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
390                           struct dt_rec *rec, const struct dt_key *key,
391                           struct lustre_capa *capa)
392 {
393         struct osd_thread_info *oti = osd_oti_get(env);
394         struct osd_object  *obj = osd_dt_obj(dt);
395         struct osd_device  *osd = osd_obj2dev(obj);
396         char               *name = (char *)key;
397         int                 rc;
398         ENTRY;
399
400         LASSERT(osd_object_is_zap(obj->oo_db));
401
402         if (name[0] == '.') {
403                 if (name[1] == 0) {
404                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
405                         memcpy(rec, f, sizeof(*f));
406                         RETURN(1);
407                 } else if (name[1] == '.' && name[2] == 0) {
408                         rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
409                         RETURN(rc == 0 ? 1 : rc);
410                 }
411         }
412
413         rc = -zap_lookup(osd->od_os, obj->oo_db->db_object,
414                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
415                          (void *)&oti->oti_zde);
416         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
417
418         RETURN(rc == 0 ? 1 : rc);
419 }
420
421 static int osd_declare_dir_insert(const struct lu_env *env,
422                                   struct dt_object *dt,
423                                   const struct dt_rec *rec,
424                                   const struct dt_key *key,
425                                   struct thandle *th)
426 {
427         struct osd_object  *obj = osd_dt_obj(dt);
428         struct osd_thandle *oh;
429         uint64_t object;
430         ENTRY;
431
432         LASSERT(th != NULL);
433         oh = container_of0(th, struct osd_thandle, ot_super);
434
435         /* This is for inserting dot/dotdot for new created dir. */
436         if (obj->oo_db == NULL)
437                 object = DMU_NEW_OBJECT;
438         else
439                 object = obj->oo_db->db_object;
440
441         dmu_tx_hold_bonus(oh->ot_tx, object);
442         dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key);
443
444         RETURN(0);
445 }
446
447 /**
448  * Find the osd object for given fid.
449  *
450  * \param fid need to find the osd object having this fid
451  *
452  * \retval osd_object on success
453  * \retval        -ve on error
454  */
455 struct osd_object *osd_object_find(const struct lu_env *env,
456                                    struct dt_object *dt,
457                                    const struct lu_fid *fid)
458 {
459         struct lu_device         *ludev = dt->do_lu.lo_dev;
460         struct osd_object        *child = NULL;
461         struct lu_object         *luch;
462         struct lu_object         *lo;
463
464         /*
465          * at this point topdev might not exist yet
466          * (i.e. MGS is preparing profiles). so we can
467          * not rely on topdev and instead lookup with
468          * our device passed as topdev. this can't work
469          * if the object isn't cached yet (as osd doesn't
470          * allocate lu_header). IOW, the object must be
471          * in the cache, otherwise lu_object_alloc() crashes
472          * -bzzz
473          */
474         luch = lu_object_find_at(env, ludev, fid, NULL);
475         if (IS_ERR(luch))
476                 return (void *)luch;
477
478         if (lu_object_exists(luch)) {
479                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
480                 if (lo != NULL)
481                         child = osd_obj(lo);
482                 else
483                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
484                                         "%s: object can't be located "DFID"\n",
485                                         osd_dev(ludev)->od_svname, PFID(fid));
486
487                 if (child == NULL) {
488                         lu_object_put(env, luch);
489                         CERROR("%s: Unable to get osd_object "DFID"\n",
490                                osd_dev(ludev)->od_svname, PFID(fid));
491                         child = ERR_PTR(-ENOENT);
492                 }
493         } else {
494                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
495                                 "%s: lu_object does not exists "DFID"\n",
496                                 osd_dev(ludev)->od_svname, PFID(fid));
497                 lu_object_put(env, luch);
498                 child = ERR_PTR(-ENOENT);
499         }
500
501         return child;
502 }
503
504 /**
505  * Put the osd object once done with it.
506  *
507  * \param obj osd object that needs to be put
508  */
509 static inline void osd_object_put(const struct lu_env *env,
510                                   struct osd_object *obj)
511 {
512         lu_object_put(env, &obj->oo_dt.do_lu);
513 }
514
515 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
516                           u64 seq)
517 {
518         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
519         struct seq_server_site  *ss = osd_seq_site(osd);
520         int                     rc;
521         ENTRY;
522
523         LASSERT(ss != NULL);
524         LASSERT(ss->ss_server_fld != NULL);
525
526         rc = osd_fld_lookup(env, osd, seq, range);
527         if (rc != 0) {
528                 CERROR("%s: Can not lookup fld for "LPX64"\n",
529                        osd_name(osd), seq);
530                 RETURN(0);
531         }
532
533         RETURN(ss->ss_node_id == range->lsr_index);
534 }
535
536 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
537                           const struct lu_fid *fid)
538 {
539         struct seq_server_site  *ss = osd_seq_site(osd);
540         ENTRY;
541
542         /* FID seqs not in FLDB, must be local seq */
543         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
544                 RETURN(0);
545
546         /* If FLD is not being initialized yet, it only happens during the
547          * initialization, likely during mgs initialization, and we assume
548          * this is local FID. */
549         if (ss == NULL || ss->ss_server_fld == NULL)
550                 RETURN(0);
551
552         /* Only check the local FLDB here */
553         if (osd_seq_exists(env, osd, fid_seq(fid)))
554                 RETURN(0);
555
556         RETURN(1);
557 }
558
559 /**
560  *      Inserts (key, value) pair in \a directory object.
561  *
562  *      \param  dt      osd index object
563  *      \param  key     key for index
564  *      \param  rec     record reference
565  *      \param  th      transaction handler
566  *      \param  capa    capability descriptor
567  *      \param  ignore_quota update should not affect quota
568  *
569  *      \retval  0  success
570  *      \retval -ve failure
571  */
572 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
573                           const struct dt_rec *rec, const struct dt_key *key,
574                           struct thandle *th, struct lustre_capa *capa,
575                           int ignore_quota)
576 {
577         struct osd_thread_info *oti = osd_oti_get(env);
578         struct osd_object   *parent = osd_dt_obj(dt);
579         struct osd_device   *osd = osd_obj2dev(parent);
580         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
581         const struct lu_fid *fid = rec1->rec_fid;
582         struct osd_thandle  *oh;
583         struct osd_object   *child = NULL;
584         __u32                attr;
585         char                *name = (char *)key;
586         int                  rc;
587         ENTRY;
588
589         LASSERT(parent->oo_db);
590         LASSERT(osd_object_is_zap(parent->oo_db));
591
592         LASSERT(dt_object_exists(dt));
593         LASSERT(osd_invariant(parent));
594
595         LASSERT(th != NULL);
596         oh = container_of0(th, struct osd_thandle, ot_super);
597
598         rc = osd_remote_fid(env, osd, fid);
599         if (rc < 0) {
600                 CERROR("%s: Can not find object "DFID": rc = %d\n",
601                        osd->od_svname, PFID(fid), rc);
602                 RETURN(rc);
603         }
604
605         if (unlikely(rc == 1)) {
606                 /* Insert remote entry */
607                 memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
608                 oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
609         } else {
610                 /*
611                  * To simulate old Orion setups with ./..  stored in the
612                  * directories
613                  */
614                 /* Insert local entry */
615                 child = osd_object_find(env, dt, fid);
616                 if (IS_ERR(child))
617                         RETURN(PTR_ERR(child));
618
619                 LASSERT(child->oo_db);
620                 if (name[0] == '.') {
621                         if (name[1] == 0) {
622                                 /* do not store ".", instead generate it
623                                  * during iteration */
624                                 GOTO(out, rc = 0);
625                         } else if (name[1] == '.' && name[2] == 0) {
626                                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
627                                         struct lu_fid tfid = *fid;
628
629                                         osd_object_put(env, child);
630                                         tfid.f_oid--;
631                                         child = osd_object_find(env, dt, &tfid);
632                                         if (IS_ERR(child))
633                                                 RETURN(PTR_ERR(child));
634
635                                         LASSERT(child->oo_db);
636                                 }
637
638                                 /* update parent dnode in the child.
639                                  * later it will be used to generate ".." */
640                                 rc = osd_object_sa_update(parent,
641                                                  SA_ZPL_PARENT(osd),
642                                                  &child->oo_db->db_object,
643                                                  8, oh);
644
645                                 GOTO(out, rc);
646                         }
647                 }
648                 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
649                 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
650                 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
651                 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
652                 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
653         }
654
655         oti->oti_zde.lzd_fid = *fid;
656         /* Insert (key,oid) into ZAP */
657         rc = -zap_add(osd->od_os, parent->oo_db->db_object,
658                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
659                       (void *)&oti->oti_zde, oh->ot_tx);
660         if (unlikely(rc == -EEXIST &&
661                      name[0] == '.' && name[1] == '.' && name[2] == 0))
662                 /* Update (key,oid) in ZAP */
663                 rc = -zap_update(osd->od_os, parent->oo_db->db_object,
664                                 (char *)key, 8, sizeof(oti->oti_zde) / 8,
665                                 (void *)&oti->oti_zde, oh->ot_tx);
666
667 out:
668         if (child != NULL)
669                 osd_object_put(env, child);
670
671         RETURN(rc);
672 }
673
674 static int osd_declare_dir_delete(const struct lu_env *env,
675                                   struct dt_object *dt,
676                                   const struct dt_key *key,
677                                   struct thandle *th)
678 {
679         struct osd_object *obj = osd_dt_obj(dt);
680         struct osd_thandle *oh;
681         ENTRY;
682
683         LASSERT(dt_object_exists(dt));
684         LASSERT(osd_invariant(obj));
685
686         LASSERT(th != NULL);
687         oh = container_of0(th, struct osd_thandle, ot_super);
688
689         LASSERT(obj->oo_db);
690         LASSERT(osd_object_is_zap(obj->oo_db));
691
692         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
693
694         RETURN(0);
695 }
696
697 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
698                           const struct dt_key *key, struct thandle *th,
699                           struct lustre_capa *capa)
700 {
701         struct osd_object *obj = osd_dt_obj(dt);
702         struct osd_device *osd = osd_obj2dev(obj);
703         struct osd_thandle *oh;
704         dmu_buf_t *zap_db = obj->oo_db;
705         char      *name = (char *)key;
706         int rc;
707         ENTRY;
708
709         LASSERT(obj->oo_db);
710         LASSERT(osd_object_is_zap(obj->oo_db));
711
712         LASSERT(th != NULL);
713         oh = container_of0(th, struct osd_thandle, ot_super);
714
715         /*
716          * In Orion . and .. were stored in the directory (not generated upon
717          * request as now). we preserve them for backward compatibility
718          */
719         if (name[0] == '.') {
720                 if (name[1] == 0) {
721                         RETURN(0);
722                 } else if (name[1] == '.' && name[2] == 0) {
723                         RETURN(0);
724                 }
725         }
726
727         /* Remove key from the ZAP */
728         rc = -zap_remove(osd->od_os, zap_db->db_object,
729                          (char *) key, oh->ot_tx);
730
731         if (unlikely(rc && rc != -ENOENT))
732                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
733
734         RETURN(rc);
735 }
736
737 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
738                                      struct dt_object *dt,
739                                      __u32 unused,
740                                      struct lustre_capa *capa)
741 {
742         struct osd_zap_it *it;
743
744         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
745         if (!IS_ERR(it))
746                 it->ozi_pos = 0;
747
748         RETURN((struct dt_it *)it);
749 }
750
751 /**
752  *  Move Iterator to record specified by \a key
753  *
754  *  \param  di      osd iterator
755  *  \param  key     key for index
756  *
757  *  \retval +ve  di points to record with least key not larger than key
758  *  \retval  0   di points to exact matched key
759  *  \retval -ve  failure
760  */
761 static int osd_dir_it_get(const struct lu_env *env,
762                           struct dt_it *di, const struct dt_key *key)
763 {
764         struct osd_zap_it *it = (struct osd_zap_it *)di;
765         struct osd_object *obj = it->ozi_obj;
766         char              *name = (char *)key;
767         int                rc;
768         ENTRY;
769
770         LASSERT(it);
771         LASSERT(it->ozi_zc);
772
773         /* reset the cursor */
774         zap_cursor_fini(it->ozi_zc);
775         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
776
777         /* XXX: implementation of the API is broken at the moment */
778         LASSERT(((const char *)key)[0] == 0);
779
780         if (name[0] == 0) {
781                 it->ozi_pos = 0;
782                 RETURN(1);
783         }
784
785         if (name[0] == '.') {
786                 if (name[1] == 0) {
787                         it->ozi_pos = 1;
788                         GOTO(out, rc = 1);
789                 } else if (name[1] == '.' && name[2] == 0) {
790                         it->ozi_pos = 2;
791                         GOTO(out, rc = 1);
792                 }
793         }
794
795         /* neither . nor .. - some real record */
796         it->ozi_pos = 3;
797         rc = +1;
798
799 out:
800         RETURN(rc);
801 }
802
803 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
804 {
805         /* PBS: do nothing : ref are incremented at retrive and decreamented
806          *      next/finish. */
807 }
808
809 /*
810  * in Orion . and .. were stored in the directory, while ZPL
811  * and current osd-zfs generate them up on request. so, we
812  * need to ignore previously stored . and ..
813  */
814 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
815                                         zap_attribute_t *za)
816 {
817         int rc, isdot;
818
819         do {
820                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
821
822                 isdot = 0;
823                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
824                         if (za->za_name[1] == 0) {
825                                 isdot = 1;
826                         } else if (za->za_name[1] == '.' &&
827                                    za->za_name[2] == 0) {
828                                 isdot = 1;
829                         }
830                         if (unlikely(isdot))
831                                 zap_cursor_advance(it->ozi_zc);
832                 }
833         } while (unlikely(rc == 0 && isdot));
834
835         return rc;
836 }
837
838 /**
839  * to load a directory entry at a time and stored it in
840  * iterator's in-memory data structure.
841  *
842  * \param di, struct osd_it_ea, iterator's in memory structure
843  *
844  * \retval +ve, iterator reached to end
845  * \retval   0, iterator not reached to end
846  * \retval -ve, on error
847  */
848 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
849 {
850         struct osd_zap_it *it = (struct osd_zap_it *)di;
851         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
852         int                rc;
853
854         ENTRY;
855
856         /* temp. storage should be enough for any key supported by ZFS */
857         CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
858
859         /*
860          * the first ->next() moves the cursor to .
861          * the second ->next() moves the cursor to ..
862          * then we get to the real records and have to verify any exist
863          */
864         if (it->ozi_pos <= 2) {
865                 it->ozi_pos++;
866                 if (it->ozi_pos <=2)
867                         RETURN(0);
868
869         } else {
870                 zap_cursor_advance(it->ozi_zc);
871         }
872
873         /*
874          * According to current API we need to return error if its last entry.
875          * zap_cursor_advance() does not return any value. So we need to call
876          * retrieve to check if there is any record.  We should make
877          * changes to Iterator API to not return status for this API
878          */
879         rc = osd_index_retrieve_skip_dots(it, za);
880
881         if (rc == -ENOENT) /* end of dir */
882                 RETURN(+1);
883
884         RETURN(rc);
885 }
886
887 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
888                                      const struct dt_it *di)
889 {
890         struct osd_zap_it *it = (struct osd_zap_it *)di;
891         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
892         int                rc = 0;
893         ENTRY;
894
895         if (it->ozi_pos <= 1) {
896                 it->ozi_pos = 1;
897                 RETURN((struct dt_key *)".");
898         } else if (it->ozi_pos == 2) {
899                 RETURN((struct dt_key *)"..");
900         }
901
902         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
903                 RETURN(ERR_PTR(rc));
904
905         strcpy(it->ozi_name, za->za_name);
906
907         RETURN((struct dt_key *)it->ozi_name);
908 }
909
910 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
911 {
912         struct osd_zap_it *it = (struct osd_zap_it *)di;
913         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
914         int                rc;
915         ENTRY;
916
917         if (it->ozi_pos <= 1) {
918                 it->ozi_pos = 1;
919                 RETURN(2);
920         } else if (it->ozi_pos == 2) {
921                 RETURN(3);
922         }
923
924         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
925                 rc = strlen(za->za_name);
926
927         RETURN(rc);
928 }
929
930 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
931                           struct dt_rec *dtrec, __u32 attr)
932 {
933         struct osd_zap_it   *it = (struct osd_zap_it *)di;
934         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
935         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
936         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
937         int                  rc, namelen;
938         ENTRY;
939
940         if (it->ozi_pos <= 1) {
941                 lde->lde_hash = cpu_to_le64(1);
942                 strcpy(lde->lde_name, ".");
943                 lde->lde_namelen = cpu_to_le16(1);
944                 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
945                 lde->lde_attrs = LUDA_FID;
946                 /* append lustre attributes */
947                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
948                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
949                 it->ozi_pos = 1;
950                 GOTO(out, rc = 0);
951
952         } else if (it->ozi_pos == 2) {
953                 lde->lde_hash = cpu_to_le64(2);
954                 strcpy(lde->lde_name, "..");
955                 lde->lde_namelen = cpu_to_le16(2);
956                 lde->lde_attrs = LUDA_FID;
957                 /* append lustre attributes */
958                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
959                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
960                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
961
962                 /* ENOENT happens at the root of filesystem so ignore it */
963                 if (rc == -ENOENT)
964                         rc = 0;
965                 GOTO(out, rc);
966         }
967
968         LASSERT(lde);
969
970         rc = -zap_cursor_retrieve(it->ozi_zc, za);
971         if (unlikely(rc != 0))
972                 GOTO(out, rc);
973
974         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
975         namelen = strlen(za->za_name);
976         if (namelen > NAME_MAX)
977                 GOTO(out, rc = -EOVERFLOW);
978         strcpy(lde->lde_name, za->za_name);
979         lde->lde_namelen = cpu_to_le16(namelen);
980
981         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
982                 CERROR("%s: unsupported direntry format: %d %d\n",
983                        osd_obj2dev(it->ozi_obj)->od_svname,
984                        za->za_integer_length, (int)za->za_num_integers);
985
986                 GOTO(out, rc = -EIO);
987         }
988
989         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
990                          za->za_name, za->za_integer_length, 3, zde);
991         if (rc)
992                 GOTO(out, rc);
993
994         lde->lde_fid = zde->lzd_fid;
995         lde->lde_attrs = LUDA_FID;
996
997         /* append lustre attributes */
998         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
999
1000         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
1001
1002 out:
1003         RETURN(rc);
1004 }
1005
1006 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
1007                                __u32 attr)
1008 {
1009         struct osd_zap_it   *it = (struct osd_zap_it *)di;
1010         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
1011         size_t               namelen = 0;
1012         int                  rc;
1013         ENTRY;
1014
1015         if (it->ozi_pos <= 1)
1016                 namelen = 1;
1017         else if (it->ozi_pos == 2)
1018                 namelen = 2;
1019
1020         if (namelen > 0) {
1021                 rc = lu_dirent_calc_size(namelen, attr);
1022                 RETURN(rc);
1023         }
1024
1025         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1026         if (unlikely(rc != 0))
1027                 RETURN(rc);
1028
1029         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1030                 CERROR("%s: unsupported direntry format: %d %d\n",
1031                        osd_obj2dev(it->ozi_obj)->od_svname,
1032                        za->za_integer_length, (int)za->za_num_integers);
1033                 RETURN(-EIO);
1034         }
1035
1036         namelen = strlen(za->za_name);
1037         if (namelen > NAME_MAX)
1038                 RETURN(-EOVERFLOW);
1039
1040         rc = lu_dirent_calc_size(namelen, attr);
1041
1042         RETURN(rc);
1043 }
1044
1045 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1046 {
1047         struct osd_zap_it *it = (struct osd_zap_it *)di;
1048         __u64              pos;
1049         ENTRY;
1050
1051         if (it->ozi_pos <= 2)
1052                 pos = it->ozi_pos;
1053         else
1054                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1055
1056         RETURN(pos);
1057 }
1058
1059 /*
1060  * return status :
1061  *  rc == 0 -> end of directory.
1062  *  rc >  0 -> ok, proceed.
1063  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1064  */
1065 static int osd_dir_it_load(const struct lu_env *env,
1066                         const struct dt_it *di, __u64 hash)
1067 {
1068         struct osd_zap_it *it = (struct osd_zap_it *)di;
1069         struct osd_object *obj = it->ozi_obj;
1070         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1071         int                rc;
1072         ENTRY;
1073
1074         /* reset the cursor */
1075         zap_cursor_fini(it->ozi_zc);
1076         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1077
1078         if (hash <= 2) {
1079                 it->ozi_pos = hash;
1080                 rc = +1;
1081         } else {
1082                 it->ozi_pos = 3;
1083                 /* to return whether the end has been reached */
1084                 rc = osd_index_retrieve_skip_dots(it, za);
1085                 if (rc == 0)
1086                         rc = +1;
1087                 else if (rc == -ENOENT)
1088                         rc = 0;
1089         }
1090
1091         RETURN(rc);
1092 }
1093
1094 struct dt_index_operations osd_dir_ops = {
1095         .dio_lookup         = osd_dir_lookup,
1096         .dio_declare_insert = osd_declare_dir_insert,
1097         .dio_insert         = osd_dir_insert,
1098         .dio_declare_delete = osd_declare_dir_delete,
1099         .dio_delete         = osd_dir_delete,
1100         .dio_it     = {
1101                 .init     = osd_dir_it_init,
1102                 .fini     = osd_index_it_fini,
1103                 .get      = osd_dir_it_get,
1104                 .put      = osd_dir_it_put,
1105                 .next     = osd_dir_it_next,
1106                 .key      = osd_dir_it_key,
1107                 .key_size = osd_dir_it_key_size,
1108                 .rec      = osd_dir_it_rec,
1109                 .rec_size = osd_dir_it_rec_size,
1110                 .store    = osd_dir_it_store,
1111                 .load     = osd_dir_it_load
1112         }
1113 };
1114
1115 /*
1116  * Primitives for index files using binary keys.
1117  */
1118
1119 /* key integer_size is 8 */
1120 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1121                                   const struct dt_key *src)
1122 {
1123         int size;
1124
1125         LASSERT(dst);
1126         LASSERT(src);
1127
1128         /* align keysize to 64bit */
1129         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1130         size *= sizeof(__u64);
1131
1132         LASSERT(size <= MAXNAMELEN);
1133
1134         if (unlikely(size > o->oo_keysize))
1135                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1136         memcpy(dst, (const char *)src, o->oo_keysize);
1137
1138         return (size/sizeof(__u64));
1139 }
1140
1141 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1142                         struct dt_rec *rec, const struct dt_key *key,
1143                         struct lustre_capa *capa)
1144 {
1145         struct osd_object *obj = osd_dt_obj(dt);
1146         struct osd_device *osd = osd_obj2dev(obj);
1147         __u64             *k = osd_oti_get(env)->oti_key64;
1148         int                rc;
1149         ENTRY;
1150
1151         rc = osd_prepare_key_uint64(obj, k, key);
1152
1153         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1154                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1155                                 (void *)rec);
1156         RETURN(rc == 0 ? 1 : rc);
1157 }
1158
1159 static int osd_declare_index_insert(const struct lu_env *env,
1160                                     struct dt_object *dt,
1161                                     const struct dt_rec *rec,
1162                                     const struct dt_key *key,
1163                                     struct thandle *th)
1164 {
1165         struct osd_object  *obj = osd_dt_obj(dt);
1166         struct osd_thandle *oh;
1167         ENTRY;
1168
1169         LASSERT(th != NULL);
1170         oh = container_of0(th, struct osd_thandle, ot_super);
1171
1172         LASSERT(obj->oo_db);
1173
1174         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
1175
1176         /* It is not clear what API should be used for binary keys, so we pass
1177          * a null name which has the side effect of over-reserving space,
1178          * accounting for the worst case. See zap_count_write() */
1179         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1180
1181         RETURN(0);
1182 }
1183
1184 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1185                             const struct dt_rec *rec, const struct dt_key *key,
1186                             struct thandle *th, struct lustre_capa *capa,
1187                             int ignore_quota)
1188 {
1189         struct osd_object  *obj = osd_dt_obj(dt);
1190         struct osd_device  *osd = osd_obj2dev(obj);
1191         struct osd_thandle *oh;
1192         __u64              *k = osd_oti_get(env)->oti_key64;
1193         int                 rc;
1194         ENTRY;
1195
1196         LASSERT(obj->oo_db);
1197         LASSERT(dt_object_exists(dt));
1198         LASSERT(osd_invariant(obj));
1199         LASSERT(th != NULL);
1200
1201         oh = container_of0(th, struct osd_thandle, ot_super);
1202
1203         rc = osd_prepare_key_uint64(obj, k, key);
1204
1205         /* Insert (key,oid) into ZAP */
1206         rc = -zap_add_uint64(osd->od_os, obj->oo_db->db_object,
1207                              k, rc, obj->oo_recusize, obj->oo_recsize,
1208                              (void *)rec, oh->ot_tx);
1209         RETURN(rc);
1210 }
1211
1212 static int osd_declare_index_delete(const struct lu_env *env,
1213                                     struct dt_object *dt,
1214                                     const struct dt_key *key,
1215                                     struct thandle *th)
1216 {
1217         struct osd_object  *obj = osd_dt_obj(dt);
1218         struct osd_thandle *oh;
1219         ENTRY;
1220
1221         LASSERT(dt_object_exists(dt));
1222         LASSERT(osd_invariant(obj));
1223         LASSERT(th != NULL);
1224         LASSERT(obj->oo_db);
1225
1226         oh = container_of0(th, struct osd_thandle, ot_super);
1227         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
1228
1229         RETURN(0);
1230 }
1231
1232 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1233                             const struct dt_key *key, struct thandle *th,
1234                             struct lustre_capa *capa)
1235 {
1236         struct osd_object  *obj = osd_dt_obj(dt);
1237         struct osd_device  *osd = osd_obj2dev(obj);
1238         struct osd_thandle *oh;
1239         __u64              *k = osd_oti_get(env)->oti_key64;
1240         int                 rc;
1241         ENTRY;
1242
1243         LASSERT(obj->oo_db);
1244         LASSERT(th != NULL);
1245         oh = container_of0(th, struct osd_thandle, ot_super);
1246
1247         rc = osd_prepare_key_uint64(obj, k, key);
1248
1249         /* Remove binary key from the ZAP */
1250         rc = -zap_remove_uint64(osd->od_os, obj->oo_db->db_object,
1251                                 k, rc, oh->ot_tx);
1252         RETURN(rc);
1253 }
1254
1255 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1256                             const struct dt_key *key)
1257 {
1258         struct osd_zap_it *it = (struct osd_zap_it *)di;
1259         struct osd_object *obj = it->ozi_obj;
1260         struct osd_device *osd = osd_obj2dev(obj);
1261         ENTRY;
1262
1263         LASSERT(it);
1264         LASSERT(it->ozi_zc);
1265
1266         /*
1267          * XXX: we need a binary version of zap_cursor_move_to_key()
1268          *      to implement this API */
1269         if (*((const __u64 *)key) != 0)
1270                 CERROR("NOT IMPLEMETED YET (move to "LPX64")\n",
1271                        *((__u64 *)key));
1272
1273         zap_cursor_fini(it->ozi_zc);
1274         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_db->db_object);
1275         it->ozi_reset = 1;
1276
1277         RETURN(+1);
1278 }
1279
1280 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1281 {
1282         struct osd_zap_it *it = (struct osd_zap_it *)di;
1283         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1284         int                rc;
1285         ENTRY;
1286
1287         if (it->ozi_reset == 0)
1288                 zap_cursor_advance(it->ozi_zc);
1289         it->ozi_reset = 0;
1290
1291         /*
1292          * According to current API we need to return error if it's last entry.
1293          * zap_cursor_advance() does not return any value. So we need to call
1294          * retrieve to check if there is any record.  We should make
1295          * changes to Iterator API to not return status for this API
1296          */
1297         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1298         if (rc == -ENOENT)
1299                 RETURN(+1);
1300
1301         RETURN((rc));
1302 }
1303
1304 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1305                                        const struct dt_it *di)
1306 {
1307         struct osd_zap_it *it = (struct osd_zap_it *)di;
1308         struct osd_object *obj = it->ozi_obj;
1309         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1310         int                rc = 0;
1311         ENTRY;
1312
1313         it->ozi_reset = 0;
1314         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1315         if (rc)
1316                 RETURN(ERR_PTR(rc));
1317
1318         /* the binary key is stored in the name */
1319         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1320
1321         RETURN((struct dt_key *)&it->ozi_key);
1322 }
1323
1324 static int osd_index_it_key_size(const struct lu_env *env,
1325                                 const struct dt_it *di)
1326 {
1327         struct osd_zap_it *it = (struct osd_zap_it *)di;
1328         struct osd_object *obj = it->ozi_obj;
1329         RETURN(obj->oo_keysize);
1330 }
1331
1332 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1333                             struct dt_rec *rec, __u32 attr)
1334 {
1335         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1336         struct osd_zap_it *it = (struct osd_zap_it *)di;
1337         struct osd_object *obj = it->ozi_obj;
1338         struct osd_device *osd = osd_obj2dev(obj);
1339         __u64             *k = osd_oti_get(env)->oti_key64;
1340         int                rc;
1341         ENTRY;
1342
1343         it->ozi_reset = 0;
1344         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1345         if (rc)
1346                 RETURN(rc);
1347
1348         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1349
1350         rc = -zap_lookup_uint64(osd->od_os, obj->oo_db->db_object,
1351                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1352                                 (void *)rec);
1353         RETURN(rc);
1354 }
1355
1356 static __u64 osd_index_it_store(const struct lu_env *env,
1357                                 const struct dt_it *di)
1358 {
1359         struct osd_zap_it *it = (struct osd_zap_it *)di;
1360
1361         it->ozi_reset = 0;
1362         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1363 }
1364
1365 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1366                              __u64 hash)
1367 {
1368         struct osd_zap_it *it = (struct osd_zap_it *)di;
1369         struct osd_object *obj = it->ozi_obj;
1370         struct osd_device *osd = osd_obj2dev(obj);
1371         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1372         int                rc;
1373         ENTRY;
1374
1375         /* reset the cursor */
1376         zap_cursor_fini(it->ozi_zc);
1377         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1378                                    obj->oo_db->db_object, hash);
1379         it->ozi_reset = 0;
1380
1381         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1382         if (rc == 0)
1383                 RETURN(+1);
1384         else if (rc == -ENOENT)
1385                 RETURN(0);
1386
1387         RETURN(rc);
1388 }
1389
1390 static struct dt_index_operations osd_index_ops = {
1391         .dio_lookup             = osd_index_lookup,
1392         .dio_declare_insert     = osd_declare_index_insert,
1393         .dio_insert             = osd_index_insert,
1394         .dio_declare_delete     = osd_declare_index_delete,
1395         .dio_delete             = osd_index_delete,
1396         .dio_it = {
1397                 .init           = osd_index_it_init,
1398                 .fini           = osd_index_it_fini,
1399                 .get            = osd_index_it_get,
1400                 .put            = osd_index_it_put,
1401                 .next           = osd_index_it_next,
1402                 .key            = osd_index_it_key,
1403                 .key_size       = osd_index_it_key_size,
1404                 .rec            = osd_index_it_rec,
1405                 .store          = osd_index_it_store,
1406                 .load           = osd_index_it_load
1407         }
1408 };
1409
1410 struct osd_metadnode_it {
1411         struct osd_device       *mit_dev;
1412         __u64                    mit_pos;
1413         struct lu_fid            mit_fid;
1414         int                      mit_prefetched;
1415         __u64                    mit_prefetched_dnode;
1416 };
1417
1418 static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
1419                                             struct dt_object *dt, __u32 attr,
1420                                             struct lustre_capa *capa)
1421 {
1422         struct osd_device       *dev   = osd_dev(dt->do_lu.lo_dev);
1423         struct osd_metadnode_it *it;
1424         ENTRY;
1425
1426         OBD_ALLOC_PTR(it);
1427         if (unlikely(it == NULL))
1428                 RETURN(ERR_PTR(-ENOMEM));
1429
1430         it->mit_dev = dev;
1431
1432         /* XXX: dmu_object_next() does NOT find dnodes allocated
1433          *      in the current non-committed txg, so we force txg
1434          *      commit to find all existing dnodes ... */
1435         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1436
1437         RETURN((struct dt_it *)it);
1438 }
1439
1440 static void osd_zfs_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1441 {
1442         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1443
1444         OBD_FREE_PTR(it);
1445 }
1446
1447 static int osd_zfs_otable_it_get(const struct lu_env *env,
1448                                  struct dt_it *di, const struct dt_key *key)
1449 {
1450         return 0;
1451 }
1452
1453 static void osd_zfs_otable_it_put(const struct lu_env *env, struct dt_it *di)
1454 {
1455 }
1456
1457 #define OTABLE_PREFETCH         256
1458
1459 static void osd_zfs_otable_prefetch(const struct lu_env *env,
1460                                     struct osd_metadnode_it *it)
1461 {
1462         struct osd_device       *dev = it->mit_dev;
1463         int                      rc;
1464
1465         /* can go negative on the very first access to the iterator
1466          * or if some non-Lustre objects were found */
1467         if (unlikely(it->mit_prefetched < 0))
1468                 it->mit_prefetched = 0;
1469
1470         if (it->mit_prefetched >= (OTABLE_PREFETCH >> 1))
1471                 return;
1472
1473         if (it->mit_prefetched_dnode == 0)
1474                 it->mit_prefetched_dnode = it->mit_pos;
1475
1476         while (it->mit_prefetched < OTABLE_PREFETCH) {
1477                 rc = -dmu_object_next(dev->od_os, &it->mit_prefetched_dnode,
1478                                       B_FALSE, 0);
1479                 if (unlikely(rc != 0))
1480                         break;
1481
1482                 /* dmu_prefetch() was exported in 0.6.2, if you use with
1483                  * an older release, just comment it out - this is an
1484                  * optimization */
1485                 dmu_prefetch(dev->od_os, it->mit_prefetched_dnode, 0, 0);
1486
1487                 it->mit_prefetched++;
1488         }
1489 }
1490
1491 static int osd_zfs_otable_it_next(const struct lu_env *env, struct dt_it *di)
1492 {
1493         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1494         struct lustre_mdt_attrs *lma;
1495         struct osd_device       *dev = it->mit_dev;
1496         nvlist_t                *nvbuf = NULL;
1497         uchar_t                 *v;
1498         __u64                    dnode;
1499         int                      rc, s;
1500
1501         memset(&it->mit_fid, 0, sizeof(it->mit_fid));
1502
1503         dnode = it->mit_pos;
1504         do {
1505                 rc = -dmu_object_next(dev->od_os, &it->mit_pos, B_FALSE, 0);
1506                 if (unlikely(rc != 0))
1507                         GOTO(out, rc = 1);
1508                 it->mit_prefetched--;
1509
1510                 /* LMA is required for this to be a Lustre object.
1511                  * If there is no xattr skip it. */
1512                 rc = __osd_xattr_load(dev, it->mit_pos, &nvbuf);
1513                 if (unlikely(rc != 0))
1514                         continue;
1515
1516                 LASSERT(nvbuf != NULL);
1517                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &v, &s);
1518                 if (likely(rc == 0)) {
1519                         /* Lustre object */
1520                         lma = (struct lustre_mdt_attrs *)v;
1521                         lustre_lma_swab(lma);
1522                         it->mit_fid = lma->lma_self_fid;
1523                         nvlist_free(nvbuf);
1524                         break;
1525                 } else {
1526                         /* not a Lustre object, try next one */
1527                         nvlist_free(nvbuf);
1528                 }
1529
1530         } while (1);
1531
1532
1533         /* we aren't prefetching in the above loop because the number of
1534          * non-Lustre objects is very small and we will be repeating very
1535          * rare. in case we want to use this to iterate over non-Lustre
1536          * objects (i.e. when we convert regular ZFS in Lustre) it makes
1537          * sense to initiate prefetching in the loop */
1538
1539         /* 0 - there are more items, +1 - the end */
1540         if (likely(rc == 0))
1541                 osd_zfs_otable_prefetch(env, it);
1542
1543         CDEBUG(D_OTHER, "advance: %llu -> %llu "DFID": %d\n", dnode,
1544                it->mit_pos, PFID(&it->mit_fid), rc);
1545
1546 out:
1547         return rc;
1548 }
1549
1550 static struct dt_key *osd_zfs_otable_it_key(const struct lu_env *env,
1551                                             const struct dt_it *di)
1552 {
1553         return NULL;
1554 }
1555
1556 static int osd_zfs_otable_it_key_size(const struct lu_env *env,
1557                                       const struct dt_it *di)
1558 {
1559         return sizeof(__u64);
1560 }
1561
1562 static int osd_zfs_otable_it_rec(const struct lu_env *env,
1563                                  const struct dt_it *di,
1564                                  struct dt_rec *rec, __u32 attr)
1565 {
1566         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1567         struct lu_fid *fid = (struct lu_fid *)rec;
1568         ENTRY;
1569
1570         *fid = it->mit_fid;
1571
1572         RETURN(0);
1573 }
1574
1575
1576 static __u64 osd_zfs_otable_it_store(const struct lu_env *env,
1577                                      const struct dt_it *di)
1578 {
1579         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1580
1581         return it->mit_pos;
1582 }
1583
1584 static int osd_zfs_otable_it_load(const struct lu_env *env,
1585                                   const struct dt_it *di, __u64 hash)
1586 {
1587         struct osd_metadnode_it *it  = (struct osd_metadnode_it *)di;
1588
1589         it->mit_pos = hash;
1590         it->mit_prefetched = 0;
1591         it->mit_prefetched_dnode = 0;
1592
1593         return osd_zfs_otable_it_next(env, (struct dt_it *)di);
1594 }
1595
1596 static int osd_zfs_otable_it_key_rec(const struct lu_env *env,
1597                                      const struct dt_it *di, void *key_rec)
1598 {
1599         return 0;
1600 }
1601
1602 const struct dt_index_operations osd_zfs_otable_ops = {
1603         .dio_it = {
1604                 .init     = osd_zfs_otable_it_init,
1605                 .fini     = osd_zfs_otable_it_fini,
1606                 .get      = osd_zfs_otable_it_get,
1607                 .put      = osd_zfs_otable_it_put,
1608                 .next     = osd_zfs_otable_it_next,
1609                 .key      = osd_zfs_otable_it_key,
1610                 .key_size = osd_zfs_otable_it_key_size,
1611                 .rec      = osd_zfs_otable_it_rec,
1612                 .store    = osd_zfs_otable_it_store,
1613                 .load     = osd_zfs_otable_it_load,
1614                 .key_rec  = osd_zfs_otable_it_key_rec,
1615         }
1616 };
1617
1618 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1619                 const struct dt_index_features *feat)
1620 {
1621         struct osd_object *obj = osd_dt_obj(dt);
1622         ENTRY;
1623
1624         LASSERT(dt_object_exists(dt));
1625
1626         /*
1627          * XXX: implement support for fixed-size keys sorted with natural
1628          *      numerical way (not using internal hash value)
1629          */
1630         if (feat->dif_flags & DT_IND_RANGE)
1631                 RETURN(-ERANGE);
1632
1633         if (unlikely(feat == &dt_otable_features)) {
1634                 dt->do_index_ops = &osd_zfs_otable_ops;
1635                 RETURN(0);
1636         }
1637
1638         LASSERT(obj->oo_db != NULL);
1639         if (likely(feat == &dt_directory_features)) {
1640                 if (osd_object_is_zap(obj->oo_db))
1641                         dt->do_index_ops = &osd_dir_ops;
1642                 else
1643                         RETURN(-ENOTDIR);
1644         } else if (unlikely(feat == &dt_acct_features)) {
1645                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1646                 dt->do_index_ops = &osd_acct_index_ops;
1647         } else if (osd_object_is_zap(obj->oo_db) &&
1648                    dt->do_index_ops == NULL) {
1649                 /* For index file, we don't support variable key & record sizes
1650                  * and the key has to be unique */
1651                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1652                         RETURN(-EINVAL);
1653
1654                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
1655                         RETURN(-E2BIG);
1656                 if (feat->dif_keysize_max != feat->dif_keysize_min)
1657                         RETURN(-EINVAL);
1658
1659                 /* As for the record size, it should be a multiple of 8 bytes
1660                  * and smaller than the maximum value length supported by ZAP.
1661                  */
1662                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1663                         RETURN(-E2BIG);
1664                 if (feat->dif_recsize_max != feat->dif_recsize_min)
1665                         RETURN(-EINVAL);
1666
1667                 obj->oo_keysize = feat->dif_keysize_max;
1668                 obj->oo_recsize = feat->dif_recsize_max;
1669                 obj->oo_recusize = 1;
1670
1671                 /* ZFS prefers to work with array of 64bits */
1672                 if ((obj->oo_recsize & 7) == 0) {
1673                         obj->oo_recsize >>= 3;
1674                         obj->oo_recusize = 8;
1675                 }
1676                 dt->do_index_ops = &osd_index_ops;
1677         }
1678
1679         RETURN(0);
1680 }