4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * Copyright (c) 2012, 2013, Intel Corporation.
32 * Use is subject to license terms.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/osd-zfs/osd_index.c
40 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41 * Author: Mike Pershin <tappro@whamcloud.com>
45 # define EXPORT_SYMTAB
47 #define DEBUG_SUBSYSTEM S_OSD
49 #include <lustre_ver.h>
50 #include <libcfs/libcfs.h>
51 #include <obd_support.h>
52 #include <lustre_net.h>
54 #include <obd_class.h>
55 #include <lustre_disk.h>
56 #include <lustre_fid.h>
58 #include "osd_internal.h"
60 #include <sys/dnode.h>
65 #include <sys/spa_impl.h>
66 #include <sys/zfs_znode.h>
67 #include <sys/dmu_tx.h>
68 #include <sys/dmu_objset.h>
69 #include <sys/dsl_prop.h>
70 #include <sys/sa_impl.h>
/*
 * osd_index_it_init() - prepare a ZAP iterator over index object \a dt.
 *
 * The iterator lives in the per-thread info (oti_it_zap), so nothing is
 * allocated for the iterator structure itself.  The object must exist and
 * be backed by a ZAP (both are asserted).  A cursor is opened with
 * udmu_zap_cursor_init(), passing 0 in the argument slot where
 * osd_dir_it_load() passes a stored hash, i.e. iteration starts from the
 * beginning.
 *
 * \retval iterator cast to struct dt_it * on success
 * \retval ERR_PTR(-ENOMEM) if udmu_zap_cursor_init() returns non-zero
 *         (whatever the underlying error was)
 *
 * \a capa is not examined (see the XXX note in the body).
 *
 * NOTE(review): this listing skips some original lines (gaps in the
 * embedded numbering).  Binding of it->ozi_obj, which osd_index_it_fini()
 * asserts on, is presumably done on one of them - confirm.
 */
73 static struct dt_it *osd_index_it_init(const struct lu_env *env,
76 struct lustre_capa *capa)
78 struct osd_thread_info *info = osd_oti_get(env);
79 struct osd_zap_it *it;
80 struct osd_object *obj = osd_dt_obj(dt);
81 struct osd_device *osd = osd_obj2dev(obj);
82 struct lu_object *lo = &dt->do_lu;
85 /* XXX: check capa ? */
87 LASSERT(lu_object_exists(lo));
89 LASSERT(udmu_object_is_zap(obj->oo_db));
92 it = &info->oti_it_zap;
94 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
95 obj->oo_db->db_object, 0))
96 RETURN(ERR_PTR(-ENOMEM));
103 RETURN((struct dt_it *)it);
/*
 * osd_index_it_fini() - tear down an iterator.
 *
 * Wired as the .fini method of both osd_dir_ops and osd_index_ops.
 * Asserts that the iterator is bound to an object, releases the ZAP
 * cursor with udmu_zap_cursor_fini() and drops one lu_object reference
 * on the iterated object with lu_object_put().
 *
 * NOTE(review): the assignment of `obj` is not visible in this listing
 * (presumably obj = it->ozi_obj) - confirm before relying on it.
 */
106 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
108 struct osd_zap_it *it = (struct osd_zap_it *)di;
109 struct osd_object *obj;
113 LASSERT(it->ozi_obj);
117 udmu_zap_cursor_fini(it->ozi_zc);
118 lu_object_put(env, &obj->oo_dt.do_lu);
/*
 * osd_index_it_put() - iterator ->put method for binary-key indices
 * (wired as osd_index_ops .put).
 *
 * The only visible body content is the comment below, i.e. the method is
 * a no-op.
 */
124 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
126 /* PBS: do nothing : ref are incremented at retrive and decreamented
/*
 * udmu_zap_cursor_retrieve_key() - fetch the name of the entry under the
 * cursor \a zc.
 *
 * zap_cursor_retrieve() fills the per-thread zap_attribute_t (oti_za);
 * its za_name is then copied into \a key.
 *
 * NOTE(review): the copy is a plain strcpy() and \a max is not referenced
 * on any visible line, so the destination size is not enforced here.
 * Callers must supply a buffer at least as large as za_name, or the copy
 * should be bounded by \a max - confirm against the full source.
 * NOTE(review): the value assigned to `err` comes straight from
 * zap_cursor_retrieve() (not negated here, unlike other callers in this
 * file); the return statement is not visible - confirm the sign callers
 * expect.
 */
130 int udmu_zap_cursor_retrieve_key(const struct lu_env *env,
131 zap_cursor_t *zc, char *key, int max)
133 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
136 if ((err = zap_cursor_retrieve(zc, za)))
140 strcpy(key, za->za_name);
/*
 * udmu_zap_cursor_retrieve_value() - read the value of the entry under
 * cursor \a zc into \a buf.
 *
 * Steps visible below:
 *  1. zap_cursor_retrieve() fills the per-thread zap_attribute_t with the
 *     entry name, integer width and integer count.
 *  2. A non-positive za_integer_length is rejected (the return value for
 *     that case is on a line not shown here).
 *  3. actual_size is the full value size in bytes.  If it exceeds
 *     \a buf_size it is clamped to \a buf_size, and \a buf_size is then
 *     re-used as the NUMBER OF INTEGERS to read (bytes / integer width);
 *     otherwise \a buf_size becomes za_num_integers.
 *  4. zap_lookup() by name performs the actual read; its result is
 *     negated into `err`.
 *  5. *bytes_read is set to actual_size.
 *
 * NOTE(review): in the truncated case *bytes_read reports the caller's
 * original buffer size, which can exceed the bytes actually read when that
 * size is not a multiple of za_integer_length - confirm whether callers
 * care.
 * NOTE(review): the zap_cursor_retrieve() error is not negated while the
 * zap_lookup() error is, so the two failure paths may differ in sign -
 * the return statements are not visible, confirm.
 */
