4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * Copyright (c) 2012, Intel Corporation.
32 * Use is subject to license terms.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/osd-zfs/osd_index.c
40 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41 * Author: Mike Pershin <tappro@whamcloud.com>
45 # define EXPORT_SYMTAB
47 #define DEBUG_SUBSYSTEM S_OSD
49 #include <lustre_ver.h>
50 #include <libcfs/libcfs.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_support.h>
53 #include <lustre_net.h>
55 #include <obd_class.h>
56 #include <lustre_disk.h>
57 #include <lustre_fid.h>
59 #include "osd_internal.h"
61 #include <sys/dnode.h>
66 #include <sys/spa_impl.h>
67 #include <sys/zfs_znode.h>
68 #include <sys/dmu_tx.h>
69 #include <sys/dmu_objset.h>
70 #include <sys/dsl_prop.h>
71 #include <sys/sa_impl.h>
/*
 * Prepare a ZAP iterator for an existing object and return it as a
 * struct dt_it pointer.
 *
 * The iterator is not allocated: it is the oti_it_zap member of the
 * osd_thread_info obtained from \a env.  The function asserts that the
 * lu_object exists and that obj->oo_db is a ZAP, then opens a cursor at
 * position 0 with udmu_zap_cursor_init().  Any non-zero result from that
 * call is reported to the caller as ERR_PTR(-ENOMEM).
 *
 * NOTE(review): `dt` is used below but its declaration is not among the
 * visible signature lines -- presumably a struct dt_object * parameter;
 * confirm against the complete file.  Any iterator fields initialised
 * between the cursor setup and the final RETURN are likewise not visible.
 */
74 static struct dt_it *osd_zap_it_init(const struct lu_env *env,
77 struct lustre_capa *capa)
79 struct osd_thread_info *info = osd_oti_get(env);
80 struct osd_zap_it *it;
81 struct osd_object *obj = osd_dt_obj(dt);
82 struct osd_device *osd = osd_obj2dev(obj);
83 struct lu_object *lo = &dt->do_lu;
86 /* XXX: check capa ? */
88 LASSERT(lu_object_exists(lo));
90 LASSERT(udmu_object_is_zap(obj->oo_db));
93 it = &info->oti_it_zap;
95 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
96 obj->oo_db->db_object, 0))
97 RETURN(ERR_PTR(-ENOMEM));
104 RETURN((struct dt_it *)it);
/*
 * Tear down an iterator: release its ZAP cursor with
 * udmu_zap_cursor_fini() and drop one reference on the iterated object
 * with lu_object_put().  it->ozi_obj must be set (asserted).
 *
 * NOTE(review): `obj` is declared without an initialiser in the visible
 * lines and is then dereferenced in the lu_object_put() call; presumably
 * it is assigned from it->ozi_obj on a line not shown here -- confirm.
 */
107 static void osd_zap_it_fini(const struct lu_env *env, struct dt_it *di)
109 struct osd_zap_it *it = (struct osd_zap_it *)di;
110 struct osd_object *obj;
114 LASSERT(it->ozi_obj);
118 udmu_zap_cursor_fini(it->ozi_zc);
119 lu_object_put(env, &obj->oo_dt.do_lu);
125 * Move Iterator to record specified by \a key
127 * \param di osd iterator
128 * \param key key for index
130 * \retval +ve di points to record with least key not larger than key
131 * \retval 0 di points to exact matched key
132 * \retval -ve failure
/*
 * Reposition a directory iterator.
 *
 * Only the empty key is accepted (the first byte of \a key is asserted to
 * be NUL), so no real key search takes place: the current cursor is
 * released and a fresh one is opened at position 0 on the same object.
 *
 * NOTE(review): the statement executed when udmu_zap_cursor_init() fails
 * and the value returned on success are not visible here.
 */
135 static int osd_zap_it_get(const struct lu_env *env,
136 struct dt_it *di, const struct dt_key *key)
138 struct osd_zap_it *it = (struct osd_zap_it *)di;
139 struct osd_object *obj = it->ozi_obj;
140 struct osd_device *osd = osd_obj2dev(obj);
146 /* XXX: API is broken at the moment */
147 LASSERT(((const char *)key)[0] == '\0');
149 udmu_zap_cursor_fini(it->ozi_zc);
150 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
151 obj->oo_db->db_object, 0))
/*
 * Iterator "put" callback.  The visible body consists solely of a comment,
 * i.e. no work is done here.
 */
159 static void osd_zap_it_put(const struct lu_env *env, struct dt_it *di)
161 /* PBS: do nothing: refs are incremented at retrieve and decremented
/*
 * Fetch the name of the entry the cursor currently points at.
 *
 * The entry is read with zap_cursor_retrieve() into the zap_attribute_t
 * scratch buffer (oti_za) of the osd_thread_info obtained from \a env, and
 * za_name is then copied into \a key with strcpy().  \a max is compared
 * against strlen(za_name), so it limits the name length without counting
 * the terminating NUL.
 *
 * NOTE(review): callers in this file pass key == NULL (osd_zap_it_next,
 * osd_zap_it_load); presumably the length check and copy are guarded by a
 * NULL test that is not visible here -- confirm.  The statements run when
 * the retrieve fails or the name is too long are also not shown.  Callers
 * negate the result, which suggests a positive errno is returned -- confirm.
 */
