4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * Copyright (c) 2012, Intel Corporation.
32 * Use is subject to license terms.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/osd-zfs/osd_index.c
40 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41 * Author: Mike Pershin <tappro@whamcloud.com>
45 # define EXPORT_SYMTAB
47 #define DEBUG_SUBSYSTEM S_OSD
49 #include <lustre_ver.h>
50 #include <libcfs/libcfs.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_support.h>
53 #include <lustre_net.h>
55 #include <obd_class.h>
56 #include <lustre_disk.h>
57 #include <lustre_fid.h>
59 #include "osd_internal.h"
61 #include <sys/dnode.h>
66 #include <sys/spa_impl.h>
67 #include <sys/zfs_znode.h>
68 #include <sys/dmu_tx.h>
69 #include <sys/dmu_objset.h>
70 #include <sys/dsl_prop.h>
71 #include <sys/sa_impl.h>
/*
 * Prepare a ZAP iterator for the index object dt.
 *
 * The iterator storage is the oti_it_zap member of the osd_thread_info
 * obtained from env. The object must exist and must be backed by a ZAP
 * (both conditions are asserted).
 *
 * Returns the iterator cast to struct dt_it *, or ERR_PTR(-ENOMEM) when
 * udmu_zap_cursor_init() returns non-zero.
 * NOTE(review): capa is accepted but never inspected in the lines
 * visible here (see the XXX note in the body).
 */
74 static struct dt_it *osd_index_it_init(const struct lu_env *env,
77 struct lustre_capa *capa)
79 struct osd_thread_info *info = osd_oti_get(env);
80 struct osd_zap_it *it;
81 struct osd_object *obj = osd_dt_obj(dt);
82 struct osd_device *osd = osd_obj2dev(obj);
83 struct lu_object *lo = &dt->do_lu;
86 /* XXX: check capa ? */
88 LASSERT(lu_object_exists(lo));
90 LASSERT(udmu_object_is_zap(obj->oo_db));
93 it = &info->oti_it_zap;
/* any non-zero result of the cursor setup is reported as -ENOMEM */
95 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
96 obj->oo_db->db_object, 0))
97 RETURN(ERR_PTR(-ENOMEM));
104 RETURN((struct dt_it *)it);
/*
 * Release an iterator: finalize its ZAP cursor and drop one reference on
 * the lu_object embedded in obj.
 * NOTE(review): the assignment of obj is not visible here; presumably it
 * is it->ozi_obj, which is asserted to be set - confirm.
 */
107 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
109 struct osd_zap_it *it = (struct osd_zap_it *)di;
110 struct osd_object *obj;
114 LASSERT(it->ozi_obj);
118 udmu_zap_cursor_fini(it->ozi_zc);
119 lu_object_put(env, &obj->oo_dt.do_lu);
/*
 * Iterator put method used by osd_index_ops. According to the note in
 * the body it intentionally does nothing.
 */
125 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
127 /* PBS: do nothing : ref are incremented at retrive and decreamented
/*
 * Fetch the entry under cursor zc into the per-thread zap_attribute_t
 * (oti_za of the osd_thread_info for env) and copy its name into key.
 *
 * The result of zap_cursor_retrieve() is stored in err; how it is
 * propagated to the caller is not visible here.
 * NOTE(review): the copy is an unbounded strcpy(). max presumably gives
 * the capacity of key, but no length check is visible here - confirm
 * that key can always hold the longest za_name.
 */
131 int udmu_zap_cursor_retrieve_key(const struct lu_env *env,
132 zap_cursor_t *zc, char *key, int max)
134 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
137 if ((err = zap_cursor_retrieve(zc, za)))
141 strcpy(key, za->za_name);
147 * zap_cursor_retrieve read from current record.
148 * to read bytes we need to call zap_lookup explicitly.
/*
 * Read the value of the entry under cursor zc into buf.
 *
 * zap_cursor_retrieve() first describes the entry in the per-thread
 * oti_za; the value size in bytes is za_integer_length multiplied by
 * za_num_integers. The value is then fetched by name with zap_lookup(),
 * and *bytes_read receives actual_size, i.e. the value size clamped to
 * buf_size.
 * NOTE(review): the return statements and the handling of a
 * non-positive za_integer_length are on lines not visible here.
 */
150 int udmu_zap_cursor_retrieve_value(const struct lu_env *env,
151 zap_cursor_t *zc, char *buf,
152 int buf_size, int *bytes_read)
154 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
155 int err, actual_size;
157 if ((err = zap_cursor_retrieve(zc, za)))
160 if (za->za_integer_length <= 0)
163 actual_size = za->za_integer_length * za->za_num_integers;
/*
 * From here on buf_size is reused as a count of integers for
 * zap_lookup(): either as many whole integers as fit in the caller
 * buffer, or the full za_num_integers.
 */