146 * zap_cursor_retrieve read from current record.
147 * to read bytes we need to call zap_lookup explicitly.
149 int udmu_zap_cursor_retrieve_value(const struct lu_env *env,
150 zap_cursor_t *zc, char *buf,
151 int buf_size, int *bytes_read)
153 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
154 int err, actual_size;
156 if ((err = zap_cursor_retrieve(zc, za)))
159 if (za->za_integer_length <= 0)
162 actual_size = za->za_integer_length * za->za_num_integers;
164 if (actual_size > buf_size) {
165 actual_size = buf_size;
166 buf_size = actual_size / za->za_integer_length;
168 buf_size = za->za_num_integers;
171 err = -zap_lookup(zc->zc_objset, zc->zc_zapobj,
172 za->za_name, za->za_integer_length,
176 *bytes_read = actual_size;
/*
 * osd_it_append_attrs() - finish the attribute part of a lu_dirent.
 *
 * When \a attr requests LUDA_TYPE, a struct luda_type is written after the
 * entry name: `len` is first rounded up using (sizeof(struct luda_type) - 1)
 * as a mask, the type is stored little-endian as DTTOIF(type), and
 * LUDA_TYPE is added to ent->lde_attrs.
 *
 * In all cases ent->lde_attrs is converted to little-endian last, so the
 * caller must have stored CPU-endian flags in it beforehand (the callers
 * in this file set lde_attrs = LUDA_FID just before calling).
 *
 * NOTE(review): using sizeof - 1 as an alignment mask presumes
 * sizeof(struct luda_type) is a power of two - confirm.
 * NOTE(review): the remaining parameters (`len`, `type`) are declared on
 * signature lines missing from this listing.
 */
181 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
184 const unsigned align = sizeof(struct luda_type) - 1;
185 struct luda_type *lt;
187 /* check if file type is required */
188 if (attr & LUDA_TYPE) {
189 len = (len + align) & ~align;
191 lt = (void *)ent->lde_name + len;
192 lt->lt_type = cpu_to_le16(DTTOIF(type));
193 ent->lde_attrs |= LUDA_TYPE;
196 ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
/*
 * osd_find_parent_by_dnode() - find the parent's FID by following the
 * parent dnode number stored in the object's system attributes.
 *
 * Visible sequence:
 *  1. Open a private SA handle on the object and read SA_ZPL_PARENT
 *     (8 bytes) into `dnode`; the handle is destroyed right after.
 *  2. Load the parent's xattr nvlist with __osd_xattr_load() and look for
 *     XATTR_NAME_LMA in it.  If present and at least sizeof(*lma) bytes,
 *     it is byte-swapped as needed and lma_self_fid is copied to *fid.
 *  3. Otherwise open an SA handle on the parent, read SA_ZPL_XATTR to get
 *     the dnode holding regular EAs (`dnode` is re-used for this), and
 *     fetch LMA with __osd_xattr_get_large() into the per-thread oti_buf.
 *     The CLASSERT guarantees oti_buf can hold an LMA.
 *  4. If no usable LMA is found, the function ends with rc = -EIO.
 *
 * Error checks between the steps and the `out` cleanup are on lines not
 * shown in this listing.
 *
 * NOTE(review): lustre_lma_swab() in step 2 is applied to the buffer
 * returned by nvlist_lookup_byte_array(), i.e. possibly in place inside
 * the nvlist - confirm this is intended.
 */
200 * as we don't know FID, we can't use LU object, so this function
201 * partially duplicate __osd_xattr_get() which is built around
202 * LU-object and uses it to cache data like regular EA dnode, etc
204 static int osd_find_parent_by_dnode(const struct lu_env *env,
208 struct lustre_mdt_attrs *lma;
209 udmu_objset_t *uos = &osd_obj2dev(osd_dt_obj(o))->od_objset;
212 nvlist_t *nvbuf = NULL;
218 /* first of all, get parent dnode from own attributes */
219 LASSERT(osd_dt_obj(o)->oo_db);
220 rc = -sa_handle_get(uos->os, osd_dt_obj(o)->oo_db->db_object,
221 NULL, SA_HDL_PRIVATE, &sa_hdl);
225 dnode = ZFS_NO_OBJECT;
226 rc = -sa_lookup(sa_hdl, SA_ZPL_PARENT(uos), &dnode, 8);
227 sa_handle_destroy(sa_hdl);
231 /* now get EA buffer */
232 rc = __osd_xattr_load(uos, dnode, &nvbuf);
236 /* XXX: if we get that far.. should we cache the result? */
238 /* try to find LMA attribute */
239 LASSERT(nvbuf != NULL);
240 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA, &value, &size);
241 if (rc == 0 && size >= sizeof(*lma)) {
242 lma = (struct lustre_mdt_attrs *)value;
243 lustre_lma_swab(lma);
244 *fid = lma->lma_self_fid;
249 /* no LMA attribute in SA, let's try regular EA */
251 /* first of all, get parent dnode storing regular EA */
252 rc = -sa_handle_get(uos->os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
256 dnode = ZFS_NO_OBJECT;
257 rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(uos), &dnode, 8);
258 sa_handle_destroy(sa_hdl);
262 CLASSERT(sizeof(*lma) <= sizeof(osd_oti_get(env)->oti_buf));
263 buf.lb_buf = osd_oti_get(env)->oti_buf;
264 buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
266 /* now try to find LMA */
267 rc = __osd_xattr_get_large(env, uos, dnode, &buf,
268 XATTR_NAME_LMA, &size);
269 if (rc == 0 && size >= sizeof(*lma)) {
271 lustre_lma_swab(lma);
272 *fid = lma->lma_self_fid;
277 GOTO(out, rc = -EIO);
/*
 * osd_find_parent_fid() - determine the FID of the parent of \a o.
 *
 * Primary source is the LinkEA xattr (XATTR_NAME_LINK):
 *  - it is first read into the per-thread oti_buf;
 *  - on a condition not visible here (presumably the buffer being too
 *    small) the size is queried with LU_BUF_NULL, a buffer of that size is
 *    allocated with OBD_ALLOC() and the xattr is read again;
 *  - the result must hold at least a header plus one entry (-EINVAL
 *    otherwise);
 *  - a byte-swapped magic is accepted: magic, reccount and len are then
 *    converted in place;
 *  - a wrong magic yields -EINVAL, zero records yields -ENODATA;
 *  - the parent FID of the FIRST entry is converted from big-endian into
 *    *fid.
 * A dynamically allocated buffer is freed once it is no longer needed.
 *
 * Fallback: if the LinkEA path ended with rc != 0, the parent's LMA is
 * consulted through osd_find_parent_by_dnode().
 *
 * NOTE(review): `rc < sizeof(*leh) + sizeof(*lee)` compares a value held
 * in rc with a size_t; if rc is a signed int carrying a negative error at
 * that point, the comparison is done unsigned.  The preceding checks are
 * not visible - confirm negatives are filtered out first.
 * NOTE(review): OBD_FREE() uses buf.lb_len while the allocation used rc;
 * the line updating lb_len is not visible - confirm both sizes match.
 */
286 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
289 struct link_ea_header *leh;
290 struct link_ea_entry *lee;
295 buf.lb_buf = osd_oti_get(env)->oti_buf;
296 buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
298 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
300 rc = osd_xattr_get(env, o, &LU_BUF_NULL,
301 XATTR_NAME_LINK, BYPASS_CAPA);
305 OBD_ALLOC(buf.lb_buf, rc);
306 if (buf.lb_buf == NULL)
309 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
313 if (rc < sizeof(*leh) + sizeof(*lee))
314 GOTO(out, rc = -EINVAL);
317 if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
318 leh->leh_magic = LINK_EA_MAGIC;
319 leh->leh_reccount = __swab32(leh->leh_reccount);
320 leh->leh_len = __swab64(leh->leh_len);
322 if (leh->leh_magic != LINK_EA_MAGIC)
323 GOTO(out, rc = -EINVAL);
324 if (leh->leh_reccount == 0)
325 GOTO(out, rc = -ENODATA);
327 lee = (struct link_ea_entry *)(leh + 1);
328 fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
332 if (buf.lb_buf != osd_oti_get(env)->oti_buf)
333 OBD_FREE(buf.lb_buf, buf.lb_len);
336 /* this block can be enabled for additional verification
337 * it's trying to match FID from LinkEA vs. FID from LMA */
341 rc2 = osd_find_parent_by_dnode(env, o, &fid2);
343 if (lu_fid_eq(fid, &fid2) == 0)
344 CERROR("wrong parent: "DFID" != "DFID"\n",
345 PFID(fid), PFID(&fid2));
349 /* no LinkEA is found, let's try to find the fid in parent's LMA */
350 if (unlikely(rc != 0))
351 rc = osd_find_parent_by_dnode(env, o, fid);
/*
 * osd_dir_lookup() - look up \a key (a NUL-terminated name) in directory
 * \a dt and store the FID of the entry in \a rec.
 *
 * "." and ".." are not read from the ZAP:
 *  - for a name starting with '.', the directory's own FID is copied for
 *    "." (the exact test is on a line not shown);
 *  - for "..", the parent FID comes from osd_find_parent_fid().
 * Any other name is looked up in the ZAP as an array of 8-byte integers
 * into the per-thread oti_zde, and its lzd_fid is copied to \a rec.
 *
 * \retval 1    entry found (rc == 0 is mapped to 1)
 * \retval -ve  negated ZFS error or error from osd_find_parent_fid()
 *
 * NOTE(review): the memcpy() into \a rec directly follows zap_lookup()
 * (adjacent embedded line numbers), so \a rec is written even when the
 * lookup failed.  Callers must only trust \a rec when 1 is returned.
 */
356 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
357 struct dt_rec *rec, const struct dt_key *key,
358 struct lustre_capa *capa)
360 struct osd_thread_info *oti = osd_oti_get(env);
361 struct osd_object *obj = osd_dt_obj(dt);
362 struct osd_device *osd = osd_obj2dev(obj);
363 char *name = (char *)key;
367 LASSERT(udmu_object_is_zap(obj->oo_db));
369 if (name[0] == '.') {
371 const struct lu_fid *f = lu_object_fid(&dt->do_lu);
372 memcpy(rec, f, sizeof(*f));
374 } else if (name[1] == '.' && name[2] == 0) {
375 rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
376 RETURN(rc == 0 ? 1 : rc);
380 rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
381 (char *)key, 8, sizeof(oti->oti_zde) / 8,
382 (void *)&oti->oti_zde);
383 memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
385 RETURN(rc == 0 ? 1 : rc);
/*
 * osd_declare_dir_insert() - declare the transaction holds needed to
 * insert \a key into directory \a dt.
 *
 * Two holds are placed on the directory's dnode in the DMU transaction
 * wrapped by \a th: one on the bonus buffer (dmu_tx_hold_bonus) and one on
 * the ZAP for the given name (dmu_tx_hold_zap with TRUE as third
 * argument).  The directory must be ZAP-backed (asserted).
 *
 * \a rec is not referenced on the visible lines.
 */
388 static int osd_declare_dir_insert(const struct lu_env *env,
389 struct dt_object *dt,
390 const struct dt_rec *rec,
391 const struct dt_key *key,
394 struct osd_object *obj = osd_dt_obj(dt);
395 struct osd_thandle *oh;
399 oh = container_of0(th, struct osd_thandle, ot_super);
402 LASSERT(udmu_object_is_zap(obj->oo_db));
404 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
405 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
/*
 * osd_object_find() - implementation notes.
 *
 * The lookup goes through lu_object_find_at() using the OSD's own
 * lu_device (taken from \a dt) as the starting device; the in-body
 * comment explains why the top device cannot be used and why the object
 * must already be cached.
 *
 * If the found object exists, its slice for this device type is located
 * with lu_object_locate().  Failure to locate it, or a non-existing
 * object, is logged, the reference is dropped with lu_object_put() and
 * ERR_PTR(-ENOENT) is produced.
 *
 * On success the reference obtained by the lookup is kept; callers in this
 * file release it with osd_object_put().
 */
411 * Find the osd object for given fid.
413 * \param fid need to find the osd object having this fid
415 * \retval osd_object on success
416 * \retval -ve on error
418 struct osd_object *osd_object_find(const struct lu_env *env,
419 struct dt_object *dt,
420 const struct lu_fid *fid)
422 struct lu_device *ludev = dt->do_lu.lo_dev;
423 struct osd_object *child = NULL;
424 struct lu_object *luch;
425 struct lu_object *lo;
428 * at this point topdev might not exist yet
429 * (i.e. MGS is preparing profiles). so we can
430 * not rely on topdev and instead lookup with
431 * our device passed as topdev. this can't work
432 * if the object isn't cached yet (as osd doesn't
433 * allocate lu_header). IOW, the object must be
434 * in the cache, otherwise lu_object_alloc() crashes
437 luch = lu_object_find_at(env, ludev, fid, NULL);
441 if (lu_object_exists(luch)) {
442 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
446 LU_OBJECT_DEBUG(D_ERROR, env, luch,
447 "%s: object can't be located "DFID"\n",
448 osd_dev(ludev)->od_svname, PFID(fid));
451 lu_object_put(env, luch);
452 CERROR("%s: Unable to get osd_object "DFID"\n",
453 osd_dev(ludev)->od_svname, PFID(fid));
454 child = ERR_PTR(-ENOENT);
457 LU_OBJECT_DEBUG(D_ERROR, env, luch,
458 "%s: lu_object does not exists "DFID"\n",
459 osd_dev(ludev)->od_svname, PFID(fid));
460 lu_object_put(env, luch);
461 child = ERR_PTR(-ENOENT);
/*
 * osd_object_put() - thin wrapper that drops one reference on the
 * lu_object embedded in \a obj via lu_object_put().  \a obj is
 * dereferenced unconditionally, so it must not be NULL or an ERR_PTR.
 */
468 * Put the osd object once done with it.
470 * \param obj osd object that needs to be put
472 static inline void osd_object_put(const struct lu_env *env,
473 struct osd_object *obj)
475 lu_object_put(env, &obj->oo_dt.do_lu);
/*
 * osd_remote_fid() - tell whether \a fid is served by another node.
 *
 * FIDs that are neither "normal" nor the root FID are handled by an early
 * exit (its return value is on a line not shown; presumably "local").
 * For the others the FLD is consulted through osd_fld_lookup(), filling
 * the per-thread oti_seq_range; a lookup failure is logged.
 *
 * The final result is the comparison of this site's node id with the
 * index of the range: non-zero (1) when they differ, i.e. the FID is
 * remote, 0 when it is local.  osd_dir_insert() relies on rc == 1 meaning
 * "remote".
 */
478 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
481 struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
482 struct seq_server_site *ss = osd_seq_site(osd);
486 if (!fid_is_norm(fid) && !fid_is_root(fid))
489 rc = osd_fld_lookup(env, osd, fid, range);
491 CERROR("%s: Can not lookup fld for "DFID"\n",
492 osd_name(osd), PFID(fid));
496 RETURN(ss->ss_node_id != range->lsr_index);
/*
 * osd_dir_insert() - implementation notes.
 *
 * \a rec is interpreted as a struct lu_fid and \a key as a NUL-terminated
 * name.  The value stored in the ZAP is the per-thread oti_zde: an 8-byte
 * lzd_reg (entry type + dnode number, size checked by CLASSERT) followed
 * by the FID, written as an array of 8-byte integers.
 *
 * Flow visible below:
 *  - osd_remote_fid() classifies the FID; an error is logged.
 *  - Remote FID (rc == 1): lzd_reg is zeroed and only the type is set, to
 *    the directory type; no local object is looked up.
 *  - Local FID: the target object is found with osd_object_find()
 *    (its error is returned via PTR_ERR) and must have a dbuf.
 *      * "."  is not stored; it is generated during iteration.
 *      * ".." is not stored either: osd_object_sa_update() is called on
 *        `parent` (the directory receiving the entry) with the dnode
 *        number of `child` (the object ".." points to).  The attribute id
 *        argument is on a line not shown; the in-body comment indicates
 *        it is the "parent dnode" attribute.
 *      * any other name: type is derived from the target's loh_attr and
 *        the dnode number from its dbuf.
 *  - lzd_fid is set and the entry is added with zap_add() inside the
 *    transaction of \a th; the ZFS error is negated.
 *  - the reference on the looked-up object is dropped.
 *
 * NOTE(review): on the remote path `child` stays NULL; the guard (if any)
 * in front of the final osd_object_put() is not visible - confirm.
 */
500 * Inserts (key, value) pair in \a directory object.
502 * \param dt osd index object
503 * \param key key for index
504 * \param rec record reference
505 * \param th transaction handler
506 * \param capa capability descriptor
507 * \param ignore_quota update should not affect quota
510 * \retval -ve failure
512 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
513 const struct dt_rec *rec, const struct dt_key *key,
514 struct thandle *th, struct lustre_capa *capa,
517 struct osd_thread_info *oti = osd_oti_get(env);
518 struct osd_object *parent = osd_dt_obj(dt);
519 struct osd_device *osd = osd_obj2dev(parent);
520 struct lu_fid *fid = (struct lu_fid *)rec;
521 struct osd_thandle *oh;
522 struct osd_object *child = NULL;
524 char *name = (char *)key;
528 LASSERT(parent->oo_db);
529 LASSERT(udmu_object_is_zap(parent->oo_db));
531 LASSERT(dt_object_exists(dt));
532 LASSERT(osd_invariant(parent));
535 oh = container_of0(th, struct osd_thandle, ot_super);
537 rc = osd_remote_fid(env, osd, fid);
539 CERROR("%s: Can not find object "DFID": rc = %d\n",
540 osd->od_svname, PFID(fid), rc);
544 if (unlikely(rc == 1)) {
545 /* Insert remote entry */
546 memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
547 oti->oti_zde.lzd_reg.zde_type = IFTODT(S_IFDIR & S_IFMT);
550 * To simulate old Orion setups with ./.. stored in the
553 /* Insert local entry */
554 child = osd_object_find(env, dt, fid);
556 RETURN(PTR_ERR(child));
558 LASSERT(child->oo_db);
559 if (name[0] == '.') {
561 /* do not store ".", instead generate it
562 * during iteration */
564 } else if (name[1] == '.' && name[2] == 0) {
565 /* update parent dnode in the child.
566 * later it will be used to generate ".." */
567 udmu_objset_t *uos = &osd->od_objset;
568 rc = osd_object_sa_update(parent,
570 &child->oo_db->db_object,
575 CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
576 CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
577 attr = child->oo_dt.do_lu.lo_header ->loh_attr;
578 oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
579 oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
582 oti->oti_zde.lzd_fid = *fid;
583 /* Insert (key,oid) into ZAP */
584 rc = -zap_add(osd->od_objset.os, parent->oo_db->db_object,
585 (char *)key, 8, sizeof(oti->oti_zde) / 8,
586 (void *)&oti->oti_zde, oh->ot_tx);
590 osd_object_put(env, child);
/*
 * osd_declare_dir_delete() - declare the transaction hold needed to remove
 * \a key from directory \a dt.
 *
 * After asserting that the object exists, satisfies osd_invariant() and is
 * ZAP-backed, a single ZAP hold for the given name is placed on the
 * directory's dnode in the DMU transaction wrapped by \a th.  The same
 * TRUE flag as in the insert declaration is passed to dmu_tx_hold_zap().
 */
595 static int osd_declare_dir_delete(const struct lu_env *env,
596 struct dt_object *dt,
597 const struct dt_key *key,
600 struct osd_object *obj = osd_dt_obj(dt);
601 struct osd_thandle *oh;
604 LASSERT(dt_object_exists(dt));
605 LASSERT(osd_invariant(obj));
608 oh = container_of0(th, struct osd_thandle, ot_super);
611 LASSERT(udmu_object_is_zap(obj->oo_db));
613 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
/*
 * osd_dir_delete() - remove the entry named \a key from directory \a dt
 * inside the transaction wrapped by \a th.
 *
 * "." and ".." get dedicated branches whose bodies are on lines not shown
 * in this listing; per the in-body comment they relate to old (Orion)
 * directories in which these entries were physically stored.
 *
 * The entry is removed with zap_remove(); the ZFS error is negated.
 * In builds where LUSTRE_VERSION_CODE <= 2.4.53.0, a -ENOENT result for
 * "." or ".." is special-cased (the action taken is not visible,
 * presumably the error is cleared).  Any other error except -ENOENT is
 * logged with CERROR.
 *
 * \a capa is not referenced on the visible lines.
 */
618 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
619 const struct dt_key *key, struct thandle *th,
620 struct lustre_capa *capa)
622 struct osd_object *obj = osd_dt_obj(dt);
623 struct osd_device *osd = osd_obj2dev(obj);
624 struct osd_thandle *oh;
625 dmu_buf_t *zap_db = obj->oo_db;
626 char *name = (char *)key;
631 LASSERT(udmu_object_is_zap(obj->oo_db));
634 oh = container_of0(th, struct osd_thandle, ot_super);
637 * In Orion . and .. were stored in the directory (not generated upon
638 * request as now). we preserve them for backward compatibility
640 if (name[0] == '.') {
643 } else if (name[1] == '.' && name[2] == 0) {
648 /* Remove key from the ZAP */
649 rc = -zap_remove(osd->od_objset.os, zap_db->db_object,
650 (char *) key, oh->ot_tx);
652 #if LUSTRE_VERSION_CODE <= OBD_OCD_VERSION(2, 4, 53, 0)
653 if (unlikely(rc == -ENOENT && name[0] == '.' &&
654 (name[1] == 0 || (name[1] == '.' && name[2] == 0))))
657 if (unlikely(rc && rc != -ENOENT))
658 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
/*
 * osd_dir_it_init() - create an iterator over directory \a dt.
 *
 * Delegates to osd_index_it_init() and returns its result re-cast to
 * struct dt_it *.  Any directory-specific setup between the call and the
 * return (embedded lines 671-673) is not visible in this listing.
 */
663 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
664 struct dt_object *dt,
666 struct lustre_capa *capa)
668 struct osd_zap_it *it;
670 it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
674 RETURN((struct dt_it *)it);
/*
 * osd_dir_it_get() - implementation notes.
 *
 * The current cursor is released and a fresh one is opened at position 0,
 * so the iterator is always rewound to the start of the directory.
 * Positioning on an arbitrary key is not implemented: the key is asserted
 * to be the empty string (see the XXX note).
 *
 * The trailing branches distinguish ".", ".." and ordinary names; their
 * bodies are on lines not shown in this listing.
 *
 * NOTE(review): `name` aliases \a key, and the assertion requires
 * key[0] == 0, so with assertions enabled the name[0] == '.' branch
 * cannot be taken - confirm the branches are intentionally dormant.
 */
678 * Move Iterator to record specified by \a key
680 * \param di osd iterator
681 * \param key key for index
683 * \retval +ve di points to record with least key not larger than key
684 * \retval 0 di points to exact matched key
685 * \retval -ve failure
687 static int osd_dir_it_get(const struct lu_env *env,
688 struct dt_it *di, const struct dt_key *key)
690 struct osd_zap_it *it = (struct osd_zap_it *)di;
691 struct osd_object *obj = it->ozi_obj;
692 struct osd_device *osd = osd_obj2dev(obj);
693 char *name = (char *)key;
700 udmu_zap_cursor_fini(it->ozi_zc);
702 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
703 obj->oo_db->db_object, 0))
706 /* XXX: implementation of the API is broken at the moment */
707 LASSERT(((const char *)key)[0] == 0);
714 if (name[0] == '.') {
718 } else if (name[1] == '.' && name[2] == 0) {
724 /* neither . nor .. - some real record */
/*
 * osd_dir_it_put() - iterator ->put method for directories (wired as
 * osd_dir_ops .put).
 *
 * The only visible body content is the comment below, i.e. the method is
 * a no-op.
 */
732 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
734 /* PBS: do nothing : ref are incremented at retrive and decreamented
/*
 * osd_index_retrieve_skip_dots() - retrieve the entry under the cursor,
 * stepping over physically stored "." and ".." entries.
 *
 * Loop body: zap_cursor_retrieve() fills \a za (error negated into rc).
 * If that succeeded and the name is exactly "." or "..", the cursor is
 * advanced and the loop repeats; it stops on the first other entry or on
 * any error (including the end-of-ZAP error that callers test as -ENOENT).
 *
 * On return with rc == 0, \a za describes a non-dot entry and the cursor
 * still points at it.  The `isdot` flag assignments and the return
 * statement are on lines not shown in this listing.
 */
739 * in Orion . and .. were stored in the directory, while ZPL
740 * and current osd-zfs generate them up on request. so, we
741 * need to ignore previously stored . and ..
743 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
749 rc = -zap_cursor_retrieve(it->ozi_zc, za);
752 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
753 if (za->za_name[1] == 0) {
755 } else if (za->za_name[1] == '.' &&
756 za->za_name[2] == 0) {
760 zap_cursor_advance(it->ozi_zc);
762 } while (unlikely(rc == 0 && isdot));
/*
 * osd_dir_it_next() - implementation notes.
 *
 * it->ozi_pos tracks the synthetic entries: per the in-body comment the
 * first call moves to ".", the second to "..", and only afterwards does
 * the ZAP cursor come into play.  The handling inside the
 * `ozi_pos <= 2` branch is on lines not shown in this listing.
 *
 * For real records the cursor is advanced and then
 * osd_index_retrieve_skip_dots() is used both to skip stored dot entries
 * and to learn whether another record exists, since
 * zap_cursor_advance() reports nothing.  -ENOENT from it means end of
 * directory (the value returned for that case is not visible; the header
 * comment says a positive value).
 *
 * The CLASSERT guarantees it->ozi_name can hold any za_name, which the
 * strcpy() in osd_dir_it_key() depends on.
 */
768 * to load a directory entry at a time and stored it in
769 * iterator's in-memory data structure.
771 * \param di, struct osd_it_ea, iterator's in memory structure
773 * \retval +ve, iterator reached to end
774 * \retval 0, iterator not reached to end
775 * \retval -ve, on error
777 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
779 struct osd_zap_it *it = (struct osd_zap_it *)di;
780 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
783 /* temp. storage should be enough for any key supported by ZFS */
784 CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
787 * the first ->next() moves the cursor to .
788 * the second ->next() moves the cursor to ..
789 * then we get to the real records and have to verify any exist
791 if (it->ozi_pos <= 2) {
797 zap_cursor_advance(it->ozi_zc);
800 * According to current API we need to return error if its last entry.
801 * zap_cursor_advance() does not return any value. So we need to call
802 * retrieve to check if there is any record. We should make
803 * changes to Iterator API to not return status for this API
805 rc = osd_index_retrieve_skip_dots(it, za);
807 if (rc == -ENOENT) /* end of dir */
/*
 * osd_dir_it_key() - return the name of the entry the iterator is on.
 *
 * Positions up to 1 yield the literal ".", position 2 yields "..".
 * Otherwise the entry under the ZAP cursor is retrieved into the
 * per-thread zap_attribute_t and its name is copied into it->ozi_name,
 * which is what gets returned; the pointer therefore stays valid only
 * until the iterator's name buffer is overwritten.
 *
 * The strcpy() is safe as long as sizeof(za_name) <= sizeof(ozi_name),
 * which osd_dir_it_next() checks at compile time.
 *
 * In builds older than 2.3.90.0 an extra check guards against landing on
 * a stored "." or ".." entry (the action taken is not visible here).
 * The error return after a failed retrieve is also on a line not shown.
 */
813 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
814 const struct dt_it *di)
816 struct osd_zap_it *it = (struct osd_zap_it *)di;
817 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
821 if (it->ozi_pos <= 1) {
823 RETURN((struct dt_key *)".");
824 } else if (it->ozi_pos == 2) {
825 RETURN((struct dt_key *)"..");
828 if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
831 strcpy(it->ozi_name, za->za_name);
833 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
834 if (za->za_name[0] == '.') {
835 if (za->za_name[1] == 0 || (za->za_name[1] == '.' &&
836 za->za_name[2] == 0)) {
837 /* we should not get onto . and ..
838 * stored in the directory. ->next() and
839 * other methods should prevent this
846 RETURN((struct dt_key *)it->ozi_name);
/*
 * osd_dir_it_key_size() - length of the name of the current entry.
 *
 * The synthetic positions (ozi_pos <= 1 for ".", ozi_pos == 2 for "..")
 * are handled by dedicated branches whose bodies are not visible here
 * (presumably yielding 1 and 2).  For real records the entry is retrieved
 * and rc becomes strlen(za_name); a retrieve failure leaves the negated
 * ZFS error in rc.
 *
 * NOTE(review): in the pre-2.3.90.0 block the test `rc == 0` follows the
 * assignment rc = strlen(...), so it can only match an empty name or a
 * path where rc was not overwritten - confirm the intent.
 */
849 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
851 struct osd_zap_it *it = (struct osd_zap_it *)di;
852 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
856 if (it->ozi_pos <= 1) {
859 } else if (it->ozi_pos == 2) {
863 if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
864 rc = strlen(za->za_name);
866 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
867 if (rc == 0 && za->za_name[0] == '.') {
868 if (za->za_name[1] == 0 || (za->za_name[1] == '.' &&
869 za->za_name[2] == 0)) {
870 /* we should not get onto . and ..
871 * stored in the directory. ->next() and
872 * other methods should prevent this
/*
 * osd_dir_it_rec() - fill the struct lu_dirent at \a dtrec for the entry
 * the iterator is on.
 *
 * Three cases, selected by it->ozi_pos:
 *  - pos <= 1: synthetic "." with hash 1 and the FID of the iterated
 *    directory itself;
 *  - pos == 2: synthetic ".." with hash 2; the FID comes from
 *    osd_find_parent_fid() (the fallback described in the in-body comment
 *    is on lines not shown);
 *  - otherwise: a real ZAP record.  The hash is the serialized cursor
 *    position, the name comes from zap_cursor_retrieve(); names longer
 *    than NAME_MAX give -EOVERFLOW.  The value must consist of at least
 *    three 8-byte integers, else -EIO is returned after logging.  The
 *    first three integers are then read with zap_lookup() into the
 *    per-thread oti_zde, from which the FID and entry type are taken.
 *
 * In every case lde_attrs is first set to LUDA_FID in CPU byte order and
 * osd_it_append_attrs() then appends the optional type and converts the
 * flags to little-endian; lde_reclen is computed with
 * lu_dirent_calc_size().  Multi-byte lu_dirent fields are stored
 * little-endian.
 */
881 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
882 struct dt_rec *dtrec, __u32 attr)
884 struct osd_zap_it *it = (struct osd_zap_it *)di;
885 struct lu_dirent *lde = (struct lu_dirent *)dtrec;
886 struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
887 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
891 if (it->ozi_pos <= 1) {
892 lde->lde_hash = cpu_to_le64(1);
893 strcpy(lde->lde_name, ".");
894 lde->lde_namelen = cpu_to_le16(1);
895 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
896 lde->lde_attrs = LUDA_FID;
897 /* append lustre attributes */
898 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
899 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
903 } else if (it->ozi_pos == 2) {
904 lde->lde_hash = cpu_to_le64(2);
905 strcpy(lde->lde_name, "..");
906 lde->lde_namelen = cpu_to_le16(2);
907 lde->lde_attrs = LUDA_FID;
908 /* append lustre attributes */
909 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
910 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
911 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
913 * early Orion code was not setting LinkEA, so it's possible
914 * some setups still have objects with no LinkEA set.
915 * but at that time .. was a real record in the directory
916 * so we should try to lookup .. in ZAP
924 lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc));
926 if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
929 namelen = strlen(za->za_name);
930 if (namelen > NAME_MAX)
931 GOTO(out, rc = -EOVERFLOW);
932 strcpy(lde->lde_name, za->za_name);
933 lde->lde_namelen = cpu_to_le16(namelen);
935 if (za->za_integer_length != 8 || za->za_num_integers < 3) {
936 CERROR("%s: unsupported direntry format: %d %d\n",
937 osd_obj2dev(it->ozi_obj)->od_svname,
938 za->za_integer_length, (int)za->za_num_integers);
940 GOTO(out, rc = -EIO);
943 rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
944 za->za_name, za->za_integer_length, 3, zde);
948 lde->lde_fid = zde->lzd_fid;
949 lde->lde_attrs = LUDA_FID;
951 /* append lustre attributes */
952 osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
954 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
/*
 * osd_dir_it_store() - produce a 64-bit cookie for the current iterator
 * position, suitable for a later osd_dir_it_load().
 *
 * While the iterator is on a synthetic entry (ozi_pos <= 2) a dedicated
 * value is used (the assignment is on a line not shown; presumably the
 * position itself, matching the hashes 1 and 2 used by osd_dir_it_rec()).
 * Otherwise the cookie is the serialized ZAP cursor.
 */
960 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
962 struct osd_zap_it *it = (struct osd_zap_it *)di;
966 if (it->ozi_pos <= 2)
969 pos = udmu_zap_cursor_serialize(it->ozi_zc);
/*
 * osd_dir_it_load() - reposition the directory iterator at \a hash, a
 * cookie previously produced by osd_dir_it_store().
 *
 * The current cursor is released and a new one is opened with \a hash as
 * starting position.  osd_index_retrieve_skip_dots() is then used to find
 * out whether a record exists at or after that position; its result is
 * mapped to the return convention listed below (the mapping statements
 * themselves are on lines not shown).  Handling of the synthetic
 * positions, if any, is likewise not visible.
 */
976 * rc == 0 -> end of directory.
977 * rc > 0 -> ok, proceed.
978 * rc < 0 -> error. ( EOVERFLOW can be masked.)
980 static int osd_dir_it_load(const struct lu_env *env,
981 const struct dt_it *di, __u64 hash)
983 struct osd_zap_it *it = (struct osd_zap_it *)di;
984 struct osd_object *obj = it->ozi_obj;
985 struct osd_device *osd = osd_obj2dev(obj);
986 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
990 udmu_zap_cursor_fini(it->ozi_zc);
991 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
992 obj->oo_db->db_object, hash))
1000 /* to return whether the end has been reached */
1001 rc = osd_index_retrieve_skip_dots(it, za);
1004 else if (rc == -ENOENT)
/*
 * osd_dir_ops - index method table for directories (name-keyed ZAPs).
 *
 * Installed by osd_index_try() when dt_directory_features is requested on
 * a ZAP-backed object.  The iterator shares its .fini with the binary-key
 * index implementation (osd_index_it_fini); all other iterator methods
 * are directory specific.
 */
1011 static struct dt_index_operations osd_dir_ops = {
1012 .dio_lookup = osd_dir_lookup,
1013 .dio_declare_insert = osd_declare_dir_insert,
1014 .dio_insert = osd_dir_insert,
1015 .dio_declare_delete = osd_declare_dir_delete,
1016 .dio_delete = osd_dir_delete,
1018 .init = osd_dir_it_init,
1019 .fini = osd_index_it_fini,
1020 .get = osd_dir_it_get,
1021 .put = osd_dir_it_put,
1022 .next = osd_dir_it_next,
1023 .key = osd_dir_it_key,
1024 .key_size = osd_dir_it_key_size,
1025 .rec = osd_dir_it_rec,
1026 .store = osd_dir_it_store,
1027 .load = osd_dir_it_load
1032 * Primitives for index files using binary keys.
1033 * XXX: only 64-bit keys are supported for now.
/*
 * osd_index_lookup() - look up a record by 64-bit binary key.
 *
 * \a key is treated as an array of one __u64.  The value is read with
 * zap_lookup_uint64() as obj->oo_recsize integers of 8 bytes each
 * (oo_recsize is set in osd_index_try() as record size / 8).  The
 * destination argument is on a line not shown (presumably \a rec).
 *
 * \retval 1    record found (rc == 0 is mapped to 1)
 * \retval -ve  negated ZFS error
 */
1036 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1037 struct dt_rec *rec, const struct dt_key *key,
1038 struct lustre_capa *capa)
1040 struct osd_object *obj = osd_dt_obj(dt);
1041 struct osd_device *osd = osd_obj2dev(obj);
1045 rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
1046 (const __u64 *)key, 1, 8, obj->oo_recsize,
1048 RETURN(rc == 0 ? 1 : rc);
/*
 * osd_declare_index_insert() - declare the transaction holds needed to
 * insert a record into a binary-key index.
 *
 * A hold on the bonus buffer and a ZAP hold are placed on the object's
 * dnode in the DMU transaction wrapped by \a th (which must be non-NULL).
 * The ZAP hold is taken with a NULL name; as the in-body comment explains
 * this reserves for the worst case.  \a rec and \a key are not referenced
 * on the visible lines.
 */
1051 static int osd_declare_index_insert(const struct lu_env *env,
1052 struct dt_object *dt,
1053 const struct dt_rec *rec,
1054 const struct dt_key *key,
1057 struct osd_object *obj = osd_dt_obj(dt);
1058 struct osd_thandle *oh;
1061 LASSERT(th != NULL);
1062 oh = container_of0(th, struct osd_thandle, ot_super);
1064 LASSERT(obj->oo_db);
1066 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
1068 /* It is not clear what API should be used for binary keys, so we pass
1069 * a null name which has the side effect of over-reserving space,
1070 * accounting for the worst case. See zap_count_write() */
1071 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
/*
 * osd_index_insert() - insert (\a key, \a rec) into a binary-key index
 * inside the transaction wrapped by \a th.
 *
 * \a key is treated as one __u64; \a rec is written with
 * zap_add_uint64() as obj->oo_recsize integers of 8 bytes each.
 * The object must exist, have a dbuf and satisfy osd_invariant(); \a th
 * must be non-NULL (all asserted).  The ZFS error is negated into rc.
 * \a capa is not referenced on the visible lines.
 */
1076 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1077 const struct dt_rec *rec, const struct dt_key *key,
1078 struct thandle *th, struct lustre_capa *capa,
1081 struct osd_object *obj = osd_dt_obj(dt);
1082 struct osd_device *osd = osd_obj2dev(obj);
1083 struct osd_thandle *oh;
1087 LASSERT(obj->oo_db);
1088 LASSERT(dt_object_exists(dt));
1089 LASSERT(osd_invariant(obj));
1090 LASSERT(th != NULL);
1092 oh = container_of0(th, struct osd_thandle, ot_super);
1094 /* Insert (key,oid) into ZAP */
1095 rc = -zap_add_uint64(osd->od_objset.os, obj->oo_db->db_object,
1096 (const __u64 *)key, 1, 8, obj->oo_recsize,
1097 (void *)rec, oh->ot_tx);
/*
 * osd_declare_index_delete() - declare the transaction hold needed to
 * delete a record from a binary-key index.
 *
 * After the usual assertions (object exists, invariant holds, \a th and
 * the dbuf are non-NULL) a ZAP hold with a NULL name is placed on the
 * object's dnode, mirroring osd_declare_index_insert().  \a key is not
 * referenced on the visible lines.
 */
1101 static int osd_declare_index_delete(const struct lu_env *env,
1102 struct dt_object *dt,
1103 const struct dt_key *key,
1106 struct osd_object *obj = osd_dt_obj(dt);
1107 struct osd_thandle *oh;
1110 LASSERT(dt_object_exists(dt));
1111 LASSERT(osd_invariant(obj));
1112 LASSERT(th != NULL);
1113 LASSERT(obj->oo_db);
1115 oh = container_of0(th, struct osd_thandle, ot_super);
1116 dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
/*
 * osd_index_delete() - remove the record with 64-bit key \a key from a
 * binary-key index inside the transaction wrapped by \a th.
 *
 * \a key is treated as one __u64 and passed to zap_remove_uint64(); the
 * ZFS error is negated into rc.  The dbuf and \a th must be non-NULL
 * (asserted).  \a capa is not referenced on the visible lines.
 */
1121 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1122 const struct dt_key *key, struct thandle *th,
1123 struct lustre_capa *capa)
1125 struct osd_object *obj = osd_dt_obj(dt);
1126 struct osd_device *osd = osd_obj2dev(obj);
1127 struct osd_thandle *oh;
1131 LASSERT(obj->oo_db);
1132 LASSERT(th != NULL);
1133 oh = container_of0(th, struct osd_thandle, ot_super);
1135 /* Remove binary key from the ZAP */
1136 rc = -zap_remove_uint64(osd->od_objset.os, obj->oo_db->db_object,
1137 (const __u64 *)key, 1, oh->ot_tx);
/*
 * osd_index_it_get() - position a binary-key iterator.
 *
 * Only rewinding is supported: the key is asserted to be 0 (see the XXX
 * note).  The existing cursor structure is re-used in place: it is
 * finalised, zeroed and initialised again at the start of the ZAP with
 * the raw zap_cursor_*() calls, unlike osd_dir_it_get() which goes
 * through the udmu_zap_cursor_*() wrappers.
 *
 * Any follow-up state change and the return value are on lines not shown
 * in this listing.
 */
1141 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1142 const struct dt_key *key)
1144 struct osd_zap_it *it = (struct osd_zap_it *)di;
1145 struct osd_object *obj = it->ozi_obj;
1146 struct osd_device *osd = osd_obj2dev(obj);
1150 LASSERT(it->ozi_zc);
1152 /* XXX: API is broken at the moment */
1153 LASSERT(*((const __u64 *)key) == 0);
1155 zap_cursor_fini(it->ozi_zc);
1156 memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
1157 zap_cursor_init(it->ozi_zc, osd->od_objset.os, obj->oo_db->db_object);
/*
 * osd_index_it_next() - advance a binary-key iterator.
 *
 * The cursor is advanced only when it->ozi_reset is 0, i.e. a freshly
 * reset iterator first yields the record it is already positioned on
 * (where ozi_reset is set and cleared is not visible in this listing).
 * zap_cursor_retrieve() is then called purely to learn whether a record
 * exists at the new position; its negated error ends up in rc and the
 * final mapping to the return value is on lines not shown.
 */
1163 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1165 struct osd_zap_it *it = (struct osd_zap_it *)di;
1166 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1170 if (it->ozi_reset == 0)
1171 zap_cursor_advance(it->ozi_zc);
1175 * According to current API we need to return error if it's last entry.
1176 * zap_cursor_advance() does not return any value. So we need to call
1177 * retrieve to check if there is any record. We should make
1178 * changes to Iterator API to not return status for this API
1180 rc = -zap_cursor_retrieve(it->ozi_zc, za);
/*
 * osd_index_it_key() - return the 64-bit key of the current record.
 *
 * The entry under the cursor is retrieved into the per-thread
 * zap_attribute_t; for binary-key ZAPs the key bytes are found in
 * za_name.  The first 8 bytes are copied into it->ozi_key and a pointer
 * to that field is returned, so the result is valid until the next call
 * overwrites ozi_key.
 *
 * \retval pointer to it->ozi_key on success
 * \retval ERR_PTR(rc) with the negated ZFS error on failure
 *
 * NOTE(review): the key is loaded through a __u64 pointer cast of the
 * za_name character array; this presumes suitable alignment of za_name
 * within zap_attribute_t - confirm, or copy with memcpy().
 */
1187 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1188 const struct dt_it *di)
1190 struct osd_zap_it *it = (struct osd_zap_it *)di;
1191 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1196 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1198 RETURN(ERR_PTR(rc));
1200 /* the binary key is stored in the name */
1201 it->ozi_key = *((__u64 *)za->za_name);
1203 RETURN((struct dt_key *)&it->ozi_key);
/*
 * osd_index_it_key_size() - size in bytes of a key of a binary-key index.
 *
 * Always sizeof(__u64): neither the environment nor the iterator is
 * consulted, consistent with osd_index_try() accepting only 64-bit keys.
 */
1206 static int osd_index_it_key_size(const struct lu_env *env,
1207 const struct dt_it *di)
1209 /* we only support 64-bit binary keys for the time being */
1210 RETURN(sizeof(__u64));
/*
 * osd_index_it_rec() - copy the record of the current entry into \a rec.
 *
 * Two steps: zap_cursor_retrieve() obtains the key of the entry under the
 * cursor (held in za_name), then zap_lookup_uint64() reads the value for
 * that key as obj->oo_recsize integers of 8 bytes each into \a rec.
 * Both ZFS errors are negated; the check between the two calls and the
 * return statement are on lines not shown.  \a attr is not referenced on
 * the visible lines.
 *
 * \a rec must be able to hold oo_recsize * 8 bytes.
 */
1213 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1214 struct dt_rec *rec, __u32 attr)
1216 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1217 struct osd_zap_it *it = (struct osd_zap_it *)di;
1218 struct osd_object *obj = it->ozi_obj;
1219 struct osd_device *osd = osd_obj2dev(obj);
1224 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1228 rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
1229 (const __u64 *)za->za_name, 1, 8,
1230 obj->oo_recsize, (void *)rec);
/*
 * osd_index_it_store() - produce a 64-bit cookie for the current position
 * of a binary-key iterator: the serialized ZAP cursor, which
 * osd_index_it_load() accepts to resume iteration.
 */
1234 static __u64 osd_index_it_store(const struct lu_env *env,
1235 const struct dt_it *di)
1237 struct osd_zap_it *it = (struct osd_zap_it *)di;
1240 RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
/*
 * osd_index_it_load() - reposition a binary-key iterator at `hash`, a
 * cookie previously produced by osd_index_it_store().
 *
 * The cursor structure is re-used in place: finalised, zeroed and
 * re-initialised from the serialized position.  zap_cursor_retrieve() is
 * then called to find out whether a record exists there; how rc (0,
 * -ENOENT, other errors) is mapped to the return value is on lines not
 * shown - presumably the same convention as osd_dir_it_load().
 */
1243 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1246 struct osd_zap_it *it = (struct osd_zap_it *)di;
1247 struct osd_object *obj = it->ozi_obj;
1248 struct osd_device *osd = osd_obj2dev(obj);
1249 zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1253 /* close the current cursor */
1254 zap_cursor_fini(it->ozi_zc);
1256 /* create a new one starting at hash */
1257 memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
1258 zap_cursor_init_serialized(it->ozi_zc, osd->od_objset.os,
1259 obj->oo_db->db_object, hash);
1262 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1265 else if (rc == -ENOENT)
/*
 * osd_index_ops - index method table for ZAP-backed indices with 64-bit
 * binary keys and fixed-size records.
 *
 * Installed by osd_index_try() for features other than the directory and
 * accounting ones, once the key/record size constraints are validated.
 */
1271 static struct dt_index_operations osd_index_ops = {
1272 .dio_lookup = osd_index_lookup,
1273 .dio_declare_insert = osd_declare_index_insert,
1274 .dio_insert = osd_index_insert,
1275 .dio_declare_delete = osd_declare_index_delete,
1276 .dio_delete = osd_index_delete,
1278 .init = osd_index_it_init,
1279 .fini = osd_index_it_fini,
1280 .get = osd_index_it_get,
1281 .put = osd_index_it_put,
1282 .next = osd_index_it_next,
1283 .key = osd_index_it_key,
1284 .key_size = osd_index_it_key_size,
1285 .rec = osd_index_it_rec,
1286 .store = osd_index_it_store,
1287 .load = osd_index_it_load
/*
 * osd_index_try() - select the index method table for \a dt according to
 * the requested features \a feat.
 *
 * Decisions visible below (the value returned on each rejection is on
 * lines not shown, and the end of the function lies beyond this listing):
 *  - DT_IND_RANGE is rejected (ordered fixed-size keys are not
 *    implemented);
 *  - dt_otable_features (OI scrub) is rejected;
 *  - dt_directory_features: osd_dir_ops is installed if the object is a
 *    ZAP;
 *  - dt_acct_features: the FID must be an accounting FID (asserted) and
 *    osd_acct_index_ops is installed;
 *  - anything else, if the object is a ZAP and has no index ops yet:
 *      * no flag other than DT_IND_UPDATE may be set;
 *      * key size must be exactly sizeof(__u64) (min == max);
 *      * record size must be fixed (min == max), a multiple of 8 bytes
 *        and not larger than ZAP_MAXVALUELEN;
 *    then oo_recsize is set to the record size in 8-byte units and
 *    osd_index_ops is installed.
 *
 * \a feat is dereferenced before any pointer comparison, so it must be
 * non-NULL.  The object must exist and have a dbuf (asserted).
 */
1291 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1292 const struct dt_index_features *feat)
1294 struct osd_object *obj = osd_dt_obj(dt);
1297 LASSERT(dt_object_exists(dt));
1300 * XXX: implement support for fixed-size keys sorted with natural
1301 * numerical way (not using internal hash value)
1303 if (feat->dif_flags & DT_IND_RANGE)
1306 if (unlikely(feat == &dt_otable_features))
1307 /* do not support oi scrub yet. */
1310 LASSERT(obj->oo_db != NULL);
1311 if (likely(feat == &dt_directory_features)) {
1312 if (udmu_object_is_zap(obj->oo_db))
1313 dt->do_index_ops = &osd_dir_ops;
1316 } else if (unlikely(feat == &dt_acct_features)) {
1317 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1318 dt->do_index_ops = &osd_acct_index_ops;
1319 } else if (udmu_object_is_zap(obj->oo_db) &&
1320 dt->do_index_ops == NULL) {
1321 /* For index file, we don't support variable key & record sizes
1322 * and the key has to be unique */
1323 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1326 /* Although the zap_*_uint64() primitives support large keys, we
1327 * limit ourselves to 64-bit keys for now */
1328 if (feat->dif_keysize_max != sizeof(__u64) ||
1329 feat->dif_keysize_min != sizeof(__u64))
1332 /* As for the record size, it should be a multiple of 8 bytes
1333 * and smaller than the maximum value length supported by ZAP.
1335 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1337 if (feat->dif_recsize_max != feat->dif_recsize_min ||
1338 (feat->dif_recsize_max & (sizeof(__u64) - 1)))
1341 obj->oo_recsize = feat->dif_recsize_max / sizeof(__u64);
1342 dt->do_index_ops = &osd_index_ops;