165 int udmu_zap_cursor_retrieve_key(const struct lu_env *env,
166 zap_cursor_t *zc, char *key, int max)
168 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
171 if ((err = zap_cursor_retrieve(zc, za)))
175 if (strlen(za->za_name) > max)
177 strcpy(key, za->za_name);
184 * to load a directory entry at a time and store it in the
185 * iterator's in-memory data structure.
187 * \param di, struct osd_zap_it, iterator's in-memory structure
189 * \retval +ve, iterator reached the end
190 * \retval 0, iterator has not reached the end
191 * \retval -ve, on error
/*
 * Step a directory iterator to its next entry.
 *
 * When it->ozi_reset is 0 the cursor is advanced first.  The entry under
 * the cursor is then probed with udmu_zap_cursor_retrieve_key() using a
 * NULL key buffer (only the status matters), and the negated result is
 * kept in rc; -ENOENT is recognised as "end of directory".
 *
 * NOTE(review): what happens when ozi_reset is non-zero is not visible;
 * presumably the flag is cleared so that the first call after init/load
 * does not skip an entry -- confirm.  The value substituted for -ENOENT
 * and the final return are also not shown.
 */
193 static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di)
195 struct osd_zap_it *it = (struct osd_zap_it *)di;
199 if (it->ozi_reset == 0)
200 zap_cursor_advance(it->ozi_zc);
204 * According to current API we need to return error if its last entry.
205 * zap_cursor_advance() does return any value. So we need to call
206 * retrieve to check if there is any record. We should make
207 * changes to Iterator API to not return status for this API
209 rc = -udmu_zap_cursor_retrieve_key(env, it->ozi_zc, NULL, NAME_MAX);
210 if (rc == -ENOENT) /* end of dir*/
/*
 * Return the name of the entry the iterator currently points at.
 *
 * The name is copied into it->ozi_name by udmu_zap_cursor_retrieve_key()
 * and a pointer to that buffer is returned, so the result is stored inside
 * the iterator and is overwritten by the next call that fills ozi_name.
 *
 * NOTE(review): the size argument passed to the retrieve call and the
 * handling of a non-zero rc are not visible here.
 */