165 if (actual_size > buf_size) {
166 actual_size = buf_size;
167 buf_size = actual_size / za->za_integer_length;
169 buf_size = za->za_num_integers;
/* the zap_lookup() result is negated before it is stored in err */
172 err = -zap_lookup(zc->zc_objset, zc->zc_zapobj,
173 za->za_name, za->za_integer_length,
177 *bytes_read = actual_size;
/*
 * Finish the attribute part of directory entry ent.
 *
 * When attr contains LUDA_TYPE, a struct luda_type carrying the file
 * type (CFS_DTTOIF(type), stored as little-endian 16 bit) is written
 * after the name, at offset len rounded up to the size of struct
 * luda_type, and LUDA_TYPE is set in ent->lde_attrs. ent->lde_attrs is
 * converted with cpu_to_le32().
 * NOTE(review): len and type are parameters declared on lines that are
 * not visible here.
 */
182 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
185 const unsigned align = sizeof(struct luda_type) - 1;
186 struct luda_type *lt;
188 /* check if file type is required */
189 if (attr & LUDA_TYPE) {
/* round len up; this relies on align + 1 being a power of two */
190 len = (len + align) & ~align;
192 lt = (void *)ent->lde_name + len;
193 lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
194 ent->lde_attrs |= LUDA_TYPE;
197 ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
/*
 * Extract the parent FID of object o from its link xattr
 * (XATTR_NAME_LINK) and store it, converted to CPU byte order, in fid.
 *
 * Visible error results: -EINVAL when the xattr is shorter than one
 * header plus one entry or when the magic does not match, -ENODATA when
 * the header records zero entries.
 * NOTE(review): the conditions guarding the second and third
 * osd_xattr_get() calls, the assignment of leh and the final return are
 * on lines not visible here.
 */
200 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
203 struct link_ea_header *leh;
204 struct link_ea_entry *lee;
/* first attempt: read the xattr into the per-thread buffer oti_buf */
209 buf.lb_buf = osd_oti_get(env)->oti_buf;
210 buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
212 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
/* query with LU_BUF_NULL; the result is used as the allocation size */
214 rc = osd_xattr_get(env, o, &LU_BUF_NULL,
215 XATTR_NAME_LINK, BYPASS_CAPA);
219 OBD_ALLOC(buf.lb_buf, rc);
220 if (buf.lb_buf == NULL)
223 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
/*
 * The xattr must hold at least a header and one entry.
 * NOTE(review): rc is compared with a sizeof expression; if rc is a
 * signed int, a negative rc is converted to unsigned and passes this
 * test. A separate rc < 0 check is presumably on a line not visible
 * here - confirm.
 */
227 if (rc < sizeof(*leh) + sizeof(*lee))
228 GOTO(out, rc = -EINVAL);
/* header written in the opposite byte order: swap it in place */
231 if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
232 leh->leh_magic = LINK_EA_MAGIC;
233 leh->leh_reccount = __swab32(leh->leh_reccount);
234 leh->leh_len = __swab64(leh->leh_len);
236 if (leh->leh_magic != LINK_EA_MAGIC)
237 GOTO(out, rc = -EINVAL);
238 if (leh->leh_reccount == 0)
239 GOTO(out, rc = -ENODATA);
/* only the first link entry, located right after the header, is used */
241 lee = (struct link_ea_entry *)(leh + 1);
242 fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
/* free the buffer only when it is not the per-thread one */
246 if (buf.lb_buf != osd_oti_get(env)->oti_buf)
247 OBD_FREE(buf.lb_buf, buf.lb_len);
/*
 * Look up name key in directory dt and store the FID found in rec.
 *
 * Names starting with a dot get special treatment: one branch copies
 * the FID of dt itself into rec, and the dot-dot branch obtains the
 * parent FID through osd_find_parent_fid(). Any other name is looked up
 * in the ZAP of the directory as an array of 8-byte integers filling
 * the per-thread oti_zde, whose lzd_fid is copied to rec.
 *
 * Returns 1 on success, otherwise the (negative) error code.
 * NOTE(review): in the visible lines the memcpy() from oti_zde follows
 * zap_lookup() without a visible check of rc, so on failure rec may
 * receive whatever oti_zde held before - confirm.
 * NOTE(review): capa is not used in the visible lines.
 */
251 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
252 struct dt_rec *rec, const struct dt_key *key,
253 struct lustre_capa *capa)
255 struct osd_thread_info *oti = osd_oti_get(env);
256 struct osd_object *obj = osd_dt_obj(dt);
257 struct osd_device *osd = osd_obj2dev(obj);
258 char *name = (char *)key;
262 LASSERT(udmu_object_is_zap(obj->oo_db));
264 if (name[0] == '.') {
266 const struct lu_fid *f = lu_object_fid(&dt->do_lu);
267 memcpy(rec, f, sizeof(*f));
269 } else if (name[1] == '.' && name[2] == 0) {
270 rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
271 RETURN(rc == 0 ? 1 : rc);
275 rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
276 (char *)key, 8, sizeof(oti->oti_zde) / 8,
277 (void *)&oti->oti_zde);
278 memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
280 RETURN(rc == 0 ? 1 : rc);
/*
 * Declare, in the transaction behind th, the changes needed to insert
 * name key into directory dt: a hold on the bonus buffer of the
 * directory object and a ZAP hold (third argument TRUE) for that name.
 * rec is not used in the visible lines.
 * NOTE(review): the th parameter declaration and the return statement
 * are on lines not visible here.
 */
283 static int osd_declare_dir_insert(const struct lu_env *env,
284 struct dt_object *dt,
285 const struct dt_rec *rec,
286 const struct dt_key *key,
289 struct osd_object *obj = osd_dt_obj(dt);
290 struct osd_thandle *oh;
294 oh = container_of0(th, struct osd_thandle, ot_super);
297 LASSERT(udmu_object_is_zap(obj->oo_db));
299 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
300 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
306 * Find the osd object for given fid.
308 * \param fid need to find the osd object having this fid
310 * \retval osd_object on success
311 * \retval -ve on error
/*
 * Find the object identified by fid on the device that dt belongs to.
 *
 * The lookup goes through lu_object_find_at() with the device of dt;
 * for an existing object lu_object_locate() then selects the layer
 * matching that device type. Both visible failure paths log the
 * problem, drop the reference with lu_object_put() and set the result
 * to ERR_PTR(-ENOENT).
 * NOTE(review): the success path (how child is derived from lo) and the
 * handling of an error returned by lu_object_find_at() are on lines not
 * visible here. A successful lookup presumably leaves a reference held
 * that the caller must drop - confirm.
 */
313 struct osd_object *osd_object_find(const struct lu_env *env,
314 struct dt_object *dt,
315 const struct lu_fid *fid)
317 struct lu_device *ludev = dt->do_lu.lo_dev;
318 struct osd_object *child = NULL;
319 struct lu_object *luch;
320 struct lu_object *lo;
323 * at this point topdev might not exist yet
324 * (i.e. MGS is preparing profiles). so we can
325 * not rely on topdev and instead lookup with
326 * our device passed as topdev. this can't work
327 * if the object isn't cached yet (as osd doesn't
328 * allocate lu_header). IOW, the object must be
329 * in the cache, otherwise lu_object_alloc() crashes
332 luch = lu_object_find_at(env, ludev, fid, NULL);
336 if (lu_object_exists(luch)) {
337 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
341 LU_OBJECT_DEBUG(D_ERROR, env, luch,
342 "%s: object can't be located "DFID"\n",
343 osd_dev(ludev)->od_svname, PFID(fid));
346 lu_object_put(env, luch);
347 CERROR("%s: Unable to get osd_object "DFID"\n",
348 osd_dev(ludev)->od_svname, PFID(fid));
349 child = ERR_PTR(-ENOENT);
352 LU_OBJECT_DEBUG(D_ERROR, env, luch,
353 "%s: lu_object does not exists "DFID"\n",
354 osd_dev(ludev)->od_svname, PFID(fid));
355 lu_object_put(env, luch);
356 child = ERR_PTR(-ENOENT);
363 * Put the osd object once done with it.
365 * \param obj osd object that needs to be put
/*
 * Drop one reference on the lu_object embedded in obj by calling
 * lu_object_put().
 */
367 static inline void osd_object_put(const struct lu_env *env,
368 struct osd_object *obj)
370 lu_object_put(env, &obj->oo_dt.do_lu);
374 * Inserts (key, value) pair in \a directory object.
376 * \param dt osd index object
377 * \param key key for index
378 * \param rec record reference
379 * \param th transaction handler
380 * \param capa capability descriptor
381 * \param ignore_quota update should not affect quota
384 * \retval -ve failure
/*
 * Insert name key into directory dt; rec carries the FID of the child.
 *
 * The child object is obtained with osd_object_find() and released with
 * osd_object_put() at the end. For the dot-dot name the child is
 * updated through osd_object_sa_update() with the dnode number of the
 * parent. A regular entry is built in the per-thread oti_zde (file
 * type, child dnode number, FID) and added to the parent ZAP with
 * zap_add() as an array of 8-byte integers inside the transaction of
 * th.
 *
 * NOTE(review): the macro defined under the version check is
 * OSD_ZFS_INSERT_DOTS_FOR_TESTING__ (with trailing underscores), while
 * the #ifndef tests use OSD_ZFS_INSERT_DOTS_FOR_TESTING, so the define
 * does not influence those tests - confirm this is intended.
 * NOTE(review): the last parameter, the code inside the #ifndef
 * sections and the return statement are on lines not visible here;
 * capa is not used in the visible lines.
 */
386 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
387 const struct dt_rec *rec, const struct dt_key *key,
388 struct thandle *th, struct lustre_capa *capa,
391 struct osd_thread_info *oti = osd_oti_get(env);
392 struct osd_object *parent = osd_dt_obj(dt);
393 struct osd_device *osd = osd_obj2dev(parent);
394 struct lu_fid *fid = (struct lu_fid *)rec;
395 struct osd_thandle *oh;
396 struct osd_object *child;
398 char *name = (char *)key;
402 LASSERT(parent->oo_db);
403 LASSERT(udmu_object_is_zap(parent->oo_db));
405 LASSERT(dt_object_exists(dt));
406 LASSERT(osd_invariant(parent));
409 oh = container_of0(th, struct osd_thandle, ot_super);
411 child = osd_object_find(env, dt, fid);
413 RETURN(PTR_ERR(child));
416 * to simulate old Orion setups with ./.. stored in the directories
418 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 91, 0)
419 #define OSD_ZFS_INSERT_DOTS_FOR_TESTING__
422 LASSERT(child->oo_db);
423 if (name[0] == '.') {
425 /* do not store ".", instead generate it
426 * during iteration */
427 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
430 } else if (name[1] == '.' && name[2] == 0) {
431 /* update parent dnode in the child.
432 * later it will be used to generate ".." */
433 udmu_objset_t *uos = &osd->od_objset;
434 rc = osd_object_sa_update(child,
436 &parent->oo_db->db_object,
439 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
/* compile-time checks that the entry is made of whole 8-byte units */
445 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
446 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
/* fill the per-thread entry: file type, child dnode number and FID */
447 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
448 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
449 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
450 oti->oti_zde.lzd_fid = *fid;
452 /* Insert (key,oid) into ZAP */
453 rc = -zap_add(osd->od_objset.os, parent->oo_db->db_object,
454 (char *)key, 8, sizeof(oti->oti_zde) / 8,
455 (void *)&oti->oti_zde, oh->ot_tx);
457 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
460 osd_object_put(env, child);
/*
 * Declare, in the transaction behind th, the removal of name key from
 * directory dt by taking a ZAP hold for that name on the directory
 * object. The object must exist, satisfy osd_invariant() and be a ZAP
 * (all asserted).
 * NOTE(review): the th parameter declaration and the return statement
 * are on lines not visible here.
 */
465 static int osd_declare_dir_delete(const struct lu_env *env,
466 struct dt_object *dt,
467 const struct dt_key *key,
470 struct osd_object *obj = osd_dt_obj(dt);
471 struct osd_thandle *oh;
474 LASSERT(dt_object_exists(dt));
475 LASSERT(osd_invariant(obj));
478 oh = container_of0(th, struct osd_thandle, ot_super);
481 LASSERT(udmu_object_is_zap(obj->oo_db));
483 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
/*
 * Remove name key from directory dt with zap_remove() inside the
 * transaction of th.
 *
 * Up to version 2.4.53.0 a -ENOENT result for the dot and dot-dot names
 * is special-cased (the action taken is on a line not visible here).
 * Any failure other than -ENOENT is logged with CERROR.
 * NOTE(review): the bodies of the dot and dot-dot branches inside the
 * #ifndef section and the return statement are not visible here; capa
 * is not used in the visible lines.
 */
488 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
489 const struct dt_key *key, struct thandle *th,
490 struct lustre_capa *capa)
492 struct osd_object *obj = osd_dt_obj(dt);
493 struct osd_device *osd = osd_obj2dev(obj);
494 struct osd_thandle *oh;
495 dmu_buf_t *zap_db = obj->oo_db;
496 char *name = (char *)key;
501 LASSERT(udmu_object_is_zap(obj->oo_db));
504 oh = container_of0(th, struct osd_thandle, ot_super);
506 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
508 * in Orion . and .. were stored in the directory (not generated up on
509 * request as now. we preserve them for backward compatibility
511 if (name[0] == '.') {
514 } else if (name[1] == '.' && name[2] == 0) {
520 /* Remove key from the ZAP */
521 rc = -zap_remove(osd->od_objset.os, zap_db->db_object,
522 (char *) key, oh->ot_tx);
524 #if LUSTRE_VERSION_CODE <= OBD_OCD_VERSION(2, 4, 53, 0)
525 if (unlikely(rc == -ENOENT && name[0] == '.' &&
526 (name[1] == 0 || (name[1] == '.' && name[2] == 0))))
529 if (unlikely(rc && rc != -ENOENT))
530 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
/*
 * Create a directory iterator by delegating to osd_index_it_init() and
 * returning its result cast back to struct dt_it *.
 * NOTE(review): the declaration of the parameter named unused and any
 * processing between the two visible statements are on lines not
 * visible here.
 */
535 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
536 struct dt_object *dt,
538 struct lustre_capa *capa)
540 struct osd_zap_it *it;
542 it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
546 RETURN((struct dt_it *)it);
550 * Move Iterator to record specified by \a key
552 * \param di osd iterator
553 * \param key key for index
555 * \retval +ve di points to record with least key not larger than key
556 * \retval 0 di points to exact matched key
557 * \retval -ve failure
/*
 * Position directory iterator di for key.
 *
 * The current ZAP cursor is finalized and a new one is initialized with
 * 0 as the last argument. Only an empty key is accepted at present: the
 * first byte of key is asserted to be 0.
 * NOTE(review): the bodies of the name branches and the return value
 * are on lines not visible here.
 */
559 static int osd_dir_it_get(const struct lu_env *env,
560 struct dt_it *di, const struct dt_key *key)
562 struct osd_zap_it *it = (struct osd_zap_it *)di;
563 struct osd_object *obj = it->ozi_obj;
564 struct osd_device *osd = osd_obj2dev(obj);
565 char *name = (char *)key;
572 udmu_zap_cursor_fini(it->ozi_zc);
574 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
575 obj->oo_db->db_object, 0))
578 /* XXX: implementation of the API is broken at the moment */
579 LASSERT(((const char *)key)[0] == 0);
586 if (name[0] == '.') {
590 } else if (name[1] == '.' && name[2] == 0) {
596 /* neither . nor .. - some real record */
/*
 * Iterator put method used by osd_dir_ops. According to the note in the
 * body it intentionally does nothing.
 */
604 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
606 /* PBS: do nothing : ref are incremented at retrive and decreamented
611 * In Orion, . and .. were stored in the directory, while ZPL
612 * and current osd-zfs generate them upon request. So we
613 * need to ignore previously stored . and ..
/*
 * Retrieve the entry under the cursor of it into za, stepping over
 * stored dot and dot-dot entries: the cursor is advanced and the
 * retrieval repeated for as long as it succeeds and isdot is set.
 * rc holds the negated result of zap_cursor_retrieve().
 * NOTE(review): the za parameter declaration, the places where isdot is
 * set or cleared and the return statement are on lines not visible
 * here.
 */
615 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
621 rc = -zap_cursor_retrieve(it->ozi_zc, za);
624 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
625 if (za->za_name[1] == 0) {
627 } else if (za->za_name[1] == '.' &&
628 za->za_name[2] == 0) {
632 zap_cursor_advance(it->ozi_zc);
634 } while (unlikely(rc == 0 && isdot));
640 * Load one directory entry at a time and store it in
641 * the iterator's in-memory data structure.
643 * \param di, struct osd_it_ea, iterator's in memory structure
645 * \retval +ve, iterator reached to end
646 * \retval 0, iterator not reached to end
647 * \retval -ve, on error
/*
 * Advance directory iterator di by one entry.
 *
 * Positions up to 2 are handled separately from the ZAP cursor (they
 * stand for the generated dot and dot-dot entries, see the note in the
 * body). Beyond that the ZAP cursor is advanced, and
 * osd_index_retrieve_skip_dots() is called to find out whether a real
 * record is available; -ENOENT from it means end of directory.
 * NOTE(review): the updates of ozi_pos and the values returned are on
 * lines not visible here.
 */
649 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
651 struct osd_zap_it *it = (struct osd_zap_it *)di;
652 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
655 /* temp. storage should be enough for any key supported by ZFS */
656 CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
659 * the first ->next() moves the cursor to .
660 * the second ->next() moves the cursor to ..
661 * then we get to the real records and have to verify any exist
663 if (it->ozi_pos <= 2) {
669 zap_cursor_advance(it->ozi_zc);
672 * According to current API we need to return error if its last entry.
673 * zap_cursor_advance() does not return any value. So we need to call
674 * retrieve to check if there is any record. We should make
675 * changes to Iterator API to not return status for this API
677 rc = osd_index_retrieve_skip_dots(it, za);
679 if (rc == -ENOENT) /* end of dir */
/*
 * Return the name of the entry the directory iterator points at.
 *
 * Positions up to 1 yield the dot name and position 2 the dot-dot name,
 * both as string constants. For later positions the current ZAP entry
 * is retrieved into the per-thread oti_za, its name is copied to
 * it->ozi_name and a pointer to that copy is returned.
 * NOTE(review): the action taken when zap_cursor_retrieve() fails and
 * the body of the stored dot and dot-dot check are on lines not visible
 * here.
 * NOTE(review): the version threshold used here is (2, 3, 91, 0) while
 * osd_dir_it_key_size() uses (2, 3, 99, 0) for the same check - confirm
 * which one is intended.
 */
685 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
686 const struct dt_it *di)
688 struct osd_zap_it *it = (struct osd_zap_it *)di;
689 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
693 if (it->ozi_pos <= 1) {
695 RETURN((struct dt_key *)".");
696 } else if (it->ozi_pos == 2) {
697 RETURN((struct dt_key *)"..");
700 if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
703 strcpy(it->ozi_name, za->za_name);
705 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 91, 0)
706 if (za->za_name[0] == '.') {
707 if (za->za_name[1] == 0 || (za->za_name[1] == '.' &&
708 za->za_name[2] == 0)) {
709 /* we should not get onto . and ..
710 * stored in the directory. ->next() and
711 * other methods should prevent this
718 RETURN((struct dt_key *)it->ozi_name);
/*
 * Return the length of the name of the entry the directory iterator
 * points at.
 *
 * Positions up to 1 and position 2 are handled separately (the values
 * assigned are on lines not visible here; presumably 1 and 2 for the
 * generated dot and dot-dot names - confirm). For later positions the
 * current ZAP entry is retrieved and, on success, rc becomes
 * strlen(za_name).
 * NOTE(review): as visible, the version-guarded check tests rc == 0
 * after rc has been set to the name length, so it only triggers for an
 * empty name - confirm whether that is intended.
 */
721 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
723 struct osd_zap_it *it = (struct osd_zap_it *)di;
724 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
728 if (it->ozi_pos <= 1) {
731 } else if (it->ozi_pos == 2) {
735 if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
736 rc = strlen(za->za_name);
738 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 99, 0)
739 if (rc == 0 && za->za_name[0] == '.') {
740 if (za->za_name[1] == 0 || (za->za_name[1] == '.' &&
741 za->za_name[2] == 0)) {
742 /* we should not get onto . and ..
743 * stored in the directory. ->next() and
744 * other methods should prevent this
/*
 * Fill the lu_dirent at dtrec for the entry the directory iterator
 * points at; attr selects the optional attributes to append.
 *
 * - Position up to 1: the dot entry, with hash 1, the FID of the
 *   directory itself and type S_IFDIR.
 * - Position 2: the dot-dot entry, with hash 2 and the FID obtained
 *   from osd_find_parent_fid().
 * - Otherwise: the hash is the serialized ZAP cursor, the name comes
 *   from zap_cursor_retrieve(), and FID and type are read with
 *   zap_lookup() (3 integers) into the per-thread oti_zde.
 *
 * Visible error results: -EOVERFLOW when the name is longer than
 * NAME_MAX, and -EIO (with a CERROR) when the stored value is not made
 * of at least three 8-byte integers.
 * NOTE(review): the handling of failures of osd_find_parent_fid() and
 * zap_lookup(), and the final return, are on lines not visible here.
 */
753 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
754 struct dt_rec *dtrec, __u32 attr)
756 struct osd_zap_it *it = (struct osd_zap_it *)di;
757 struct lu_dirent *lde = (struct lu_dirent *)dtrec;
758 struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
759 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
763 if (it->ozi_pos <= 1) {
764 lde->lde_hash = cpu_to_le64(1);
765 strcpy(lde->lde_name, ".");
766 lde->lde_namelen = cpu_to_le16(1);
767 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
768 lde->lde_attrs = LUDA_FID;
769 /* append lustre attributes */
770 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
771 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
775 } else if (it->ozi_pos == 2) {
776 lde->lde_hash = cpu_to_le64(2);
777 strcpy(lde->lde_name, "..");
778 lde->lde_namelen = cpu_to_le16(2);
779 lde->lde_attrs = LUDA_FID;
780 /* append lustre attributes */
781 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
782 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
783 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
785 * early Orion code was not setting LinkEA, so it's possible
786 * some setups still have objects with no LinkEA set.
787 * but at that time .. was a real record in the directory
788 * so we should try to lookup .. in ZAP
796 lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc));
798 if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
801 namelen = strlen(za->za_name);
802 if (namelen > NAME_MAX)
803 GOTO(out, rc = -EOVERFLOW);
804 strcpy(lde->lde_name, za->za_name);
805 lde->lde_namelen = cpu_to_le16(namelen);
807 if (za->za_integer_length != 8 || za->za_num_integers < 3) {
808 CERROR("%s: unsupported direntry format: %d %d\n",
809 osd_obj2dev(it->ozi_obj)->od_svname,
810 za->za_integer_length, (int)za->za_num_integers);
812 GOTO(out, rc = -EIO);
815 rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
816 za->za_name, za->za_integer_length, 3, zde);
820 lde->lde_fid = zde->lzd_fid;
821 lde->lde_attrs = LUDA_FID;
823 /* append lustre attributes */
824 osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
826 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
/*
 * Return a 64-bit cookie for the current position of the directory
 * iterator. For positions beyond 2 the cookie is the serialized ZAP
 * cursor.
 * NOTE(review): the value produced for positions up to 2 and the return
 * statement are on lines not visible here.
 */
832 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
834 struct osd_zap_it *it = (struct osd_zap_it *)di;
838 if (it->ozi_pos <= 2)
841 pos = udmu_zap_cursor_serialize(it->ozi_zc);
848 * rc == 0 -> end of directory.
849 * rc > 0 -> ok, proceed.
850 * rc < 0 -> error. ( EOVERFLOW can be masked.)
/*
 * Reposition the directory iterator at the cookie hash.
 *
 * When the iterator is not at its initial position (ozi_pos != 0) the
 * ZAP cursor is finalized and re-created with hash as starting point.
 * osd_index_retrieve_skip_dots() is then used to learn whether a record
 * is available there; its result, including -ENOENT, is mapped to the
 * return value on lines not visible here.
 * NOTE(review): the update of ozi_pos is also not visible here.
 */
852 static int osd_dir_it_load(const struct lu_env *env,
853 const struct dt_it *di, __u64 hash)
855 struct osd_zap_it *it = (struct osd_zap_it *)di;
856 struct osd_object *obj = it->ozi_obj;
857 struct osd_device *osd = osd_obj2dev(obj);
858 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
862 if (it->ozi_pos != 0) {
863 /* the cursor wasn't at the beginning
864 * so we should reset ZAP cursor as well */
865 udmu_zap_cursor_fini(it->ozi_zc);
866 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
867 obj->oo_db->db_object, hash))
876 /* to return whether the end has been reached */
877 rc = osd_index_retrieve_skip_dots(it, za);
880 else if (rc == -ENOENT)
/*
 * Index operations for directory objects; osd_index_try() installs this
 * table when dt_directory_features is requested on a ZAP object.
 * The iterator methods are the osd_dir_it_*() functions, except fini,
 * which reuses osd_index_it_fini().
 */
887 static struct dt_index_operations osd_dir_ops = {
888 .dio_lookup = osd_dir_lookup,
889 .dio_declare_insert = osd_declare_dir_insert,
890 .dio_insert = osd_dir_insert,
891 .dio_declare_delete = osd_declare_dir_delete,
892 .dio_delete = osd_dir_delete,
894 .init = osd_dir_it_init,
895 .fini = osd_index_it_fini,
896 .get = osd_dir_it_get,
897 .put = osd_dir_it_put,
898 .next = osd_dir_it_next,
899 .key = osd_dir_it_key,
900 .key_size = osd_dir_it_key_size,
901 .rec = osd_dir_it_rec,
902 .store = osd_dir_it_store,
903 .load = osd_dir_it_load
908 * Primitives for index files using binary keys.
909 * XXX: only 64-bit keys are supported for now.
/*
 * Look up a binary key in index object dt with zap_lookup_uint64().
 *
 * key is treated as an array of one __u64; the value is requested as
 * obj->oo_recsize integers of 8 bytes each.
 * Returns 1 on success, otherwise the (negative) error code.
 * NOTE(review): the last argument of the lookup call (presumably rec)
 * is on a line not visible here; capa is not used in the visible lines.
 */
912 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
913 struct dt_rec *rec, const struct dt_key *key,
914 struct lustre_capa *capa)
916 struct osd_object *obj = osd_dt_obj(dt);
917 struct osd_device *osd = osd_obj2dev(obj);
921 rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
922 (const __u64 *)key, 1, 8, obj->oo_recsize,
924 RETURN(rc == 0 ? 1 : rc);
/*
 * Declare, in the transaction behind th, an insertion into index object
 * dt: a hold on the bonus buffer of the object and a ZAP hold with a
 * NULL name (see the note in the body about over-reserving).
 * rec and key are not used in the visible lines.
 * NOTE(review): the th parameter declaration and the return statement
 * are on lines not visible here.
 */
927 static int osd_declare_index_insert(const struct lu_env *env,
928 struct dt_object *dt,
929 const struct dt_rec *rec,
930 const struct dt_key *key,
933 struct osd_object *obj = osd_dt_obj(dt);
934 struct osd_thandle *oh;
938 oh = container_of0(th, struct osd_thandle, ot_super);
942 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
944 /* It is not clear what API should be used for binary keys, so we pass
945 * a null name which has the side effect of over-reserving space,
946 * accounting for the worst case. See zap_count_write() */
947 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
/*
 * Insert the pair (key, rec) into index object dt with zap_add_uint64()
 * inside the transaction of th.
 *
 * key is treated as an array of one __u64; rec is written as
 * obj->oo_recsize integers of 8 bytes each. rc holds the negated result
 * of the ZAP call.
 * NOTE(review): the last parameter and the return statement are on
 * lines not visible here; capa is not used in the visible lines.
 */
952 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
953 const struct dt_rec *rec, const struct dt_key *key,
954 struct thandle *th, struct lustre_capa *capa,
957 struct osd_object *obj = osd_dt_obj(dt);
958 struct osd_device *osd = osd_obj2dev(obj);
959 struct osd_thandle *oh;
964 LASSERT(dt_object_exists(dt));
965 LASSERT(osd_invariant(obj));
968 oh = container_of0(th, struct osd_thandle, ot_super);
970 /* Insert (key,oid) into ZAP */
971 rc = -zap_add_uint64(osd->od_objset.os, obj->oo_db->db_object,
972 (const __u64 *)key, 1, 8, obj->oo_recsize,
973 (void *)rec, oh->ot_tx);
/*
 * Declare, in the transaction behind th, a deletion from index object
 * dt by taking a ZAP hold with a NULL name on the object.
 * key is not used in the visible lines.
 * NOTE(review): the th parameter declaration and the return statement
 * are on lines not visible here.
 */
977 static int osd_declare_index_delete(const struct lu_env *env,
978 struct dt_object *dt,
979 const struct dt_key *key,
982 struct osd_object *obj = osd_dt_obj(dt);
983 struct osd_thandle *oh;
986 LASSERT(dt_object_exists(dt));
987 LASSERT(osd_invariant(obj));
991 oh = container_of0(th, struct osd_thandle, ot_super);
992 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
/*
 * Remove the entry for binary key from index object dt with
 * zap_remove_uint64() inside the transaction of th. key is treated as
 * an array of one __u64; rc holds the negated result of the ZAP call.
 * NOTE(review): the return statement is on a line not visible here;
 * capa is not used in the visible lines.
 */
997 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
998 const struct dt_key *key, struct thandle *th,
999 struct lustre_capa *capa)
1001 struct osd_object *obj = osd_dt_obj(dt);
1002 struct osd_device *osd = osd_obj2dev(obj);
1003 struct osd_thandle *oh;
1007 LASSERT(obj->oo_db);
1008 LASSERT(th != NULL);
1009 oh = container_of0(th, struct osd_thandle, ot_super);
1011 /* Remove binary key from the ZAP */
1012 rc = -zap_remove_uint64(osd->od_objset.os, obj->oo_db->db_object,
1013 (const __u64 *)key, 1, oh->ot_tx);
/*
 * Position index iterator di for key.
 *
 * Only a zero key is accepted at present (asserted). The existing ZAP
 * cursor is finalized, its memory is cleared and it is initialized
 * again on the same object with zap_cursor_init().
 * NOTE(review): any update of other iterator fields and the return
 * value are on lines not visible here.
 */
1017 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1018 const struct dt_key *key)
1020 struct osd_zap_it *it = (struct osd_zap_it *)di;
1021 struct osd_object *obj = it->ozi_obj;
1022 struct osd_device *osd = osd_obj2dev(obj);
1026 LASSERT(it->ozi_zc);
1028 /* XXX: API is broken at the moment */
1029 LASSERT(*((const __u64 *)key) == 0);
1031 zap_cursor_fini(it->ozi_zc);
1032 memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
1033 zap_cursor_init(it->ozi_zc, osd->od_objset.os, obj->oo_db->db_object);
/*
 * Advance index iterator di by one entry.
 *
 * The ZAP cursor is advanced only when it->ozi_reset is 0. Afterwards
 * the entry under the cursor is retrieved into the per-thread oti_za so
 * that the end of the index can be detected; rc holds the negated
 * result of that retrieval.
 * NOTE(review): any update of ozi_reset and the mapping of rc to the
 * return value are on lines not visible here.
 */
1039 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1041 struct osd_zap_it *it = (struct osd_zap_it *)di;
1042 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1046 if (it->ozi_reset == 0)
1047 zap_cursor_advance(it->ozi_zc);
1051 * According to current API we need to return error if it's last entry.
1052 * zap_cursor_advance() does not return any value. So we need to call
1053 * retrieve to check if there is any record. We should make
1054 * changes to Iterator API to not return status for this API
1056 rc = -zap_cursor_retrieve(it->ozi_zc, za);
/*
 * Return the binary key of the entry the index iterator points at.
 *
 * The entry is retrieved into the per-thread oti_za; the first 8 bytes
 * of za_name are read as a __u64, saved in it->ozi_key, and a pointer
 * to that field is returned. ERR_PTR(rc) is returned on the error path
 * (its guarding condition is on a line not visible here).
 * NOTE(review): za_name is read through a __u64 pointer cast; this
 * assumes za_name is suitably aligned - confirm.
 */
1063 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1064 const struct dt_it *di)
1066 struct osd_zap_it *it = (struct osd_zap_it *)di;
1067 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1072 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1074 RETURN(ERR_PTR(rc));
1076 /* the binary key is stored in the name */
1077 it->ozi_key = *((__u64 *)za->za_name);
1079 RETURN((struct dt_key *)&it->ozi_key);
/*
 * Return the key size of an index iterator entry, which is always
 * sizeof(__u64); neither env nor di is consulted in the visible lines.
 */
1082 static int osd_index_it_key_size(const struct lu_env *env,
1083 const struct dt_it *di)
1085 /* we only support 64-bit binary keys for the time being */
1086 RETURN(sizeof(__u64));
/*
 * Copy the record of the entry the index iterator points at into rec.
 *
 * The entry is first retrieved into the per-thread oti_za to obtain its
 * key (stored in za_name); the value is then read with
 * zap_lookup_uint64() as obj->oo_recsize integers of 8 bytes each.
 * attr is not used in the visible lines.
 * NOTE(review): the check of the retrieval result and the return
 * statement are on lines not visible here.
 */
1089 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1090 struct dt_rec *rec, __u32 attr)
1092 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1093 struct osd_zap_it *it = (struct osd_zap_it *)di;
1094 struct osd_object *obj = it->ozi_obj;
1095 struct osd_device *osd = osd_obj2dev(obj);
1100 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1104 rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
1105 (const __u64 *)za->za_name, 1, 8,
1106 obj->oo_recsize, (void *)rec);
/*
 * Return a 64-bit cookie for the current position of the index
 * iterator: the value of zap_cursor_serialize() for its cursor.
 */
1110 static __u64 osd_index_it_store(const struct lu_env *env,
1111 const struct dt_it *di)
1113 struct osd_zap_it *it = (struct osd_zap_it *)di;
1116 RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
/*
 * Reposition the index iterator at the cookie hash.
 *
 * The current ZAP cursor is finalized, cleared and re-created with
 * zap_cursor_init_serialized() at hash. The entry under the new cursor
 * is then retrieved into the per-thread oti_za; the result, including
 * -ENOENT, is mapped to the return value on lines not visible here.
 * NOTE(review): the hash parameter declaration is also on a line not
 * visible here.
 */
1119 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1122 struct osd_zap_it *it = (struct osd_zap_it *)di;
1123 struct osd_object *obj = it->ozi_obj;
1124 struct osd_device *osd = osd_obj2dev(obj);
1125 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1129 /* close the current cursor */
1130 zap_cursor_fini(it->ozi_zc);
1132 /* create a new one starting at hash */
1133 memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
1134 zap_cursor_init_serialized(it->ozi_zc, osd->od_objset.os,
1135 obj->oo_db->db_object, hash);
1138 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1141 else if (rc == -ENOENT)
/*
 * Index operations for index objects with 64-bit binary keys;
 * osd_index_try() installs this table for ZAP objects whose requested
 * features pass its key and record size checks.
 */
1147 static struct dt_index_operations osd_index_ops = {
1148 .dio_lookup = osd_index_lookup,
1149 .dio_declare_insert = osd_declare_index_insert,
1150 .dio_insert = osd_index_insert,
1151 .dio_declare_delete = osd_declare_index_delete,
1152 .dio_delete = osd_index_delete,
1154 .init = osd_index_it_init,
1155 .fini = osd_index_it_fini,
1156 .get = osd_index_it_get,
1157 .put = osd_index_it_put,
1158 .next = osd_index_it_next,
1159 .key = osd_index_it_key,
1160 .key_size = osd_index_it_key_size,
1161 .rec = osd_index_it_rec,
1162 .store = osd_index_it_store,
1163 .load = osd_index_it_load
/*
 * Select the index operations of dt according to the features feat.
 *
 * - DT_IND_RANGE and dt_otable_features are tested first and handled on
 *   lines not visible here (presumably rejected - confirm).
 * - dt_directory_features: osd_dir_ops is installed when the object is
 *   a ZAP.
 * - dt_acct_features: osd_acct_index_ops is installed; the FID must be
 *   an accounting FID (asserted).
 * - Any other feature set on a ZAP object without index operations yet:
 *   only the DT_IND_UPDATE flag may be set, keys must be exactly
 *   sizeof(__u64) bytes, and the record size must be fixed, a multiple
 *   of 8 bytes and no larger than ZAP_MAXVALUELEN. obj->oo_recsize is
 *   then set to the record size in 8-byte units and osd_index_ops is
 *   installed.
 *
 * NOTE(review): the values returned when a check fails and the end of
 * the function are on lines not visible here.
 */
1167 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1168 const struct dt_index_features *feat)
1170 struct osd_object *obj = osd_dt_obj(dt);
1173 LASSERT(dt_object_exists(dt));
1176 * XXX: implement support for fixed-size keys sorted with natural
1177 * numerical way (not using internal hash value)
1179 if (feat->dif_flags & DT_IND_RANGE)
1182 if (unlikely(feat == &dt_otable_features))
1183 /* do not support oi scrub yet. */
1186 LASSERT(obj->oo_db != NULL);
1187 if (likely(feat == &dt_directory_features)) {
1188 if (udmu_object_is_zap(obj->oo_db))
1189 dt->do_index_ops = &osd_dir_ops;
1192 } else if (unlikely(feat == &dt_acct_features)) {
1193 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1194 dt->do_index_ops = &osd_acct_index_ops;
1195 } else if (udmu_object_is_zap(obj->oo_db) &&
1196 dt->do_index_ops == NULL) {
1197 /* For index file, we don't support variable key & record sizes
1198 * and the key has to be unique */
1199 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1202 /* Although the zap_*_uint64() primitives support large keys, we
1203 * limit ourselves to 64-bit keys for now */
1204 if (feat->dif_keysize_max != sizeof(__u64) ||
1205 feat->dif_keysize_min != sizeof(__u64))
1208 /* As for the record size, it should be a multiple of 8 bytes
1209 * and smaller than the maximum value length supported by ZAP.
1211 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1213 if (feat->dif_recsize_max != feat->dif_recsize_min ||
1214 (feat->dif_recsize_max & (sizeof(__u64) - 1)))
1217 obj->oo_recsize = feat->dif_recsize_max / sizeof(__u64);
1218 dt->do_index_ops = &osd_index_ops;