From d00aa67e61944a59accfb43d277ad4dd8d85f16d Mon Sep 17 00:00:00 2001 From: Johann Lombardi Date: Wed, 12 Sep 2012 00:04:00 +0200 Subject: [PATCH] LU-1842 protocol: add support for OBD_IDX_READ This patch defines a new RPC format (namely OBD_IDX_READ) which allows to read the content of an index file via a bulk transfer. It is simlilar to MDS_READPAGE except that it is not tied to a specific key/record format (readdir relies on lu_dirent/lu_dirpage). Like readdir, key/record pairs are stored in a container of a fixed size (i.e. 4KB) regardless of the client & server page size. Signed-off-by: Johann Lombardi Change-Id: I34071ca05a3bd4e6c01bfe4fc533ab79bdd98b3c Reviewed-on: http://review.whamcloud.com/3942 Tested-by: Hudson Reviewed-by: Fan Yong Tested-by: Maloo Reviewed-by: Niu Yawei Reviewed-by: Oleg Drokin --- lustre/include/dt_object.h | 9 ++ lustre/include/lustre/lustre_idl.h | 88 +++++++++- lustre/include/lustre_req_layout.h | 2 + lustre/include/obd_support.h | 1 + lustre/mdd/mdd_object.c | 153 ++++-------------- lustre/mdt/mdt_handler.c | 92 ++++++++++- lustre/obdclass/dt_object.c | 320 +++++++++++++++++++++++++++++++++++++ lustre/ptlrpc/layout.c | 22 +++ lustre/ptlrpc/lproc_ptlrpc.c | 1 + lustre/ptlrpc/pack_generic.c | 14 ++ lustre/ptlrpc/sec.c | 1 + lustre/ptlrpc/wiretest.c | 98 +++++++++++- lustre/utils/req-layout.c | 1 + lustre/utils/wirecheck.c | 40 +++++ lustre/utils/wiretest.c | 98 +++++++++++- 15 files changed, 817 insertions(+), 123 deletions(-) diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index a16c981..a199087 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -869,6 +869,15 @@ int dt_record_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, loff_t *pos); int dt_record_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, struct thandle *th); +typedef int (*dt_index_page_build_t)(const struct lu_env *env, + union lu_page *lp, int nob, + const struct dt_it_ops *iops, + struct dt_it *it, __u32 attr, void *arg); +int dt_index_walk(const struct lu_env *env, struct dt_object *obj, + const struct lu_rdpg *rdpg, dt_index_page_build_t filler, + void *arg); +int dt_index_read(const struct lu_env *env, struct dt_device *dev, + struct idx_info *ii, const struct lu_rdpg *rdpg); static inline struct thandle *dt_trans_create(const struct lu_env *env, struct dt_device *d) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 93c4040..6416808 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -952,7 +952,7 @@ static inline int lu_dirent_size(struct lu_dirent *ent) #define LU_PAGE_SIZE (1UL << LU_PAGE_SHIFT) #define LU_PAGE_MASK (~(LU_PAGE_SIZE - 1)) -#define LU_PAGE_COUNT 1 << (CFS_PAGE_SHIFT - LU_PAGE_SHIFT) +#define LU_PAGE_COUNT (1 << (CFS_PAGE_SHIFT - LU_PAGE_SHIFT)) /** @} lu_dir */ @@ -2622,6 +2622,7 @@ typedef enum { OBD_PING = 400, OBD_LOG_CANCEL, OBD_QC_CALLBACK, + OBD_IDX_READ, OBD_LAST_OPC } obd_cmd_t; #define OBD_FIRST_OPC OBD_PING @@ -2977,6 +2978,91 @@ void dump_obdo(struct obdo *oa); void dump_ost_body(struct ost_body *ob); void dump_rcs(__u32 *rc); +#define IDX_INFO_MAGIC 0x3D37CC37 + +/* Index file transfer through the network. The server serializes the index into + * a byte stream which is sent to the client via a bulk transfer */ +struct idx_info { + __u32 ii_magic; + + /* reply: see idx_info_flags below */ + __u32 ii_flags; + + /* request & reply: number of lu_idxpage (to be) transferred */ + __u16 ii_count; + __u16 ii_pad0; + + /* request: requested attributes passed down to the iterator API */ + __u32 ii_attrs; + + /* request & reply: index file identifier (FID) */ + struct lu_fid ii_fid; + + /* reply: version of the index file before starting to walk the index. + * Please note that the version can be modified at any time during the + * transfer */ + __u64 ii_version; + + /* request: hash to start with: + * reply: hash of the first entry of the first lu_idxpage and hash + * of the entry to read next if any */ + __u64 ii_hash_start; + __u64 ii_hash_end; + + /* reply: size of keys in lu_idxpages, minimal one if II_FL_VARKEY is + * set */ + __u16 ii_keysize; + + /* reply: size of records in lu_idxpages, minimal one if II_FL_VARREC + * is set */ + __u16 ii_recsize; + + __u32 ii_pad1; + __u64 ii_pad2; + __u64 ii_pad3; +}; +extern void lustre_swab_idx_info(struct idx_info *ii); + +#define II_END_OFF MDS_DIR_END_OFF /* all entries have been read */ + +/* List of flags used in idx_info::ii_flags */ +enum idx_info_flags { + II_FL_NOHASH = 1 << 0, /* client doesn't care about hash value */ + II_FL_VARKEY = 1 << 1, /* keys can be of variable size */ + II_FL_VARREC = 1 << 2, /* records can be of variable size */ + II_FL_NONUNQ = 1 << 3, /* index supports non-unique keys */ +}; + +#define LIP_MAGIC 0x8A6D6B6C + +/* 4KB (= LU_PAGE_SIZE) container gathering key/record pairs */ +struct lu_idxpage { + /* 16-byte header */ + __u32 lip_magic; + __u16 lip_flags; + __u16 lip_nr; /* number of entries in the container */ + __u64 lip_pad0; /* additional padding for future use */ + + /* key/record pairs are stored in the remaining 4080 bytes. + * depending upon the flags in idx_info::ii_flags, each key/record + * pair might be preceded by: + * - a hash value + * - the key size (II_FL_VARKEY is set) + * - the record size (II_FL_VARREC is set) + * + * For the time being, we only support fixed-size key & record. */ + char lip_entries[0]; +}; + +#define LIP_HDR_SIZE (offsetof(struct lu_idxpage, lip_entries)) + +/* Gather all possible type associated with a 4KB container */ +union lu_page { + struct lu_dirpage lp_dir; /* for MDS_READPAGE */ + struct lu_idxpage lp_idx; /* for OBD_IDX_READ */ + char lp_array[LU_PAGE_SIZE]; +}; + /* this will be used when OBD_CONNECT_CHANGE_QS is set */ struct qunit_data { /** diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index f2ec5dd..6a7e265 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -138,6 +138,7 @@ void req_layout_fini(void); extern struct req_format RQF_OBD_PING; extern struct req_format RQF_OBD_SET_INFO; extern struct req_format RQF_SEC_CTX; +extern struct req_format RQF_OBD_IDX_READ; /* MGS req_format */ extern struct req_format RQF_MGS_TARGET_REG; extern struct req_format RQF_MGS_SET_INFO; @@ -241,6 +242,7 @@ extern struct req_msg_field RMF_SETINFO_KEY; extern struct req_msg_field RMF_GETINFO_VAL; extern struct req_msg_field RMF_GETINFO_VALLEN; extern struct req_msg_field RMF_GETINFO_KEY; +extern struct req_msg_field RMF_IDX_INFO; /* * connection handle received in MDS_CONNECT request. diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index f87ecab..13ca2c0 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -374,6 +374,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_OBD_DQACQ 0x604 #define OBD_FAIL_OBD_LLOG_SETUP 0x605 #define OBD_FAIL_OBD_LOG_CANCEL_REP 0x606 +#define OBD_FAIL_OBD_IDX_READ_NET 0x607 #define OBD_FAIL_TGT_REPLY_NET 0x700 #define OBD_FAIL_TGT_CONN_RACE 0x701 diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 05d7b24..1b9efeb 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -2599,17 +2599,17 @@ static int mdd_readpage_sanity_check(const struct lu_env *env, RETURN(rc); } -static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd, - struct lu_dirpage *dp, int nob, - const struct dt_it_ops *iops, struct dt_it *it, - __u32 attr) -{ - void *area = dp; - int result; - __u64 hash = 0; - struct lu_dirent *ent; - struct lu_dirent *last = NULL; - int first = 1; +static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp, + int nob, const struct dt_it_ops *iops, + struct dt_it *it, __u32 attr, void *arg) +{ + struct lu_dirpage *dp = &lp->lp_dir; + void *area = dp; + int result; + __u64 hash = 0; + struct lu_dirent *ent; + struct lu_dirent *last = NULL; + int first = 1; memset(area, 0, sizeof (*dp)); area += sizeof (*dp); @@ -2666,115 +2666,14 @@ out: dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE); last->lde_reclen = 0; /* end mark */ } + if (result > 0) + /* end of directory */ + dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF); + if (result < 0) + CWARN("build page failed: %d!\n", result); return result; } -static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, - const struct lu_rdpg *rdpg) -{ - struct dt_it *it; - struct dt_object *next = mdd_object_child(obj); - const struct dt_it_ops *iops; - struct page *pg; - struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - int i; - int nlupgs = 0; - int rc; - int nob; - - LASSERT(rdpg->rp_pages != NULL); - LASSERT(next->do_index_ops != NULL); - - if (rdpg->rp_count <= 0) - return -EFAULT; - - /* - * iterate through directory and fill pages from @rdpg - */ - iops = &next->do_index_ops->dio_it; - it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj)); - if (IS_ERR(it)) - return PTR_ERR(it); - - rc = iops->load(env, it, rdpg->rp_hash); - - if (rc == 0) { - /* - * Iterator didn't find record with exactly the key requested. - * - * It is currently either - * - * - positioned above record with key less than - * requested---skip it. - * - * - or not positioned at all (is in IAM_IT_SKEWED - * state)---position it on the next item. - */ - rc = iops->next(env, it); - } else if (rc > 0) - rc = 0; - - /* - * At this point and across for-loop: - * - * rc == 0 -> ok, proceed. - * rc > 0 -> end of directory. - * rc < 0 -> error. - */ - for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0; - i++, nob -= CFS_PAGE_SIZE) { - struct lu_dirpage *dp; - - LASSERT(i < rdpg->rp_npages); - pg = rdpg->rp_pages[i]; - dp = cfs_kmap(pg); -#if CFS_PAGE_SIZE > LU_PAGE_SIZE -repeat: -#endif - rc = mdd_dir_page_build(env, mdd, dp, - min_t(int, nob, LU_PAGE_SIZE), - iops, it, rdpg->rp_attrs); - if (rc > 0) { - /* - * end of directory. - */ - dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF); - nlupgs++; - } else if (rc < 0) { - CWARN("build page failed: %d!\n", rc); - } else { - nlupgs++; -#if CFS_PAGE_SIZE > LU_PAGE_SIZE - dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); - if ((unsigned long)dp & ~CFS_PAGE_MASK) - goto repeat; -#endif - } - cfs_kunmap(pg); - } - if (rc >= 0) { - struct lu_dirpage *dp; - - dp = cfs_kmap(rdpg->rp_pages[0]); - dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash); - if (nlupgs == 0) { - /* - * No pages were processed, mark this for first page - * and send back. - */ - dp->ldp_flags = cpu_to_le32(LDF_EMPTY); - nlupgs = 1; - } - cfs_kunmap(rdpg->rp_pages[0]); - - rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count); - } - iops->put(env, it); - iops->fini(env, it); - - return rc; -} - int mdd_readpage(const struct lu_env *env, struct md_object *obj, const struct lu_rdpg *rdpg) { @@ -2818,9 +2717,25 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj, GOTO(out_unlock, rc = LU_PAGE_SIZE); } - rc = __mdd_readpage(env, mdd_obj, rdpg); + rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg, + mdd_dir_page_build, NULL); + if (rc >= 0) { + struct lu_dirpage *dp; + + dp = cfs_kmap(rdpg->rp_pages[0]); + dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash); + if (rc == 0) { + /* + * No pages were processed, mark this for first page + * and send back. + */ + dp->ldp_flags = cpu_to_le32(LDF_EMPTY); + rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count); + } + cfs_kunmap(rdpg->rp_pages[0]); + } - EXIT; + GOTO(out_unlock, rc); out_unlock: mdd_read_unlock(env, mdd_obj); return rc; diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index c43b0c8..65c6c10 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1959,6 +1959,89 @@ static int mdt_obd_ping(struct mdt_thread_info *info) RETURN(rc); } +/* + * OBD_IDX_READ handler + */ +static int mdt_obd_idx_read(struct mdt_thread_info *info) +{ + struct mdt_device *mdt = info->mti_mdt; + struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg; + struct idx_info *req_ii, *rep_ii; + int rc, i; + ENTRY; + + memset(rdpg, 0, sizeof(*rdpg)); + req_capsule_set(info->mti_pill, &RQF_OBD_IDX_READ); + + /* extract idx_info buffer from request & reply */ + req_ii = req_capsule_client_get(info->mti_pill, &RMF_IDX_INFO); + if (req_ii == NULL || req_ii->ii_magic != IDX_INFO_MAGIC) + RETURN(err_serious(-EPROTO)); + + rc = req_capsule_server_pack(info->mti_pill); + if (rc) + RETURN(err_serious(rc)); + + rep_ii = req_capsule_server_get(info->mti_pill, &RMF_IDX_INFO); + if (rep_ii == NULL) + RETURN(err_serious(-EFAULT)); + rep_ii->ii_magic = IDX_INFO_MAGIC; + + /* extract hash to start with */ + rdpg->rp_hash = req_ii->ii_hash_start; + + /* extract requested attributes */ + rdpg->rp_attrs = req_ii->ii_attrs; + + /* check that fid packed in request is valid and supported */ + if (!fid_is_sane(&req_ii->ii_fid)) + RETURN(-EINVAL); + rep_ii->ii_fid = req_ii->ii_fid; + + /* copy flags */ + rep_ii->ii_flags = req_ii->ii_flags; + + /* compute number of pages to allocate, ii_count is the number of 4KB + * containers */ + if (req_ii->ii_count <= 0) + GOTO(out, rc = -EFAULT); + rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT, + PTLRPC_MAX_BRW_SIZE); + rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT; + + /* allocate pages to store the containers */ + OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0])); + if (rdpg->rp_pages == NULL) + GOTO(out, rc = -ENOMEM); + for (i = 0; i < rdpg->rp_npages; i++) { + rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD); + if (rdpg->rp_pages[i] == NULL) + GOTO(out, rc = -ENOMEM); + } + + /* populate pages with key/record pairs */ + rc = dt_index_read(info->mti_env, mdt->mdt_bottom, rep_ii, rdpg); + if (rc < 0) + GOTO(out, rc); + + LASSERTF(rc <= rdpg->rp_count, "dt_index_read() returned more than " + "asked %d > %d\n", rc, rdpg->rp_count); + + /* send pages to client */ + rc = mdt_sendpage(info, rdpg, rc); + + GOTO(out, rc); +out: + if (rdpg->rp_pages) { + for (i = 0; i < rdpg->rp_npages; i++) + if (rdpg->rp_pages[i]) + cfs_free_page(rdpg->rp_pages[i]); + OBD_FREE(rdpg->rp_pages, + rdpg->rp_npages * sizeof(rdpg->rp_pages[0])); + } + return rc; +} + static int mdt_obd_log_cancel(struct mdt_thread_info *info) { return err_serious(-EOPNOTSUPP); @@ -2970,6 +3053,7 @@ static int mdt_msg_check_version(struct lustre_msg *msg) case SEC_CTX_INIT: case SEC_CTX_INIT_CONT: case SEC_CTX_FINI: + case OBD_IDX_READ: rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", @@ -6138,7 +6222,8 @@ DEF_MDT_HNDL_F(0, QUOTACTL, mdt_quotactl_handle) static struct mdt_handler mdt_obd_ops[] = { DEF_OBD_HNDL(0, PING, mdt_obd_ping), DEF_OBD_HNDL(0, LOG_CANCEL, mdt_obd_log_cancel), - DEF_OBD_HNDL(0, QC_CALLBACK, mdt_obd_qc_callback) + DEF_OBD_HNDL(0, QC_CALLBACK, mdt_obd_qc_callback), + DEF_OBD_HNDL(0, IDX_READ, mdt_obd_idx_read) }; #define DEF_DLM_HNDL_0(flags, name, fn) \ @@ -6229,6 +6314,11 @@ static struct mdt_opc_slice mdt_readpage_handlers[] = { .mos_opc_end = MDS_LAST_OPC, .mos_hs = mdt_readpage_ops }, + { + .mos_opc_start = OBD_FIRST_OPC, + .mos_opc_end = OBD_LAST_OPC, + .mos_hs = mdt_obd_ops + }, { .mos_hs = NULL } diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 11a8984..51305b5 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -598,3 +598,323 @@ const struct dt_index_features dt_quota_slv_features = { .dif_ptrsize = 4 }; EXPORT_SYMBOL(dt_quota_slv_features); + +/* helper function returning what dt_index_features structure should be used + * based on the FID sequence. This is used by OBD_IDX_READ RPC */ +static inline const struct dt_index_features *dt_index_feat_select(__u64 seq, + __u32 mode) +{ + if (seq == FID_SEQ_QUOTA_GLB) { + /* global quota index */ + if (!S_ISREG(mode)) + /* global quota index should be a regular file */ + return ERR_PTR(-ENOENT); + return &dt_quota_glb_features; + } else if (seq == FID_SEQ_QUOTA) { + /* quota slave index */ + if (!S_ISREG(mode)) + /* slave index should be a regular file */ + return ERR_PTR(-ENOENT); + return &dt_quota_slv_features; + } else if (seq >= FID_SEQ_NORMAL) { + /* object is part of the namespace, verify that it is a + * directory */ + if (!S_ISDIR(mode)) + /* sorry, we can only deal with directory */ + return ERR_PTR(-ENOTDIR); + return &dt_directory_features; + } + + return ERR_PTR(-EOPNOTSUPP); +} + +/* + * Fill a lu_idxpage with key/record pairs read for transfer via OBD_IDX_READ + * RPC + * + * \param env - is the environment passed by the caller + * \param lp - is a pointer to the lu_page to fill + * \param nob - is the maximum number of bytes that should be copied + * \param iops - is the index operation vector associated with the index object + * \param it - is a pointer to the current iterator + * \param attr - is the index attribute to pass to iops->rec() + * \param arg - is a pointer to the idx_info structure + */ +static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, + int nob, const struct dt_it_ops *iops, + struct dt_it *it, __u32 attr, void *arg) +{ + struct idx_info *ii = (struct idx_info *)arg; + struct lu_idxpage *lip = &lp->lp_idx; + char *entry; + int rc, size; + ENTRY; + + /* no support for variable key & record size for now */ + LASSERT((ii->ii_flags & II_FL_VARKEY) == 0); + LASSERT((ii->ii_flags & II_FL_VARREC) == 0); + + /* initialize the header of the new container */ + memset(lip, 0, LIP_HDR_SIZE); + lip->lip_magic = LIP_MAGIC; + nob -= LIP_HDR_SIZE; + + /* compute size needed to store a key/record pair */ + size = ii->ii_recsize + ii->ii_keysize; + if ((ii->ii_flags & II_FL_NOHASH) == 0) + /* add hash if the client wants it */ + size += sizeof(__u64); + + entry = lip->lip_entries; + do { + char *tmp_entry = entry; + struct dt_key *key; + __u64 hash; + + /* fetch 64-bit hash value */ + hash = iops->store(env, it); + ii->ii_hash_end = hash; + + if (nob < size) { + if (lip->lip_nr == 0) + GOTO(out, rc = -EINVAL); + GOTO(out, rc = 0); + } + + if ((ii->ii_flags & II_FL_NOHASH) == 0) { + /* client wants to the 64-bit hash value associated with + * each record */ + memcpy(tmp_entry, &hash, sizeof(hash)); + tmp_entry += sizeof(hash); + } + + /* then the key value */ + LASSERT(iops->key_size(env, it) == ii->ii_keysize); + key = iops->key(env, it); + memcpy(tmp_entry, key, ii->ii_keysize); + tmp_entry += ii->ii_keysize; + + /* and finally the record */ + rc = iops->rec(env, it, (struct dt_rec *)tmp_entry, attr); + if (rc != -ESTALE) { + if (rc != 0) + GOTO(out, rc); + + /* hash/key/record successfully copied! */ + lip->lip_nr++; + if (unlikely(lip->lip_nr == 1 && ii->ii_count == 0)) + ii->ii_hash_start = hash; + entry = tmp_entry + ii->ii_recsize; + nob -= size; + } + + /* move on to the next record */ + do { + rc = iops->next(env, it); + } while (rc == -ESTALE); + + } while (rc == 0); + + GOTO(out, rc); +out: + if (rc >= 0 && lip->lip_nr > 0) + /* one more container */ + ii->ii_count++; + if (rc > 0) + /* no more entries */ + ii->ii_hash_end = II_END_OFF; + return rc; +} + +/* + * Walk index and fill lu_page containers with key/record pairs + * + * \param env - is the environment passed by the caller + * \param obj - is the index object to parse + * \param rdpg - is the lu_rdpg descriptor associated with the transfer + * \param filler - is the callback function responsible for filling a lu_page + * with key/record pairs in the format wanted by the caller + * \param arg - is an opaq argument passed to the filler function + * + * \retval sum (in bytes) of all filled lu_pages + * \retval -ve errno on failure + */ +int dt_index_walk(const struct lu_env *env, struct dt_object *obj, + const struct lu_rdpg *rdpg, dt_index_page_build_t filler, + void *arg) +{ + struct dt_it *it; + const struct dt_it_ops *iops; + unsigned int pageidx, nob, nlupgs = 0; + int rc; + ENTRY; + + LASSERT(rdpg->rp_pages != NULL); + LASSERT(obj->do_index_ops != NULL); + + nob = rdpg->rp_count; + if (nob <= 0) + RETURN(-EFAULT); + + /* Iterate through index and fill containers from @rdpg */ + iops = &obj->do_index_ops->dio_it; + LASSERT(iops != NULL); + it = iops->init(env, obj, rdpg->rp_attrs, BYPASS_CAPA); + if (IS_ERR(it)) + RETURN(PTR_ERR(it)); + + rc = iops->load(env, it, rdpg->rp_hash); + if (rc == 0) { + /* + * Iterator didn't find record with exactly the key requested. + * + * It is currently either + * + * - positioned above record with key less than + * requested---skip it. + * - or not positioned at all (is in IAM_IT_SKEWED + * state)---position it on the next item. + */ + rc = iops->next(env, it); + } else if (rc > 0) { + rc = 0; + } + + /* Fill containers one after the other. There might be multiple + * containers per physical page. + * + * At this point and across for-loop: + * rc == 0 -> ok, proceed. + * rc > 0 -> end of index. + * rc < 0 -> error. */ + for (pageidx = 0; rc == 0 && nob > 0; pageidx++) { + union lu_page *lp; + int i; + + LASSERT(pageidx < rdpg->rp_npages); + lp = cfs_kmap(rdpg->rp_pages[pageidx]); + + /* fill lu pages */ + for (i = 0; i < LU_PAGE_COUNT; i++, lp++, nob -= LU_PAGE_SIZE) { + rc = filler(env, lp, min_t(int, nob, LU_PAGE_SIZE), + iops, it, rdpg->rp_attrs, arg); + if (rc < 0) + break; + /* one more lu_page */ + nlupgs++; + if (rc > 0) + /* end of index */ + break; + } + cfs_kunmap(rdpg->rp_pages[i]); + } + + iops->put(env, it); + iops->fini(env, it); + + if (rc >= 0) + rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count); + + RETURN(rc); +} +EXPORT_SYMBOL(dt_index_walk); + +/** + * Walk key/record pairs of an index and copy them into 4KB containers to be + * transferred over the network. This is the common handler for OBD_IDX_READ + * RPC processing. + * + * \param env - is the environment passed by the caller + * \param dev - is the dt_device storing the index + * \param ii - is the idx_info structure packed by the client in the + * OBD_IDX_READ request + * \param rdpg - is the lu_rdpg descriptor + * + * \retval on success, return sum (in bytes) of all filled containers + * \retval appropriate error otherwise. + */ +int dt_index_read(const struct lu_env *env, struct dt_device *dev, + struct idx_info *ii, const struct lu_rdpg *rdpg) +{ + const struct dt_index_features *feat; + struct dt_object *obj; + int rc; + ENTRY; + + /* rp_count shouldn't be null and should be a multiple of the container + * size */ + if (rdpg->rp_count <= 0 && (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0) + RETURN(-EFAULT); + + if (fid_seq(&ii->ii_fid) < FID_SEQ_SPECIAL) + /* block access to local files */ + RETURN(-EPERM); + + if (fid_seq(&ii->ii_fid) >= FID_SEQ_NORMAL) + /* we don't support directory transfer via OBD_IDX_READ for the + * time being */ + RETURN(-EOPNOTSUPP); + + /* lookup index object subject to the transfer */ + obj = dt_locate(env, dev, &ii->ii_fid); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + if (dt_object_exists(obj) == 0) + GOTO(out, rc = -ENOENT); + + /* fetch index features associated with index object */ + feat = dt_index_feat_select(fid_seq(&ii->ii_fid), + lu_object_attr(&obj->do_lu)); + if (IS_ERR(feat)) + GOTO(out, rc = PTR_ERR(feat)); + + /* load index feature if not done already */ + if (obj->do_index_ops == NULL) { + rc = obj->do_ops->do_index_try(env, obj, feat); + if (rc) + GOTO(out, rc); + } + + /* fill ii_flags with supported index features */ + ii->ii_flags &= II_FL_NOHASH; + + ii->ii_keysize = feat->dif_keysize_max; + if ((feat->dif_flags & DT_IND_VARKEY) != 0) { + /* key size is variable */ + ii->ii_flags |= II_FL_VARKEY; + /* we don't support variable key size for the time being */ + GOTO(out, rc = -EOPNOTSUPP); + } + + ii->ii_recsize = feat->dif_recsize_max; + if ((feat->dif_flags & DT_IND_VARREC) != 0) { + /* record size is variable */ + ii->ii_flags |= II_FL_VARREC; + /* we don't support variable record size for the time being */ + GOTO(out, rc = -EOPNOTSUPP); + } + + if ((feat->dif_flags & DT_IND_NONUNQ) != 0) + /* key isn't necessarily unique */ + ii->ii_flags |= II_FL_NONUNQ; + + dt_read_lock(env, obj, 0); + /* fetch object version before walking the index */ + ii->ii_version = dt_version_get(env, obj); + + /* walk the index and fill lu_idxpages with key/record pairs */ + rc = dt_index_walk(env, obj, rdpg, dt_index_page_build ,ii); + dt_read_unlock(env, obj); + + if (rc == 0) { + /* index is empty */ + LASSERT(ii->ii_count == 0); + ii->ii_hash_end = II_END_OFF; + } + + GOTO(out, rc); +out: + lu_object_put(env, &obj->do_lu); + return rc; +} +EXPORT_SYMBOL(dt_index_read); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 78f8751..0d0e106 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -497,6 +497,16 @@ static const struct req_msg_field *llog_origin_handle_next_block_server[] = { &RMF_EADATA }; +static const struct req_msg_field *obd_idx_read_client[] = { + &RMF_PTLRPC_BODY, + &RMF_IDX_INFO +}; + +static const struct req_msg_field *obd_idx_read_server[] = { + &RMF_PTLRPC_BODY, + &RMF_IDX_INFO +}; + static const struct req_msg_field *ost_body_only[] = { &RMF_PTLRPC_BODY, &RMF_OST_BODY @@ -564,6 +574,7 @@ static const struct req_msg_field *ost_get_fiemap_server[] = { static struct req_format *req_formats[] = { &RQF_OBD_PING, &RQF_OBD_SET_INFO, + &RQF_OBD_IDX_READ, &RQF_SEC_CTX, &RQF_MGS_TARGET_REG, &RQF_MGS_SET_INFO, @@ -966,6 +977,11 @@ struct req_msg_field RMF_FIEMAP_VAL = DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap, NULL); EXPORT_SYMBOL(RMF_FIEMAP_VAL); +struct req_msg_field RMF_IDX_INFO = + DEFINE_MSGF("idx_info", 0, sizeof(struct idx_info), + lustre_swab_idx_info, NULL); +EXPORT_SYMBOL(RMF_IDX_INFO); + /* * Request formats. */ @@ -1004,6 +1020,12 @@ struct req_format RQF_OBD_SET_INFO = DEFINE_REQ_FMT0("OBD_SET_INFO", obd_set_info_client, empty); EXPORT_SYMBOL(RQF_OBD_SET_INFO); +/* Read index file through the network */ +struct req_format RQF_OBD_IDX_READ = + DEFINE_REQ_FMT0("OBD_IDX_READ", + obd_idx_read_client, obd_idx_read_server); +EXPORT_SYMBOL(RQF_OBD_IDX_READ); + struct req_format RQF_SEC_CTX = DEFINE_REQ_FMT0("SEC_CTX", empty, empty); EXPORT_SYMBOL(RQF_SEC_CTX); diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 50a981f..0b87bc9 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -111,6 +111,7 @@ struct ll_rpc_opcode { { OBD_PING, "obd_ping" }, { OBD_LOG_CANCEL, "llog_origin_handle_cancel" }, { OBD_QC_CALLBACK, "obd_quota_callback" }, + { OBD_IDX_READ, "dt_index_read" }, { LLOG_ORIGIN_HANDLE_CREATE, "llog_origin_handle_create" }, { LLOG_ORIGIN_HANDLE_NEXT_BLOCK, "llog_origin_handle_next_block" }, { LLOG_ORIGIN_HANDLE_READ_HEADER,"llog_origin_handle_read_header" }, diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 98c53d9..dd46499 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2023,6 +2023,20 @@ void lustre_swab_fiemap(struct ll_user_fiemap *fiemap) } EXPORT_SYMBOL(lustre_swab_fiemap); +void lustre_swab_idx_info(struct idx_info *ii) +{ + __swab32s(&ii->ii_magic); + __swab32s(&ii->ii_flags); + __swab16s(&ii->ii_count); + __swab32s(&ii->ii_attrs); + lustre_swab_lu_fid(&ii->ii_fid); + __swab64s(&ii->ii_version); + __swab64s(&ii->ii_hash_start); + __swab64s(&ii->ii_hash_end); + __swab16s(&ii->ii_keysize); + __swab16s(&ii->ii_recsize); +} + void lustre_swab_mdt_rec_reint (struct mdt_rec_reint *rr) { __swab32s (&rr->rr_opcode); diff --git a/lustre/ptlrpc/sec.c b/lustre/ptlrpc/sec.c index d876a22..315e99a 100644 --- a/lustre/ptlrpc/sec.c +++ b/lustre/ptlrpc/sec.c @@ -822,6 +822,7 @@ void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode) case OST_READ: case MDS_READPAGE: case MGS_CONFIG_READ: + case OBD_IDX_READ: req->rq_bulk_read = 1; break; case OST_WRITE: diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 5f6766f..667b83a 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -346,7 +346,9 @@ void lustre_assert_wire_constants(void) (long long)OBD_LOG_CANCEL); LASSERTF(OBD_QC_CALLBACK == 402, "found %lld\n", (long long)OBD_QC_CALLBACK); - LASSERTF(OBD_LAST_OPC == 403, "found %lld\n", + LASSERTF(OBD_IDX_READ == 403, "found %lld\n", + (long long)OBD_IDX_READ); + LASSERTF(OBD_LAST_OPC == 404, "found %lld\n", (long long)OBD_LAST_OPC); LASSERTF(QUOTA_DQACQ == 601, "found %lld\n", (long long)QUOTA_DQACQ); @@ -575,6 +577,8 @@ void lustre_assert_wire_constants(void) (long long)LDF_COLLIDE); LASSERTF(LU_PAGE_SIZE == 4096, "found %lld\n", (long long)LU_PAGE_SIZE); + LASSERTF((int)sizeof(union lu_page) == 4096, "found %lld\n", + (long long)(int)sizeof(union lu_page)); /* Checks for struct lustre_handle */ LASSERTF((int)sizeof(struct lustre_handle) == 8, "found %lld\n", @@ -1674,6 +1678,98 @@ void lustre_assert_wire_constants(void) LASSERTF((int)sizeof(((struct lquota_slv_rec *)0)->qsr_granted) == 8, "found %lld\n", (long long)(int)sizeof(((struct lquota_slv_rec *)0)->qsr_granted)); + /* Checks for struct idx_info */ + LASSERTF((int)sizeof(struct idx_info) == 80, "found %lld\n", + (long long)(int)sizeof(struct idx_info)); + LASSERTF((int)offsetof(struct idx_info, ii_magic) == 0, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_magic)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_magic) == 4, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_magic)); + LASSERTF((int)offsetof(struct idx_info, ii_flags) == 4, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_flags)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_flags) == 4, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_flags)); + LASSERTF((int)offsetof(struct idx_info, ii_count) == 8, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_count)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_count) == 2, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_count)); + LASSERTF((int)offsetof(struct idx_info, ii_pad0) == 10, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_pad0)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad0) == 2, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_pad0)); + LASSERTF((int)offsetof(struct idx_info, ii_attrs) == 12, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_attrs)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_attrs) == 4, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_attrs)); + LASSERTF((int)offsetof(struct idx_info, ii_fid) == 16, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_fid)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_fid) == 16, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_fid)); + LASSERTF((int)offsetof(struct idx_info, ii_version) == 32, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_version)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_version) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_version)); + LASSERTF((int)offsetof(struct idx_info, ii_hash_start) == 40, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_hash_start)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_hash_start) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_hash_start)); + LASSERTF((int)offsetof(struct idx_info, ii_hash_end) == 48, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_hash_end)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_hash_end) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_hash_end)); + LASSERTF((int)offsetof(struct idx_info, ii_keysize) == 56, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_keysize)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_keysize) == 2, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_keysize)); + LASSERTF((int)offsetof(struct idx_info, ii_recsize) == 58, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_recsize)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_recsize) == 2, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_recsize)); + LASSERTF((int)offsetof(struct idx_info, ii_pad1) == 60, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_pad1)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad1) == 4, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_pad1)); + LASSERTF((int)offsetof(struct idx_info, ii_pad2) == 64, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_pad2)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad2) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_pad2)); + LASSERTF((int)offsetof(struct idx_info, ii_pad3) == 72, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_pad3)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad3) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_pad3)); + CLASSERT(IDX_INFO_MAGIC == 0x3D37CC37); + + /* Checks for struct lu_idxpage */ + LASSERTF((int)sizeof(struct lu_idxpage) == 16, "found %lld\n", + (long long)(int)sizeof(struct lu_idxpage)); + LASSERTF((int)offsetof(struct lu_idxpage, lip_magic) == 0, "found %lld\n", + (long long)(int)offsetof(struct lu_idxpage, lip_magic)); + LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_magic) == 4, "found %lld\n", + (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_magic)); + LASSERTF((int)offsetof(struct lu_idxpage, lip_flags) == 4, "found %lld\n", + (long long)(int)offsetof(struct lu_idxpage, lip_flags)); + LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_flags) == 2, "found %lld\n", + (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_flags)); + LASSERTF((int)offsetof(struct lu_idxpage, lip_nr) == 6, "found %lld\n", + (long long)(int)offsetof(struct lu_idxpage, lip_nr)); + LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_nr) == 2, "found %lld\n", + (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_nr)); + LASSERTF((int)offsetof(struct lu_idxpage, lip_pad0) == 8, "found %lld\n", + (long long)(int)offsetof(struct lu_idxpage, lip_pad0)); + LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_pad0) == 8, "found %lld\n", + (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_pad0)); + CLASSERT(LIP_MAGIC == 0x8A6D6B6C); + LASSERTF(LIP_HDR_SIZE == 16, "found %lld\n", + (long long)LIP_HDR_SIZE); + LASSERTF(II_FL_NOHASH == 1, "found %lld\n", + (long long)II_FL_NOHASH); + LASSERTF(II_FL_VARKEY == 2, "found %lld\n", + (long long)II_FL_VARKEY); + LASSERTF(II_FL_VARREC == 4, "found %lld\n", + (long long)II_FL_VARREC); + LASSERTF(II_FL_NONUNQ == 8, "found %lld\n", + (long long)II_FL_NONUNQ); + /* Checks for struct niobuf_remote */ LASSERTF((int)sizeof(struct niobuf_remote) == 16, "found %lld\n", (long long)(int)sizeof(struct niobuf_remote)); diff --git a/lustre/utils/req-layout.c b/lustre/utils/req-layout.c index 914253f..2c1264d 100644 --- a/lustre/utils/req-layout.c +++ b/lustre/utils/req-layout.c @@ -74,6 +74,7 @@ #define lustre_swab_ost_body NULL #define lustre_swab_ost_last_id NULL #define lustre_swab_fiemap NULL +#define lustre_swab_idx_info NULL #define lustre_swab_qdata NULL #define lustre_swab_quota_body NULL #define lustre_swab_lvb NULL diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index b8d2ef3..2910754 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -291,6 +291,7 @@ check_lu_dirpage(void) CHECK_VALUE(LDF_EMPTY); CHECK_VALUE(LDF_COLLIDE); CHECK_VALUE(LU_PAGE_SIZE); + CHECK_UNION(lu_page); } static void @@ -757,6 +758,43 @@ check_obd_quotactl(void) } static void +check_obd_idx_read(void) +{ + BLANK_LINE(); + CHECK_STRUCT(idx_info); + CHECK_MEMBER(idx_info, ii_magic); + CHECK_MEMBER(idx_info, ii_flags); + CHECK_MEMBER(idx_info, ii_count); + CHECK_MEMBER(idx_info, ii_pad0); + CHECK_MEMBER(idx_info, ii_attrs); + CHECK_MEMBER(idx_info, ii_fid); + CHECK_MEMBER(idx_info, ii_version); + CHECK_MEMBER(idx_info, ii_hash_start); + CHECK_MEMBER(idx_info, ii_hash_end); + CHECK_MEMBER(idx_info, ii_keysize); + CHECK_MEMBER(idx_info, ii_recsize); + CHECK_MEMBER(idx_info, ii_pad1); + CHECK_MEMBER(idx_info, ii_pad2); + CHECK_MEMBER(idx_info, ii_pad3); + CHECK_CDEFINE(IDX_INFO_MAGIC); + + BLANK_LINE(); + CHECK_STRUCT(lu_idxpage); + CHECK_MEMBER(lu_idxpage, lip_magic); + CHECK_MEMBER(lu_idxpage, lip_flags); + CHECK_MEMBER(lu_idxpage, lip_nr); + CHECK_MEMBER(lu_idxpage, lip_pad0); + + CHECK_CDEFINE(LIP_MAGIC); + CHECK_VALUE(LIP_HDR_SIZE); + + CHECK_VALUE(II_FL_NOHASH); + CHECK_VALUE(II_FL_VARKEY); + CHECK_VALUE(II_FL_VARREC); + CHECK_VALUE(II_FL_NONUNQ); +} + +static void check_niobuf_remote(void) { BLANK_LINE(); @@ -2042,6 +2080,7 @@ main(int argc, char **argv) CHECK_VALUE(OBD_PING); CHECK_VALUE(OBD_LOG_CANCEL); CHECK_VALUE(OBD_QC_CALLBACK); + CHECK_VALUE(OBD_IDX_READ); CHECK_VALUE(OBD_LAST_OPC); CHECK_VALUE(QUOTA_DQACQ); @@ -2081,6 +2120,7 @@ main(int argc, char **argv) check_obd_statfs(); check_obd_ioobj(); check_obd_quotactl(); + check_obd_idx_read(); check_niobuf_remote(); check_ost_body(); check_ll_fid(); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 7de9139..f5fd813 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -354,7 +354,9 @@ void lustre_assert_wire_constants(void) (long long)OBD_LOG_CANCEL); LASSERTF(OBD_QC_CALLBACK == 402, "found %lld\n", (long long)OBD_QC_CALLBACK); - LASSERTF(OBD_LAST_OPC == 403, "found %lld\n", + LASSERTF(OBD_IDX_READ == 403, "found %lld\n", + (long long)OBD_IDX_READ); + LASSERTF(OBD_LAST_OPC == 404, "found %lld\n", (long long)OBD_LAST_OPC); LASSERTF(QUOTA_DQACQ == 601, "found %lld\n", (long long)QUOTA_DQACQ); @@ -583,6 +585,8 @@ void lustre_assert_wire_constants(void) (long long)LDF_COLLIDE); LASSERTF(LU_PAGE_SIZE == 4096, "found %lld\n", (long long)LU_PAGE_SIZE); + LASSERTF((int)sizeof(union lu_page) == 4096, "found %lld\n", + (long long)(int)sizeof(union lu_page)); /* Checks for struct lustre_handle */ LASSERTF((int)sizeof(struct lustre_handle) == 8, "found %lld\n", @@ -1682,6 +1686,98 @@ void lustre_assert_wire_constants(void) LASSERTF((int)sizeof(((struct lquota_slv_rec *)0)->qsr_granted) == 8, "found %lld\n", (long long)(int)sizeof(((struct lquota_slv_rec *)0)->qsr_granted)); + /* Checks for struct idx_info */ + LASSERTF((int)sizeof(struct idx_info) == 80, "found %lld\n", + (long long)(int)sizeof(struct idx_info)); + LASSERTF((int)offsetof(struct idx_info, ii_magic) == 0, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_magic)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_magic) == 4, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_magic)); + LASSERTF((int)offsetof(struct idx_info, ii_flags) == 4, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_flags)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_flags) == 4, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_flags)); + LASSERTF((int)offsetof(struct idx_info, ii_count) == 8, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_count)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_count) == 2, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_count)); + LASSERTF((int)offsetof(struct idx_info, ii_pad0) == 10, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_pad0)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad0) == 2, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_pad0)); + LASSERTF((int)offsetof(struct idx_info, ii_attrs) == 12, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_attrs)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_attrs) == 4, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_attrs)); + LASSERTF((int)offsetof(struct idx_info, ii_fid) == 16, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_fid)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_fid) == 16, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_fid)); + LASSERTF((int)offsetof(struct idx_info, ii_version) == 32, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_version)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_version) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_version)); + LASSERTF((int)offsetof(struct idx_info, ii_hash_start) == 40, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_hash_start)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_hash_start) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_hash_start)); + LASSERTF((int)offsetof(struct idx_info, ii_hash_end) == 48, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_hash_end)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_hash_end) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_hash_end)); + LASSERTF((int)offsetof(struct idx_info, ii_keysize) == 56, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_keysize)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_keysize) == 2, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_keysize)); + LASSERTF((int)offsetof(struct idx_info, ii_recsize) == 58, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_recsize)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_recsize) == 2, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_recsize)); + LASSERTF((int)offsetof(struct idx_info, ii_pad1) == 60, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_pad1)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad1) == 4, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_pad1)); + LASSERTF((int)offsetof(struct idx_info, ii_pad2) == 64, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_pad2)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad2) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_pad2)); + LASSERTF((int)offsetof(struct idx_info, ii_pad3) == 72, "found %lld\n", + (long long)(int)offsetof(struct idx_info, ii_pad3)); + LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad3) == 8, "found %lld\n", + (long long)(int)sizeof(((struct idx_info *)0)->ii_pad3)); + CLASSERT(IDX_INFO_MAGIC == 0x3D37CC37); + + /* Checks for struct lu_idxpage */ + LASSERTF((int)sizeof(struct lu_idxpage) == 16, "found %lld\n", + (long long)(int)sizeof(struct lu_idxpage)); + LASSERTF((int)offsetof(struct lu_idxpage, lip_magic) == 0, "found %lld\n", + (long long)(int)offsetof(struct lu_idxpage, lip_magic)); + LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_magic) == 4, "found %lld\n", + (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_magic)); + LASSERTF((int)offsetof(struct lu_idxpage, lip_flags) == 4, "found %lld\n", + (long long)(int)offsetof(struct lu_idxpage, lip_flags)); + LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_flags) == 2, "found %lld\n", + (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_flags)); + LASSERTF((int)offsetof(struct lu_idxpage, lip_nr) == 6, "found %lld\n", + (long long)(int)offsetof(struct lu_idxpage, lip_nr)); + LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_nr) == 2, "found %lld\n", + (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_nr)); + LASSERTF((int)offsetof(struct lu_idxpage, lip_pad0) == 8, "found %lld\n", + (long long)(int)offsetof(struct lu_idxpage, lip_pad0)); + LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_pad0) == 8, "found %lld\n", + (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_pad0)); + CLASSERT(LIP_MAGIC == 0x8A6D6B6C); + LASSERTF(LIP_HDR_SIZE == 16, "found %lld\n", + (long long)LIP_HDR_SIZE); + LASSERTF(II_FL_NOHASH == 1, "found %lld\n", + (long long)II_FL_NOHASH); + LASSERTF(II_FL_VARKEY == 2, "found %lld\n", + (long long)II_FL_VARKEY); + LASSERTF(II_FL_VARREC == 4, "found %lld\n", + (long long)II_FL_VARREC); + LASSERTF(II_FL_NONUNQ == 8, "found %lld\n", + (long long)II_FL_NONUNQ); + /* Checks for struct niobuf_remote */ LASSERTF((int)sizeof(struct niobuf_remote) == 16, "found %lld\n", (long long)(int)sizeof(struct niobuf_remote)); -- 1.8.3.1