216 static struct dt_key *osd_zap_it_key(const struct lu_env *env,
217 const struct dt_it *di)
219 struct osd_zap_it *it = (struct osd_zap_it *)di;
224 rc = -udmu_zap_cursor_retrieve_key(env, it->ozi_zc, it->ozi_name,
227 RETURN((struct dt_key *)it->ozi_name);
/*
 * Return the length, in bytes and without the terminating NUL, of the
 * current entry's name.  The name is re-read into it->ozi_name via
 * udmu_zap_cursor_retrieve_key() and measured with strlen().
 *
 * NOTE(review): the size argument of the retrieve call and the handling of
 * a non-zero rc are not visible here.
 */
232 static int osd_zap_it_key_size(const struct lu_env *env, const struct dt_it *di)
234 struct osd_zap_it *it = (struct osd_zap_it *)di;
239 rc = -udmu_zap_cursor_retrieve_key(env, it->ozi_zc, it->ozi_name,
242 RETURN(strlen(it->ozi_name));
248 * zap_cursor_retrieve() reads from the current record;
249 * to read the value bytes we need to call zap_lookup() explicitly.
/*
 * Read the value of the entry under the cursor.
 *
 * zap_cursor_retrieve() fills the osd_thread_info scratch attribute
 * (oti_za) with the entry's name, integer width (za_integer_length) and
 * integer count (za_num_integers); the value is then fetched by name with
 * zap_lookup() on the cursor's objset and ZAP object.
 *
 * The value size in bytes is za_integer_length * za_num_integers.  If it
 * exceeds \a buf_size it is clamped to buf_size.  Note that \a buf_size is
 * reused: it enters as a byte count and, before the lookup, is overwritten
 * with a number of integers (either the clamped size divided by the
 * integer width, or za_num_integers).  *bytes_read receives actual_size.
 *
 * NOTE(review): when clamping, actual_size is not rounded down to a
 * multiple of za_integer_length, so *bytes_read could exceed the bytes
 * covered by the integer count derived from it -- verify against callers.
 * The trailing zap_lookup() arguments, the early-exit statements and the
 * return value are not visible here.
 */
251 int udmu_zap_cursor_retrieve_value(const struct lu_env *env,
252 zap_cursor_t *zc, char *buf,
253 int buf_size, int *bytes_read)
255 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
256 int err, actual_size;
259 if ((err = zap_cursor_retrieve(zc, za)))
262 if (za->za_integer_length <= 0)
265 actual_size = za->za_integer_length * za->za_num_integers;
267 if (actual_size > buf_size) {
268 actual_size = buf_size;
269 buf_size = actual_size / za->za_integer_length;
271 buf_size = za->za_num_integers;
274 err = -zap_lookup(zc->zc_objset, zc->zc_zapobj,
275 za->za_name, za->za_integer_length,
279 *bytes_read = actual_size;
/*
 * Append optional attributes after the name in a lu_dirent and convert
 * lde_attrs to little-endian.
 *
 * If \a attr contains LUDA_TYPE, the name length is rounded up using the
 * mask sizeof(struct luda_type) - 1 (which assumes that size is a power of
 * two), a struct luda_type is placed at lde_name + len, its lt_type is set
 * to cpu_to_le16(CFS_DTTOIF(type)) and LUDA_TYPE is or-ed into lde_attrs.
 * The cpu_to_le32() conversion of lde_attrs is done last, after the flag
 * has been or-ed in.
 *
 * NOTE(review): `len` and `type` are used below but their parameter
 * declarations are not among the visible signature lines.
 */
284 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
287 const unsigned align = sizeof(struct luda_type) - 1;
288 struct luda_type *lt;
290 /* check if file type is required */
291 if (attr & LUDA_TYPE) {
292 len = (len + align) & ~align;
294 lt = (void *)ent->lde_name + len;
295 lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
296 ent->lde_attrs |= LUDA_TYPE;
299 ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
/*
 * Fill \a dtrec (treated as a struct lu_dirent) from the directory entry
 * under the iterator's cursor.
 *
 * Steps visible below:
 *  - lde_hash is set to the serialized cursor position (little-endian);
 *  - the entry is read with zap_cursor_retrieve(); a name longer than
 *    NAME_MAX yields -EOVERFLOW, otherwise the name and its length are
 *    stored in lde_name / lde_namelen;
 *  - the value must consist of 8-byte integers and hold at least 3 of
 *    them, otherwise an error is logged and -EIO is returned;
 *  - the first 3 integers are read with zap_lookup() into the
 *    osd_thread_info scratch luz_direntry (oti_zde), whose lzd_fid becomes
 *    lde_fid;
 *  - lde_attrs starts as LUDA_FID, optional attributes are appended by
 *    osd_it_append_attrs(), and lde_reclen is computed with
 *    lu_dirent_calc_size().
 *
 * NOTE(review): the `out` label, the checks on rc after the retrieve and
 * lookup calls, and the return statement are not visible here.
 */
302 static int osd_zap_it_rec(const struct lu_env *env, const struct dt_it *di,
303 struct dt_rec *dtrec, __u32 attr)
305 struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
306 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
307 struct osd_zap_it *it = (struct osd_zap_it *)di;
308 struct lu_dirent *lde = (struct lu_dirent *)dtrec;
315 lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc));
317 if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
320 namelen = strlen(za->za_name);
321 if (namelen > NAME_MAX)
322 GOTO(out, rc = -EOVERFLOW);
323 strcpy(lde->lde_name, za->za_name);
324 lde->lde_namelen = cpu_to_le16(namelen);
326 if (za->za_integer_length != 8 || za->za_num_integers < 3) {
327 CERROR("%s: unsupported direntry format: %d %d\n",
328 osd_obj2dev(it->ozi_obj)->od_svname,
329 za->za_integer_length, (int)za->za_num_integers);
331 GOTO(out, rc = -EIO);
334 rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
335 za->za_name, za->za_integer_length, 3, zde);
339 lde->lde_fid = zde->lzd_fid;
340 lde->lde_attrs = LUDA_FID;
342 /* append lustre attributes */
343 osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
345 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
/*
 * Return the iterator's current position as a 64-bit cookie, obtained from
 * udmu_zap_cursor_serialize().  osd_zap_it_load() in this file accepts
 * such a value as its \a hash argument to reopen a cursor.
 */
351 static __u64 osd_zap_it_store(const struct lu_env *env, const struct dt_it *di)
353 struct osd_zap_it *it = (struct osd_zap_it *)di;
356 RETURN(udmu_zap_cursor_serialize(it->ozi_zc));
361 * rc == 0 -> ok, proceed.
362 * rc > 0 -> end of directory.
363 * rc < 0 -> error. ( EOVERFLOW can be masked.)
/*
 * Reposition a directory iterator at the cookie \a hash.
 *
 * The current cursor is released and a new one is opened on the same
 * object with \a hash as the starting position.  The entry at that
 * position is then probed with udmu_zap_cursor_retrieve_key() (NULL key
 * buffer, status only), with -ENOENT recognised as end of directory.
 *
 * NOTE(review): the statement run when udmu_zap_cursor_init() fails, the
 * first branch of the rc test (before the visible `else if`) and the
 * values returned are not visible here.
 */
365 static int osd_zap_it_load(const struct lu_env *env,
366 const struct dt_it *di, __u64 hash)
368 struct osd_zap_it *it = (struct osd_zap_it *)di;
369 struct osd_object *obj = it->ozi_obj;
370 struct osd_device *osd = osd_obj2dev(obj);
374 udmu_zap_cursor_fini(it->ozi_zc);
375 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
376 obj->oo_db->db_object, hash))
380 /* same as osd_zap_it_next()*/
381 rc = -udmu_zap_cursor_retrieve_key(env, it->ozi_zc, NULL,
385 else if (rc == -ENOENT) /* end of dir*/
/*
 * Look up the name \a key in a directory ZAP and return the FID stored
 * with it.
 *
 * The value is read as sizeof(oti->oti_zde) / 8 integers of 8 bytes into
 * the osd_thread_info scratch entry (oti_zde); its lzd_fid member is then
 * copied into \a rec.
 *
 * Returns 1 when the lookup succeeded, otherwise the negated zap_lookup()
 * status.
 *
 * NOTE(review): in the visible lines the memcpy() into \a rec is not
 * conditional on rc, so on failure \a rec would receive whatever the
 * scratch entry held -- confirm whether callers rely on rc only.
 */
391 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
392 struct dt_rec *rec, const struct dt_key *key,
393 struct lustre_capa *capa)
395 struct osd_thread_info *oti = osd_oti_get(env);
396 struct osd_object *obj = osd_dt_obj(dt);
397 struct osd_device *osd = osd_obj2dev(obj);
401 LASSERT(udmu_object_is_zap(obj->oo_db));
403 rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
404 (char *)key, 8, sizeof(oti->oti_zde) / 8,
405 (void *)&oti->oti_zde);
406 memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
408 RETURN(rc == 0 ? 1 : rc);
/*
 * Declare, on the DMU transaction behind \a th, the changes a later
 * directory insert will make: a hold on the directory object's bonus
 * buffer and a ZAP hold for the name \a key (dmu_tx_hold_zap() is called
 * with TRUE and the key string).  The object must be a ZAP (asserted).
 *
 * NOTE(review): `th` is used below but its declaration is not among the
 * visible signature lines -- presumably a struct thandle * parameter.  The
 * return statement is not visible either.
 */
