Whamcloud - gitweb
0cb3bca228c2b56e08a7c7ff33662d6dde738aa0
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2011, 2012 Whamcloud, Inc.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_index.c
39  *
40  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41  * Author: Mike Pershin <tappro@whamcloud.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_OSD
48
49 #include <lustre_ver.h>
50 #include <libcfs/libcfs.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_support.h>
53 #include <lustre_net.h>
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <lustre_disk.h>
57 #include <lustre_fid.h>
58
59 #include "osd_internal.h"
60
61 #include <sys/dnode.h>
62 #include <sys/dbuf.h>
63 #include <sys/spa.h>
64 #include <sys/stat.h>
65 #include <sys/zap.h>
66 #include <sys/spa_impl.h>
67 #include <sys/zfs_znode.h>
68 #include <sys/dmu_tx.h>
69 #include <sys/dmu_objset.h>
70 #include <sys/dsl_prop.h>
71 #include <sys/sa_impl.h>
72 #include <sys/txg.h>
73
74 static struct dt_it *osd_zap_it_init(const struct lu_env *env,
75                                      struct dt_object *dt,
76                                      __u32 unused,
77                                      struct lustre_capa *capa)
78 {
79         struct osd_thread_info  *info = osd_oti_get(env);
80         struct osd_zap_it       *it;
81         struct osd_object       *obj = osd_dt_obj(dt);
82         struct osd_device       *osd = osd_obj2dev(obj);
83         struct lu_object        *lo  = &dt->do_lu;
84         ENTRY;
85
86         /* XXX: check capa ? */
87
88         LASSERT(lu_object_exists(lo));
89         LASSERT(obj->oo_db);
90         LASSERT(udmu_object_is_zap(obj->oo_db));
91         LASSERT(info);
92
93         it = &info->oti_it_zap;
94
95         if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
96                                  obj->oo_db->db_object, 0))
97                 RETURN(ERR_PTR(-ENOMEM));
98
99         it->ozi_obj   = obj;
100         it->ozi_capa  = capa;
101         it->ozi_reset = 1;
102         lu_object_get(lo);
103
104         RETURN((struct dt_it *)it);
105 }
106
107 static void osd_zap_it_fini(const struct lu_env *env, struct dt_it *di)
108 {
109         struct osd_zap_it *it = (struct osd_zap_it *)di;
110         struct osd_object *obj;
111         ENTRY;
112
113         LASSERT(it);
114         LASSERT(it->ozi_obj);
115
116         obj = it->ozi_obj;
117
118         udmu_zap_cursor_fini(it->ozi_zc);
119         lu_object_put(env, &obj->oo_dt.do_lu);
120
121         EXIT;
122 }
123
124 /**
125  *  Move Iterator to record specified by \a key
126  *
127  *  \param  di      osd iterator
128  *  \param  key     key for index
129  *
130  *  \retval +ve  di points to record with least key not larger than key
131  *  \retval  0   di points to exact matched key
132  *  \retval -ve  failure
133  */
134
135 static int osd_zap_it_get(const struct lu_env *env,
136                           struct dt_it *di, const struct dt_key *key)
137 {
138         struct osd_zap_it *it = (struct osd_zap_it *)di;
139         struct osd_object *obj = it->ozi_obj;
140         struct osd_device *osd = osd_obj2dev(obj);
141         ENTRY;
142
143         LASSERT(it);
144         LASSERT(it->ozi_zc);
145
146         /* XXX: API is broken at the moment */
147         LASSERT(((const char *)key)[0] == '\0');
148
149         udmu_zap_cursor_fini(it->ozi_zc);
150         if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
151                                  obj->oo_db->db_object, 0))
152                 RETURN(-ENOMEM);
153
154         it->ozi_reset = 1;
155
156         RETURN(+1);
157 }
158
159 static void osd_zap_it_put(const struct lu_env *env, struct dt_it *di)
160 {
161         /* PBS: do nothing : ref are incremented at retrive and decreamented
162          *      next/finish. */
163 }
164
165 int udmu_zap_cursor_retrieve_key(const struct lu_env *env,
166                                  zap_cursor_t *zc, char *key, int max)
167 {
168         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
169         int             err;
170
171         if ((err = zap_cursor_retrieve(zc, za)))
172                 return err;
173
174         if (key) {
175                 if (strlen(za->za_name) > max)
176                         return EOVERFLOW;
177                 strcpy(key, za->za_name);
178         }
179
180         return 0;
181 }
182
183 /**
184  * to load a directory entry at a time and stored it in
185  * iterator's in-memory data structure.
186  *
187  * \param di, struct osd_it_ea, iterator's in memory structure
188  *
189  * \retval +ve, iterator reached to end
190  * \retval   0, iterator not reached to end
191  * \retval -ve, on error
192  */
193 static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di)
194 {
195         struct osd_zap_it *it = (struct osd_zap_it *)di;
196         int                rc;
197         ENTRY;
198
199         if (it->ozi_reset == 0)
200                 zap_cursor_advance(it->ozi_zc);
201         it->ozi_reset = 0;
202
203         /*
204          * According to current API we need to return error if its last entry.
205          * zap_cursor_advance() does return any value. So we need to call
206          * retrieve to check if there is any record.  We should make
207          * changes to Iterator API to not return status for this API
208          */
209         rc = -udmu_zap_cursor_retrieve_key(env, it->ozi_zc, NULL, NAME_MAX);
210         if (rc == -ENOENT) /* end of dir*/
211                 RETURN(+1);
212
213         RETURN((rc));
214 }
215
216 static struct dt_key *osd_zap_it_key(const struct lu_env *env,
217                                         const struct dt_it *di)
218 {
219         struct osd_zap_it *it = (struct osd_zap_it *)di;
220         int                rc = 0;
221         ENTRY;
222
223         it->ozi_reset = 0;
224         rc = -udmu_zap_cursor_retrieve_key(env, it->ozi_zc, it->ozi_name,
225                                            NAME_MAX + 1);
226         if (!rc)
227                 RETURN((struct dt_key *)it->ozi_name);
228         else
229                 RETURN(ERR_PTR(rc));
230 }
231
232 static int osd_zap_it_key_size(const struct lu_env *env, const struct dt_it *di)
233 {
234         struct osd_zap_it *it = (struct osd_zap_it *)di;
235         int                rc;
236         ENTRY;
237
238         it->ozi_reset = 0;
239         rc = -udmu_zap_cursor_retrieve_key(env, it->ozi_zc, it->ozi_name,
240                                            NAME_MAX + 1);
241         if (!rc)
242                 RETURN(strlen(it->ozi_name));
243         else
244                 RETURN(rc);
245 }
246
247 /*
248  * zap_cursor_retrieve read from current record.
249  * to read bytes we need to call zap_lookup explicitly.
250  */
251 int udmu_zap_cursor_retrieve_value(const struct lu_env *env,
252                                    zap_cursor_t *zc,  char *buf,
253                                    int buf_size, int *bytes_read)
254 {
255         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
256         int err, actual_size;
257
258
259         if ((err = zap_cursor_retrieve(zc, za)))
260                 return err;
261
262         if (za->za_integer_length <= 0)
263                 return (ERANGE);
264
265         actual_size = za->za_integer_length * za->za_num_integers;
266
267         if (actual_size > buf_size) {
268                 actual_size = buf_size;
269                 buf_size = actual_size / za->za_integer_length;
270         } else {
271                 buf_size = za->za_num_integers;
272         }
273
274         err = -zap_lookup(zc->zc_objset, zc->zc_zapobj,
275                           za->za_name, za->za_integer_length,
276                           buf_size, buf);
277
278         if (!err)
279                 *bytes_read = actual_size;
280
281         return err;
282 }
283
284 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
285                                        int len, __u16 type)
286 {
287         const unsigned    align = sizeof(struct luda_type) - 1;
288         struct luda_type *lt;
289
290         /* check if file type is required */
291         if (attr & LUDA_TYPE) {
292                 len = (len + align) & ~align;
293
294                 lt = (void *)ent->lde_name + len;
295                 lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
296                 ent->lde_attrs |= LUDA_TYPE;
297         }
298
299         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
300 }
301
302 static int osd_zap_it_rec(const struct lu_env *env, const struct dt_it *di,
303                           struct dt_rec *dtrec, __u32 attr)
304 {
305         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
306         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
307         struct osd_zap_it   *it = (struct osd_zap_it *)di;
308         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
309         int                  rc, namelen;
310         ENTRY;
311
312         it->ozi_reset = 0;
313         LASSERT(lde);
314
315         lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc));
316
317         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
318                 GOTO(out, rc);
319
320         namelen = strlen(za->za_name);
321         if (namelen > NAME_MAX)
322                 GOTO(out, rc = -EOVERFLOW);
323         strcpy(lde->lde_name, za->za_name);
324         lde->lde_namelen = cpu_to_le16(namelen);
325
326         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
327                 CERROR("%s: unsupported direntry format: %d %d\n",
328                        osd_obj2dev(it->ozi_obj)->od_svname,
329                        za->za_integer_length, (int)za->za_num_integers);
330
331                 GOTO(out, rc = -EIO);
332         }
333
334         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
335                          za->za_name, za->za_integer_length, 3, zde);
336         if (rc)
337                 GOTO(out, rc);
338
339         lde->lde_fid = zde->lzd_fid;
340         lde->lde_attrs = LUDA_FID;
341
342         /* append lustre attributes */
343         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
344
345         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
346
347 out:
348         RETURN(rc);
349 }
350
351 static __u64 osd_zap_it_store(const struct lu_env *env, const struct dt_it *di)
352 {
353         struct osd_zap_it *it = (struct osd_zap_it *)di;
354
355         it->ozi_reset = 0;
356         RETURN(udmu_zap_cursor_serialize(it->ozi_zc));
357 }
358
359 /*
360  * return status :
361  *  rc == 0 -> ok, proceed.
362  *  rc >  0 -> end of directory.
363  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
364  */
365 static int osd_zap_it_load(const struct lu_env *env,
366                         const struct dt_it *di, __u64 hash)
367 {
368         struct osd_zap_it *it = (struct osd_zap_it *)di;
369         struct osd_object *obj = it->ozi_obj;
370         struct osd_device *osd = osd_obj2dev(obj);
371         int                rc;
372         ENTRY;
373
374         udmu_zap_cursor_fini(it->ozi_zc);
375         if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
376                                  obj->oo_db->db_object, hash))
377                 RETURN(-ENOMEM);
378         it->ozi_reset = 0;
379
380         /* same as osd_zap_it_next()*/
381         rc = -udmu_zap_cursor_retrieve_key(env, it->ozi_zc, NULL,
382                                            NAME_MAX + 1);
383         if (rc == 0)
384                 RETURN(+1);
385         else if (rc == -ENOENT) /* end of dir*/
386                 RETURN(0);
387
388         RETURN(rc);
389 }
390
391 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
392                           struct dt_rec *rec, const struct dt_key *key,
393                           struct lustre_capa *capa)
394 {
395         struct osd_thread_info *oti = osd_oti_get(env);
396         struct osd_object  *obj = osd_dt_obj(dt);
397         struct osd_device  *osd = osd_obj2dev(obj);
398         int                 rc;
399         ENTRY;
400
401         LASSERT(udmu_object_is_zap(obj->oo_db));
402
403         rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
404                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
405                          (void *)&oti->oti_zde);
406         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
407
408         RETURN(rc == 0 ? 1 : rc);
409 }
410
411 static int osd_declare_dir_insert(const struct lu_env *env,
412                                   struct dt_object *dt,
413                                   const struct dt_rec *rec,
414                                   const struct dt_key *key,
415                                   struct thandle *th)
416 {
417         struct osd_object  *obj = osd_dt_obj(dt);
418         struct osd_thandle *oh;
419         ENTRY;
420
421         LASSERT(th != NULL);
422         oh = container_of0(th, struct osd_thandle, ot_super);
423
424         LASSERT(obj->oo_db);
425         LASSERT(udmu_object_is_zap(obj->oo_db));
426
427         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
428         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
429
430         RETURN(0);
431 }
432
433 /**
434  * Find the osd object for given fid.
435  *
436  * \param fid need to find the osd object having this fid
437  *
438  * \retval osd_object on success
439  * \retval        -ve on error
440  */
441 struct osd_object *osd_object_find(const struct lu_env *env,
442                                    struct dt_object *dt,
443                                    const struct lu_fid *fid)
444 {
445         struct lu_device         *ludev = dt->do_lu.lo_dev;
446         struct osd_object        *child = NULL;
447         struct lu_object         *luch;
448         struct lu_object         *lo;
449
450         /*
451          * at this point topdev might not exist yet
452          * (i.e. MGS is preparing profiles). so we can
453          * not rely on topdev and instead lookup with
454          * our device passed as topdev. this can't work
455          * if the object isn't cached yet (as osd doesn't
456          * allocate lu_header). IOW, the object must be
457          * in the cache, otherwise lu_object_alloc() crashes
458          * -bzzz
459          */
460         luch = lu_object_find_at(env, ludev, fid, NULL);
461         if (IS_ERR(luch))
462                 return (void *)luch;
463
464         if (lu_object_exists(luch)) {
465                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
466                 if (lo != NULL)
467                         child = osd_obj(lo);
468                 else
469                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
470                                         "%s: object can't be located "DFID"\n",
471                                         ludev->ld_obd->obd_name, PFID(fid));
472
473                 if (child == NULL) {
474                         lu_object_put(env, luch);
475                         CERROR("%s: Unable to get osd_object "DFID"\n",
476                                ludev->ld_obd->obd_name, PFID(fid));
477                         child = ERR_PTR(-ENOENT);
478                 }
479         } else {
480                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
481                                 "%s: lu_object does not exists "DFID"\n",
482                                 ludev->ld_obd->obd_name, PFID(fid));
483                 lu_object_put(env, luch);
484                 child = ERR_PTR(-ENOENT);
485         }
486
487         return child;
488 }
489
490 /**
491  * Put the osd object once done with it.
492  *
493  * \param obj osd object that needs to be put
494  */
495 static inline void osd_object_put(const struct lu_env *env,
496                                   struct osd_object *obj)
497 {
498         lu_object_put(env, &obj->oo_dt.do_lu);
499 }
500
501 /**
502  *      Inserts (key, value) pair in \a directory object.
503  *
504  *      \param  dt      osd index object
505  *      \param  key     key for index
506  *      \param  rec     record reference
507  *      \param  th      transaction handler
508  *      \param  capa    capability descriptor
509  *      \param  ignore_quota update should not affect quota
510  *
511  *      \retval  0  success
512  *      \retval -ve failure
513  */
514 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
515                           const struct dt_rec *rec, const struct dt_key *key,
516                           struct thandle *th, struct lustre_capa *capa,
517                           int ignore_quota)
518 {
519         struct osd_thread_info *oti = osd_oti_get(env);
520         struct osd_object   *parent = osd_dt_obj(dt);
521         struct osd_device   *osd = osd_obj2dev(parent);
522         struct lu_fid       *fid = (struct lu_fid *)rec;
523         struct osd_thandle  *oh;
524         struct osd_object   *child;
525         __u32                attr;
526         int                  rc;
527         ENTRY;
528
529         LASSERT(parent->oo_db);
530         LASSERT(udmu_object_is_zap(parent->oo_db));
531
532         LASSERT(dt_object_exists(dt));
533         LASSERT(osd_invariant(parent));
534
535         /*
536          * zfs_readdir() generates ./.. on fly, but
537          * we want own entries (.. at least) with a fid
538          */
539 #if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 53, 0)
540 #warning "fix '.' and '..' handling"
541 #endif
542
543         LASSERT(th != NULL);
544         oh = container_of0(th, struct osd_thandle, ot_super);
545
546         child = osd_object_find(env, dt, fid);
547         if (IS_ERR(child))
548                 RETURN(PTR_ERR(child));
549
550         LASSERT(child->oo_db);
551
552         CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
553         CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
554         attr = child->oo_dt.do_lu.lo_header ->loh_attr;
555         oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
556         oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
557         oti->oti_zde.lzd_fid = *fid;
558
559         /* Insert (key,oid) into ZAP */
560         rc = -zap_add(osd->od_objset.os, parent->oo_db->db_object,
561                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
562                       (void *)&oti->oti_zde, oh->ot_tx);
563
564         osd_object_put(env, child);
565
566         RETURN(rc);
567 }
568
569 static int osd_declare_dir_delete(const struct lu_env *env,
570                                   struct dt_object *dt,
571                                   const struct dt_key *key,
572                                   struct thandle *th)
573 {
574         struct osd_object *obj = osd_dt_obj(dt);
575         struct osd_thandle *oh;
576         ENTRY;
577
578         LASSERT(dt_object_exists(dt));
579         LASSERT(osd_invariant(obj));
580
581         LASSERT(th != NULL);
582         oh = container_of0(th, struct osd_thandle, ot_super);
583
584         LASSERT(obj->oo_db);
585         LASSERT(udmu_object_is_zap(obj->oo_db));
586
587         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
588
589         RETURN(0);
590 }
591
592 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
593                           const struct dt_key *key, struct thandle *th,
594                           struct lustre_capa *capa)
595 {
596         struct osd_object *obj = osd_dt_obj(dt);
597         struct osd_device *osd = osd_obj2dev(obj);
598         struct osd_thandle *oh;
599         dmu_buf_t *zap_db = obj->oo_db;
600         int rc;
601         ENTRY;
602
603         LASSERT(obj->oo_db);
604         LASSERT(udmu_object_is_zap(obj->oo_db));
605
606         LASSERT(th != NULL);
607         oh = container_of0(th, struct osd_thandle, ot_super);
608
609         /* Remove key from the ZAP */
610         rc = -zap_remove(osd->od_objset.os, zap_db->db_object,
611                          (char *) key, oh->ot_tx);
612
613         if (rc && rc != -ENOENT)
614                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
615
616         RETURN(rc);
617 }
618
619 static struct dt_index_operations osd_dir_ops = {
620         .dio_lookup         = osd_dir_lookup,
621         .dio_declare_insert = osd_declare_dir_insert,
622         .dio_insert         = osd_dir_insert,
623         .dio_declare_delete = osd_declare_dir_delete,
624         .dio_delete         = osd_dir_delete,
625         .dio_it     = {
626                 .init     = osd_zap_it_init,
627                 .fini     = osd_zap_it_fini,
628                 .get      = osd_zap_it_get,
629                 .put      = osd_zap_it_put,
630                 .next     = osd_zap_it_next,
631                 .key      = osd_zap_it_key,
632                 .key_size = osd_zap_it_key_size,
633                 .rec      = osd_zap_it_rec,
634                 .store    = osd_zap_it_store,
635                 .load     = osd_zap_it_load
636         }
637 };
638
639 /*
640  * Primitives for index files using binary keys.
641  * XXX: only 64-bit keys are supported for now.
642  */
643
644 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
645                         struct dt_rec *rec, const struct dt_key *key,
646                         struct lustre_capa *capa)
647 {
648         struct osd_object *obj = osd_dt_obj(dt);
649         struct osd_device *osd = osd_obj2dev(obj);
650         int                rc;
651         ENTRY;
652
653         rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
654                                 (const __u64 *)key, 1, 8, obj->oo_recsize,
655                                 (void *)rec);
656         RETURN(rc == 0 ? 1 : rc);
657 }
658
659 static int osd_declare_index_insert(const struct lu_env *env,
660                                     struct dt_object *dt,
661                                     const struct dt_rec *rec,
662                                     const struct dt_key *key,
663                                     struct thandle *th)
664 {
665         struct osd_object  *obj = osd_dt_obj(dt);
666         struct osd_thandle *oh;
667         ENTRY;
668
669         LASSERT(th != NULL);
670         oh = container_of0(th, struct osd_thandle, ot_super);
671
672         LASSERT(obj->oo_db);
673
674         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
675
676         /* It is not clear what API should be used for binary keys, so we pass
677          * a null name which has the side effect of over-reserving space,
678          * accounting for the worst case. See zap_count_write() */
679         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
680
681         RETURN(0);
682 }
683
684 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
685                             const struct dt_rec *rec, const struct dt_key *key,
686                             struct thandle *th, struct lustre_capa *capa,
687                             int ignore_quota)
688 {
689         struct osd_object  *obj = osd_dt_obj(dt);
690         struct osd_device  *osd = osd_obj2dev(obj);
691         struct osd_thandle *oh;
692         int                 rc;
693         ENTRY;
694
695         LASSERT(obj->oo_db);
696         LASSERT(dt_object_exists(dt));
697         LASSERT(osd_invariant(obj));
698         LASSERT(th != NULL);
699
700         oh = container_of0(th, struct osd_thandle, ot_super);
701
702         /* Insert (key,oid) into ZAP */
703         rc = -zap_add_uint64(osd->od_objset.os, obj->oo_db->db_object,
704                              (const __u64 *)key, 1, 8, obj->oo_recsize,
705                              (void *)rec, oh->ot_tx);
706         RETURN(rc);
707 }
708
709 static int osd_declare_index_delete(const struct lu_env *env,
710                                     struct dt_object *dt,
711                                     const struct dt_key *key,
712                                     struct thandle *th)
713 {
714         struct osd_object  *obj = osd_dt_obj(dt);
715         struct osd_thandle *oh;
716         ENTRY;
717
718         LASSERT(dt_object_exists(dt));
719         LASSERT(osd_invariant(obj));
720         LASSERT(th != NULL);
721         LASSERT(obj->oo_db);
722
723         oh = container_of0(th, struct osd_thandle, ot_super);
724         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
725
726         RETURN(0);
727 }
728
729 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
730                             const struct dt_key *key, struct thandle *th,
731                             struct lustre_capa *capa)
732 {
733         struct osd_object  *obj = osd_dt_obj(dt);
734         struct osd_device  *osd = osd_obj2dev(obj);
735         struct osd_thandle *oh;
736         int                 rc;
737         ENTRY;
738
739         LASSERT(obj->oo_db);
740         LASSERT(th != NULL);
741         oh = container_of0(th, struct osd_thandle, ot_super);
742
743         /* Remove binary key from the ZAP */
744         rc = -zap_remove_uint64(osd->od_objset.os, obj->oo_db->db_object,
745                                 (const __u64 *)key, 1, oh->ot_tx);
746         RETURN(rc);
747 }
748
749 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
750                             const struct dt_key *key)
751 {
752         struct osd_zap_it *it = (struct osd_zap_it *)di;
753         struct osd_object *obj = it->ozi_obj;
754         struct osd_device *osd = osd_obj2dev(obj);
755         ENTRY;
756
757         LASSERT(it);
758         LASSERT(it->ozi_zc);
759
760         /* XXX: API is broken at the moment */
761         LASSERT(*((const __u64 *)key) == 0);
762
763         zap_cursor_fini(it->ozi_zc);
764         memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
765         zap_cursor_init(it->ozi_zc, osd->od_objset.os, obj->oo_db->db_object);
766         it->ozi_reset = 1;
767
768         RETURN(+1);
769 }
770
771 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
772 {
773         struct osd_zap_it *it = (struct osd_zap_it *)di;
774         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
775         int                rc;
776         ENTRY;
777
778         if (it->ozi_reset == 0)
779                 zap_cursor_advance(it->ozi_zc);
780         it->ozi_reset = 0;
781
782         /*
783          * According to current API we need to return error if it's last entry.
784          * zap_cursor_advance() does not return any value. So we need to call
785          * retrieve to check if there is any record.  We should make
786          * changes to Iterator API to not return status for this API
787          */
788         rc = -zap_cursor_retrieve(it->ozi_zc, za);
789         if (rc == -ENOENT)
790                 RETURN(+1);
791
792         RETURN((rc));
793 }
794
795 static struct dt_key *osd_index_it_key(const struct lu_env *env,
796                                        const struct dt_it *di)
797 {
798         struct osd_zap_it *it = (struct osd_zap_it *)di;
799         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
800         int                rc = 0;
801         ENTRY;
802
803         it->ozi_reset = 0;
804         rc = -zap_cursor_retrieve(it->ozi_zc, za);
805         if (rc)
806                 RETURN(ERR_PTR(rc));
807
808         /* the binary key is stored in the name */
809         it->ozi_key = *((__u64 *)za->za_name);
810
811         RETURN((struct dt_key *)&it->ozi_key);
812 }
813
814 static int osd_index_it_key_size(const struct lu_env *env,
815                                 const struct dt_it *di)
816 {
817         /* we only support 64-bit binary keys for the time being */
818         RETURN(sizeof(__u64));
819 }
820
821 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
822                             struct dt_rec *rec, __u32 attr)
823 {
824         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
825         struct osd_zap_it *it = (struct osd_zap_it *)di;
826         struct osd_object *obj = it->ozi_obj;
827         struct osd_device *osd = osd_obj2dev(obj);
828         int                rc;
829         ENTRY;
830
831         it->ozi_reset = 0;
832         rc = -zap_cursor_retrieve(it->ozi_zc, za);
833         if (rc)
834                 RETURN(rc);
835
836         rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
837                                 (const __u64 *)za->za_name, 1, 8,
838                                 obj->oo_recsize, (void *)rec);
839         RETURN(rc);
840 }
841
842 static __u64 osd_index_it_store(const struct lu_env *env,
843                                 const struct dt_it *di)
844 {
845         struct osd_zap_it *it = (struct osd_zap_it *)di;
846
847         it->ozi_reset = 0;
848         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
849 }
850
851 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
852                              __u64 hash)
853 {
854         struct osd_zap_it *it = (struct osd_zap_it *)di;
855         struct osd_object *obj = it->ozi_obj;
856         struct osd_device *osd = osd_obj2dev(obj);
857         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
858         int                rc;
859         ENTRY;
860
861         /* close the current cursor */
862         zap_cursor_fini(it->ozi_zc);
863
864         /* create a new one starting at hash */
865         memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
866         zap_cursor_init_serialized(it->ozi_zc, osd->od_objset.os,
867                                    obj->oo_db->db_object, hash);
868         it->ozi_reset = 0;
869
870         rc = -zap_cursor_retrieve(it->ozi_zc, za);
871         if (rc == 0)
872                 RETURN(+1);
873         else if (rc == -ENOENT)
874                 RETURN(0);
875
876         RETURN(rc);
877 }
878
879 static struct dt_index_operations osd_index_ops = {
880         .dio_lookup             = osd_index_lookup,
881         .dio_declare_insert     = osd_declare_index_insert,
882         .dio_insert             = osd_index_insert,
883         .dio_declare_delete     = osd_declare_index_delete,
884         .dio_delete             = osd_index_delete,
885         .dio_it = {
886                 .init           = osd_zap_it_init,
887                 .fini           = osd_zap_it_fini,
888                 .get            = osd_index_it_get,
889                 .put            = osd_zap_it_put,
890                 .next           = osd_index_it_next,
891                 .key            = osd_index_it_key,
892                 .key_size       = osd_index_it_key_size,
893                 .rec            = osd_index_it_rec,
894                 .store          = osd_index_it_store,
895                 .load           = osd_index_it_load
896         }
897 };
898
899 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
900                 const struct dt_index_features *feat)
901 {
902         struct osd_object *obj = osd_dt_obj(dt);
903         ENTRY;
904
905         LASSERT(obj->oo_db != NULL);
906
907         /*
908          * XXX: implement support for fixed-size keys sorted with natural
909          *      numerical way (not using internal hash value)
910          */
911         if (feat->dif_flags & DT_IND_RANGE)
912                 RETURN(-ERANGE);
913
914         if (likely(feat == &dt_directory_features)) {
915                 if (udmu_object_is_zap(obj->oo_db))
916                         dt->do_index_ops = &osd_dir_ops;
917                 else
918                         RETURN(-ENOTDIR);
919         } else if (unlikely(feat == &dt_acct_features)) {
920                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
921                 dt->do_index_ops = &osd_acct_index_ops;
922         } else if (udmu_object_is_zap(obj->oo_db) &&
923                    dt->do_index_ops == NULL) {
924                 /* For index file, we don't support variable key & record sizes
925                  * and the key has to be unique */
926                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
927                         RETURN(-EINVAL);
928
929                 /* Although the zap_*_uint64() primitives support large keys, we
930                  * limit ourselves to 64-bit keys for now */
931                 if (feat->dif_keysize_max != sizeof(__u64) ||
932                     feat->dif_keysize_min != sizeof(__u64))
933                         RETURN(-EINVAL);
934
935                 /* As for the record size, it should be a multiple of 8 bytes
936                  * and smaller than the maximum value length supported by ZAP.
937                  */
938                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
939                         RETURN(-E2BIG);
940                 if (feat->dif_recsize_max != feat->dif_recsize_min ||
941                     (feat->dif_recsize_max & (sizeof(__u64) - 1)))
942                         RETURN(-EINVAL);
943
944                 obj->oo_recsize = feat->dif_recsize_max / sizeof(__u64);
945                 dt->do_index_ops = &osd_index_ops;
946         }
947
948         RETURN(0);
949 }
950