411 static int osd_declare_dir_insert(const struct lu_env *env,
412 struct dt_object *dt,
413 const struct dt_rec *rec,
414 const struct dt_key *key,
417 struct osd_object *obj = osd_dt_obj(dt);
418 struct osd_thandle *oh;
422 oh = container_of0(th, struct osd_thandle, ot_super);
425 LASSERT(udmu_object_is_zap(obj->oo_db));
427 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
428 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
434 * Find the osd object for given fid.
436 * \param fid need to find the osd object having this fid
438 * \retval osd_object on success
439 * \retval -ve on error
/*
 * Resolve \a fid to an osd_object on the same lu_device as \a dt.
 *
 * The lookup goes through lu_object_find_at() with dt's own device passed
 * as the device to search from.  If the resulting lu_object exists, the
 * slice belonging to this device type is located with lu_object_locate().
 * Both visible failure paths (slice cannot be located / object does not
 * exist) log a message, drop the reference with lu_object_put() and yield
 * ERR_PTR(-ENOENT).
 *
 * NOTE(review): the check of luch for an error pointer, the assignment of
 * `child` on success, the else/brace structure and the return statement
 * are not visible here.  On success the caller presumably owns a reference
 * that osd_object_put() releases -- confirm.
 */
441 struct osd_object *osd_object_find(const struct lu_env *env,
442 struct dt_object *dt,
443 const struct lu_fid *fid)
445 struct lu_device *ludev = dt->do_lu.lo_dev;
446 struct osd_object *child = NULL;
447 struct lu_object *luch;
448 struct lu_object *lo;
451 * at this point topdev might not exist yet
452 * (i.e. MGS is preparing profiles). so we can
453 * not rely on topdev and instead lookup with
454 * our device passed as topdev. this can't work
455 * if the object isn't cached yet (as osd doesn't
456 * allocate lu_header). IOW, the object must be
457 * in the cache, otherwise lu_object_alloc() crashes
460 luch = lu_object_find_at(env, ludev, fid, NULL);
464 if (lu_object_exists(luch)) {
465 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
469 LU_OBJECT_DEBUG(D_ERROR, env, luch,
470 "%s: object can't be located "DFID"\n",
471 osd_dev(ludev)->od_svname, PFID(fid));
474 lu_object_put(env, luch);
475 CERROR("%s: Unable to get osd_object "DFID"\n",
476 osd_dev(ludev)->od_svname, PFID(fid));
477 child = ERR_PTR(-ENOENT);
480 LU_OBJECT_DEBUG(D_ERROR, env, luch,
481 "%s: lu_object does not exists "DFID"\n",
482 osd_dev(ludev)->od_svname, PFID(fid));
483 lu_object_put(env, luch);
484 child = ERR_PTR(-ENOENT);
491 * Put the osd object once done with it.
493 * \param obj osd object that needs to be put
/*
 * Drop one reference on \a obj by calling lu_object_put() on the lu_object
 * embedded in it (obj->oo_dt.do_lu).
 */
495 static inline void osd_object_put(const struct lu_env *env,
496 struct osd_object *obj)
498 lu_object_put(env, &obj->oo_dt.do_lu);
502 * Inserts (key, value) pair in \a directory object.
504 * \param dt osd index object
505 * \param key key for index
506 * \param rec record reference
507 * \param th transaction handler
508 * \param capa capability descriptor
509 * \param ignore_quota update should not affect quota
512 * \retval -ve failure
/*
 * Add the name \a key to directory \a dt, pointing at the object whose FID
 * is passed in \a rec.
 *
 * The child object is resolved with osd_object_find(); its file type
 * (taken from loh_attr and converted with IFTODT()), its dnode number
 * (oo_db->db_object) and the FID are packed into the osd_thread_info
 * scratch entry oti_zde.  That entry is written with zap_add() as
 * sizeof(oti_zde) / 8 integers of 8 bytes under the transaction behind
 * \a th, and the child reference is released with osd_object_put().
 *
 * The two CLASSERTs pin the on-disk layout assumptions: lzd_reg is exactly
 * 8 bytes and the whole entry is a multiple of 8 bytes.
 *
 * NOTE(review): the handling of "." / ".." inside the #if block, the
 * IS_ERR test before RETURN(PTR_ERR(child)), the trailing parameter of the
 * signature and the final return are not visible here.
 */
514 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
515 const struct dt_rec *rec, const struct dt_key *key,
516 struct thandle *th, struct lustre_capa *capa,
519 struct osd_thread_info *oti = osd_oti_get(env);
520 struct osd_object *parent = osd_dt_obj(dt);
521 struct osd_device *osd = osd_obj2dev(parent);
522 struct lu_fid *fid = (struct lu_fid *)rec;
523 struct osd_thandle *oh;
524 struct osd_object *child;
529 LASSERT(parent->oo_db);
530 LASSERT(udmu_object_is_zap(parent->oo_db));
532 LASSERT(dt_object_exists(dt));
533 LASSERT(osd_invariant(parent));
536 * zfs_readdir() generates ./.. on fly, but
537 * we want own entries (.. at least) with a fid
539 #if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 61, 0)
540 #warning "fix '.' and '..' handling"
544 oh = container_of0(th, struct osd_thandle, ot_super);
546 child = osd_object_find(env, dt, fid);
548 RETURN(PTR_ERR(child));
550 LASSERT(child->oo_db);
552 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
553 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
554 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
555 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
556 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
557 oti->oti_zde.lzd_fid = *fid;
559 /* Insert (key,oid) into ZAP */
560 rc = -zap_add(osd->od_objset.os, parent->oo_db->db_object,
561 (char *)key, 8, sizeof(oti->oti_zde) / 8,
562 (void *)&oti->oti_zde, oh->ot_tx);
564 osd_object_put(env, child);
/*
 * Declare, on the DMU transaction behind \a th, the ZAP change a later
 * directory delete of \a key will make (dmu_tx_hold_zap() with TRUE and
 * the key string).  The object must exist, satisfy osd_invariant() and be
 * a ZAP (all asserted).
 *
 * NOTE(review): `th` is used below but its declaration is not among the
 * visible signature lines; the return statement is not visible either.
 */
569 static int osd_declare_dir_delete(const struct lu_env *env,
570 struct dt_object *dt,
571 const struct dt_key *key,
574 struct osd_object *obj = osd_dt_obj(dt);
575 struct osd_thandle *oh;
578 LASSERT(dt_object_exists(dt));
579 LASSERT(osd_invariant(obj));
582 oh = container_of0(th, struct osd_thandle, ot_super);
585 LASSERT(udmu_object_is_zap(obj->oo_db));
587 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
/*
 * Remove the name \a key from directory \a dt with zap_remove(), under the
 * transaction behind \a th.  The object must be a ZAP (asserted).
 *
 * rc holds the negated zap_remove() status.  Failures are logged, except
 * -ENOENT, which is deliberately kept out of the error log.
 *
 * NOTE(review): the return statement is not visible here; presumably rc is
 * returned unchanged, including -ENOENT -- confirm.
 */
592 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
593 const struct dt_key *key, struct thandle *th,
594 struct lustre_capa *capa)
596 struct osd_object *obj = osd_dt_obj(dt);
597 struct osd_device *osd = osd_obj2dev(obj);
598 struct osd_thandle *oh;
599 dmu_buf_t *zap_db = obj->oo_db;
604 LASSERT(udmu_object_is_zap(obj->oo_db));
607 oh = container_of0(th, struct osd_thandle, ot_super);
609 /* Remove key from the ZAP */
610 rc = -zap_remove(osd->od_objset.os, zap_db->db_object,
611 (char *) key, oh->ot_tx);
613 if (rc && rc != -ENOENT)
614 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
/*
 * Index operations for directories whose entries are keyed by name:
 * lookup/insert/delete use the osd_dir_* functions and every iterator
 * callback uses the osd_zap_it_* functions.  osd_index_try() installs this
 * table for dt_directory_features when the object is a ZAP.
 *
 * NOTE(review): the line opening the nested iterator initialiser (the
 * member holding .init .. .load) and the closing braces are not visible.
 */
619 static struct dt_index_operations osd_dir_ops = {
620 .dio_lookup = osd_dir_lookup,
621 .dio_declare_insert = osd_declare_dir_insert,
622 .dio_insert = osd_dir_insert,
623 .dio_declare_delete = osd_declare_dir_delete,
624 .dio_delete = osd_dir_delete,
626 .init = osd_zap_it_init,
627 .fini = osd_zap_it_fini,
628 .get = osd_zap_it_get,
629 .put = osd_zap_it_put,
630 .next = osd_zap_it_next,
631 .key = osd_zap_it_key,
632 .key_size = osd_zap_it_key_size,
633 .rec = osd_zap_it_rec,
634 .store = osd_zap_it_store,
635 .load = osd_zap_it_load
640 * Primitives for index files using binary keys.
641 * XXX: only 64-bit keys are supported for now.
/*
 * Look up a binary key in an index ZAP with zap_lookup_uint64().
 *
 * \a key is treated as a single __u64 (key length 1).  The value is
 * requested as obj->oo_recsize integers of 8 bytes; oo_recsize is set by
 * osd_index_try() to the record size divided by sizeof(__u64).
 *
 * Returns 1 when the lookup succeeded, otherwise the negated ZAP status.
 *
 * NOTE(review): the final argument of the lookup call (presumably \a rec)
 * is not visible here.
 */
644 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
645 struct dt_rec *rec, const struct dt_key *key,
646 struct lustre_capa *capa)
648 struct osd_object *obj = osd_dt_obj(dt);
649 struct osd_device *osd = osd_obj2dev(obj);
653 rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
654 (const __u64 *)key, 1, 8, obj->oo_recsize,
656 RETURN(rc == 0 ? 1 : rc);
/*
 * Declare, on the DMU transaction behind \a th, the changes a later
 * binary-key index insert will make: a hold on the object's bonus buffer
 * and a ZAP hold taken with a NULL name (see the comment below for why no
 * key is passed).
 *
 * NOTE(review): `th` is used below but its declaration is not among the
 * visible signature lines; the return statement is not visible either.
 */
659 static int osd_declare_index_insert(const struct lu_env *env,
660 struct dt_object *dt,
661 const struct dt_rec *rec,
662 const struct dt_key *key,
665 struct osd_object *obj = osd_dt_obj(dt);
666 struct osd_thandle *oh;
670 oh = container_of0(th, struct osd_thandle, ot_super);
674 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
676 /* It is not clear what API should be used for binary keys, so we pass
677 * a null name which has the side effect of over-reserving space,
678 * accounting for the worst case. See zap_count_write() */
679 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
/*
 * Insert a (binary key, record) pair into an index ZAP with
 * zap_add_uint64(), under the transaction behind \a th.
 *
 * \a key is treated as a single __u64; \a rec is written as
 * obj->oo_recsize integers of 8 bytes.  rc holds the negated ZAP status.
 *
 * NOTE(review): the trailing parameter of the signature and the return
 * statement are not visible here.
 */
684 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
685 const struct dt_rec *rec, const struct dt_key *key,
686 struct thandle *th, struct lustre_capa *capa,
689 struct osd_object *obj = osd_dt_obj(dt);
690 struct osd_device *osd = osd_obj2dev(obj);
691 struct osd_thandle *oh;
696 LASSERT(dt_object_exists(dt));
697 LASSERT(osd_invariant(obj));
700 oh = container_of0(th, struct osd_thandle, ot_super);
702 /* Insert (key,oid) into ZAP */
703 rc = -zap_add_uint64(osd->od_objset.os, obj->oo_db->db_object,
704 (const __u64 *)key, 1, 8, obj->oo_recsize,
705 (void *)rec, oh->ot_tx);
/*
 * Declare, on the DMU transaction behind \a th, the ZAP change a later
 * binary-key index delete will make.  As in osd_declare_index_insert(),
 * the hold is taken with a NULL name rather than the key.
 *
 * NOTE(review): `th` is used below but its declaration is not among the
 * visible signature lines; the return statement is not visible either.
 */
709 static int osd_declare_index_delete(const struct lu_env *env,
710 struct dt_object *dt,
711 const struct dt_key *key,
714 struct osd_object *obj = osd_dt_obj(dt);
715 struct osd_thandle *oh;
718 LASSERT(dt_object_exists(dt));
719 LASSERT(osd_invariant(obj));
723 oh = container_of0(th, struct osd_thandle, ot_super);
724 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
/*
 * Remove a binary key from an index ZAP with zap_remove_uint64(), under
 * the transaction behind \a th.  \a key is treated as a single __u64; rc
 * holds the negated ZAP status.
 *
 * NOTE(review): the return statement is not visible here.
 */
729 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
730 const struct dt_key *key, struct thandle *th,
731 struct lustre_capa *capa)
733 struct osd_object *obj = osd_dt_obj(dt);
734 struct osd_device *osd = osd_obj2dev(obj);
735 struct osd_thandle *oh;
741 oh = container_of0(th, struct osd_thandle, ot_super);
743 /* Remove binary key from the ZAP */
744 rc = -zap_remove_uint64(osd->od_objset.os, obj->oo_db->db_object,
745 (const __u64 *)key, 1, oh->ot_tx);
/*
 * Reposition a binary-key index iterator.
 *
 * Only key value 0 is accepted (asserted), so no key search takes place:
 * the current cursor is finalised, its memory is zeroed and it is
 * re-initialised in place on the same object with zap_cursor_init().
 * Unlike osd_zap_it_get(), this uses the zap_cursor_* calls directly
 * rather than the udmu_zap_cursor_* wrappers.
 *
 * NOTE(review): the return statement is not visible here.
 */
749 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
750 const struct dt_key *key)
752 struct osd_zap_it *it = (struct osd_zap_it *)di;
753 struct osd_object *obj = it->ozi_obj;
754 struct osd_device *osd = osd_obj2dev(obj);
760 /* XXX: API is broken at the moment */
761 LASSERT(*((const __u64 *)key) == 0);
763 zap_cursor_fini(it->ozi_zc);
764 memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
765 zap_cursor_init(it->ozi_zc, osd->od_objset.os, obj->oo_db->db_object);
/*
 * Step a binary-key index iterator to its next entry.
 *
 * When it->ozi_reset is 0 the cursor is advanced first.  The entry under
 * the cursor is then read with zap_cursor_retrieve() into the
 * osd_thread_info scratch attribute purely to obtain a status, which is
 * negated into rc.
 *
 * NOTE(review): what happens when ozi_reset is non-zero, the translation
 * of rc (e.g. for end of index) and the return statement are not visible.
 */
771 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
773 struct osd_zap_it *it = (struct osd_zap_it *)di;
774 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
778 if (it->ozi_reset == 0)
779 zap_cursor_advance(it->ozi_zc);
783 * According to current API we need to return error if it's last entry.
784 * zap_cursor_advance() does not return any value. So we need to call
785 * retrieve to check if there is any record. We should make
786 * changes to Iterator API to not return status for this API
788 rc = -zap_cursor_retrieve(it->ozi_zc, za);
/*
 * Return the binary key of the entry under the iterator's cursor.
 *
 * The entry is read with zap_cursor_retrieve(); the first 8 bytes of
 * za_name are reinterpreted as a __u64 and stored in it->ozi_key, and a
 * pointer to that field is returned, so the result is stored inside the
 * iterator.
 *
 * NOTE(review): the handling of a non-zero rc is not visible here.  The
 * cast assumes za_name is suitably aligned for a __u64 load -- confirm.
 */
795 static struct dt_key *osd_index_it_key(const struct lu_env *env,
796 const struct dt_it *di)
798 struct osd_zap_it *it = (struct osd_zap_it *)di;
799 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
804 rc = -zap_cursor_retrieve(it->ozi_zc, za);
808 /* the binary key is stored in the name */
809 it->ozi_key = *((__u64 *)za->za_name);
811 RETURN((struct dt_key *)&it->ozi_key);
/*
 * Return the key size for binary-key indexes.  This is always
 * sizeof(__u64); neither \a env nor \a di is consulted in the visible
 * lines.
 */
814 static int osd_index_it_key_size(const struct lu_env *env,
815 const struct dt_it *di)
817 /* we only support 64-bit binary keys for the time being */
818 RETURN(sizeof(__u64));
/*
 * Copy the record of the entry under the iterator's cursor into \a rec.
 *
 * zap_cursor_retrieve() supplies the entry's binary key (held in za_name);
 * the value is then fetched by that key with zap_lookup_uint64() as
 * obj->oo_recsize integers of 8 bytes.  rc holds the negated ZAP status of
 * the last call made.
 *
 * NOTE(review): the check of rc between the two calls and the return
 * statement are not visible here.  \a attr is not used in the visible
 * lines.
 */
821 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
822 struct dt_rec *rec, __u32 attr)
824 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
825 struct osd_zap_it *it = (struct osd_zap_it *)di;
826 struct osd_object *obj = it->ozi_obj;
827 struct osd_device *osd = osd_obj2dev(obj);
832 rc = -zap_cursor_retrieve(it->ozi_zc, za);
836 rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
837 (const __u64 *)za->za_name, 1, 8,
838 obj->oo_recsize, (void *)rec);
/*
 * Return the iterator's current position as a 64-bit cookie, obtained from
 * zap_cursor_serialize().  osd_index_it_load() in this file passes such a
 * value to zap_cursor_init_serialized() to reopen a cursor.
 */
842 static __u64 osd_index_it_store(const struct lu_env *env,
843 const struct dt_it *di)
845 struct osd_zap_it *it = (struct osd_zap_it *)di;
848 RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
/*
 * Reposition a binary-key index iterator at a serialized cursor cookie.
 *
 * The current cursor is finalised, its memory is zeroed and it is
 * re-initialised in place with zap_cursor_init_serialized().  The entry at
 * the new position is then probed with zap_cursor_retrieve(), with -ENOENT
 * handled as a separate case.
 *
 * NOTE(review): `hash` is used below but its declaration is not among the
 * visible signature lines -- presumably a __u64 parameter.  The first
 * branch of the rc test (before the visible `else if`), the value
 * substituted for -ENOENT and the return statement are not visible.
 */
851 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
854 struct osd_zap_it *it = (struct osd_zap_it *)di;
855 struct osd_object *obj = it->ozi_obj;
856 struct osd_device *osd = osd_obj2dev(obj);
857 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
861 /* close the current cursor */
862 zap_cursor_fini(it->ozi_zc);
864 /* create a new one starting at hash */
865 memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
866 zap_cursor_init_serialized(it->ozi_zc, osd->od_objset.os,
867 obj->oo_db->db_object, hash);
870 rc = -zap_cursor_retrieve(it->ozi_zc, za);
873 else if (rc == -ENOENT)
/*
 * Index operations for index files with 64-bit binary keys:
 * lookup/insert/delete use the osd_index_* functions.  The iterator shares
 * init, fini and put with the directory iterator (osd_zap_it_*), while
 * get, next, key, key_size, rec, store and load use the osd_index_it_*
 * variants.  osd_index_try() installs this table for generic ZAP-backed
 * indexes.
 *
 * NOTE(review): the line opening the nested iterator initialiser (the
 * member holding .init .. .load) and the closing braces are not visible.
 */
879 static struct dt_index_operations osd_index_ops = {
880 .dio_lookup = osd_index_lookup,
881 .dio_declare_insert = osd_declare_index_insert,
882 .dio_insert = osd_index_insert,
883 .dio_declare_delete = osd_declare_index_delete,
884 .dio_delete = osd_index_delete,
886 .init = osd_zap_it_init,
887 .fini = osd_zap_it_fini,
888 .get = osd_index_it_get,
889 .put = osd_zap_it_put,
890 .next = osd_index_it_next,
891 .key = osd_index_it_key,
892 .key_size = osd_index_it_key_size,
893 .rec = osd_index_it_rec,
894 .store = osd_index_it_store,
895 .load = osd_index_it_load
/*
 * Decide which index operations, if any, \a dt supports for the feature
 * set \a feat, and install them in dt->do_index_ops.
 *
 * Checks visible below, in order:
 *  - DT_IND_RANGE in dif_flags and feat == &dt_otable_features are each
 *    tested first, before any ops table is chosen (the comments mark both
 *    as not yet supported);
 *  - dt_directory_features: osd_dir_ops is installed if the object is a
 *    ZAP;
 *  - dt_acct_features: the FID must be an accounting FID (asserted) and
 *    osd_acct_index_ops is installed;
 *  - any other feature set on a ZAP object that has no ops yet: the flags
 *    may contain nothing beyond DT_IND_UPDATE, min and max key size must
 *    both be sizeof(__u64), the record size must be fixed (min == max), a
 *    multiple of 8 bytes and no larger than ZAP_MAXVALUELEN.  On success
 *    oo_recsize is set to the record size in units of __u64 and
 *    osd_index_ops is installed.
 *
 * NOTE(review): the statements executed when each check fails (presumably
 * RETURN of a negative errno) and the final return are not visible here.
 */
899 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
900 const struct dt_index_features *feat)
902 struct osd_object *obj = osd_dt_obj(dt);
905 LASSERT(dt_object_exists(dt));
908 * XXX: implement support for fixed-size keys sorted with natural
909 * numerical way (not using internal hash value)
911 if (feat->dif_flags & DT_IND_RANGE)
914 if (unlikely(feat == &dt_otable_features))
915 /* do not support oi scrub yet. */
918 LASSERT(obj->oo_db != NULL);
919 if (likely(feat == &dt_directory_features)) {
920 if (udmu_object_is_zap(obj->oo_db))
921 dt->do_index_ops = &osd_dir_ops;
924 } else if (unlikely(feat == &dt_acct_features)) {
925 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
926 dt->do_index_ops = &osd_acct_index_ops;
927 } else if (udmu_object_is_zap(obj->oo_db) &&
928 dt->do_index_ops == NULL) {
929 /* For index file, we don't support variable key & record sizes
930 * and the key has to be unique */
931 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
934 /* Although the zap_*_uint64() primitives support large keys, we
935 * limit ourselves to 64-bit keys for now */
936 if (feat->dif_keysize_max != sizeof(__u64) ||
937 feat->dif_keysize_min != sizeof(__u64))
940 /* As for the record size, it should be a multiple of 8 bytes
941 * and smaller than the maximum value length supported by ZAP.
943 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
945 if (feat->dif_recsize_max != feat->dif_recsize_min ||
946 (feat->dif_recsize_max & (sizeof(__u64) - 1)))
949 obj->oo_recsize = feat->dif_recsize_max / sizeof(__u64);
950 dt->do_index_ops = &osd_index_ops;