Whamcloud - gitweb
LU-1842 protocol: add support for OBD_IDX_READ
authorJohann Lombardi <johann@whamcloud.com>
Tue, 11 Sep 2012 22:04:00 +0000 (00:04 +0200)
committerOleg Drokin <green@whamcloud.com>
Thu, 20 Sep 2012 04:00:34 +0000 (00:00 -0400)
This patch defines a new RPC format (namely OBD_IDX_READ) which
allows the content of an index file to be read via a bulk transfer.
It is similar to MDS_READPAGE except that it is not tied to a
specific key/record format (readdir relies on lu_dirent/lu_dirpage).
Like readdir, key/record pairs are stored in a container of a
fixed size (i.e. 4KB) regardless of the client & server page size.

Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Change-Id: I34071ca05a3bd4e6c01bfe4fc533ab79bdd98b3c
Reviewed-on: http://review.whamcloud.com/3942
Tested-by: Hudson
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
15 files changed:
lustre/include/dt_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_req_layout.h
lustre/include/obd_support.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/obdclass/dt_object.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/sec.c
lustre/ptlrpc/wiretest.c
lustre/utils/req-layout.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index a16c981..a199087 100644 (file)
@@ -869,6 +869,15 @@ int dt_record_read(const struct lu_env *env, struct dt_object *dt,
                    struct lu_buf *buf, loff_t *pos);
 int dt_record_write(const struct lu_env *env, struct dt_object *dt,
                     const struct lu_buf *buf, loff_t *pos, struct thandle *th);
+/* Callback used by dt_index_walk() to fill one lu_page container with at most
+ * \a nob bytes taken from iterator \a it; \a attr and \a arg are handed over
+ * unchanged by dt_index_walk(). */
+typedef int (*dt_index_page_build_t)(const struct lu_env *env,
+                                    union lu_page *lp, int nob,
+                                    const struct dt_it_ops *iops,
+                                    struct dt_it *it, __u32 attr, void *arg);
+/* Walk the index \a obj and fill the pages of \a rdpg through \a filler */
+int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
+                 const struct lu_rdpg *rdpg, dt_index_page_build_t filler,
+                 void *arg);
+/* Common OBD_IDX_READ processing: serialize the index identified by
+ * ii->ii_fid into the pages of \a rdpg */
+int dt_index_read(const struct lu_env *env, struct dt_device *dev,
+                 struct idx_info *ii, const struct lu_rdpg *rdpg);
 
 static inline struct thandle *dt_trans_create(const struct lu_env *env,
                                               struct dt_device *d)
index 93c4040..6416808 100644 (file)
@@ -952,7 +952,7 @@ static inline int lu_dirent_size(struct lu_dirent *ent)
 #define LU_PAGE_SIZE  (1UL << LU_PAGE_SHIFT)
 #define LU_PAGE_MASK  (~(LU_PAGE_SIZE - 1))
 
-#define LU_PAGE_COUNT 1 << (CFS_PAGE_SHIFT - LU_PAGE_SHIFT)
+#define LU_PAGE_COUNT (1 << (CFS_PAGE_SHIFT - LU_PAGE_SHIFT))
 
 /** @} lu_dir */
 
@@ -2622,6 +2622,7 @@ typedef enum {
         OBD_PING = 400,
         OBD_LOG_CANCEL,
         OBD_QC_CALLBACK,
+       OBD_IDX_READ,
         OBD_LAST_OPC
 } obd_cmd_t;
 #define OBD_FIRST_OPC OBD_PING
@@ -2977,6 +2978,91 @@ void dump_obdo(struct obdo *oa);
 void dump_ost_body(struct ost_body *ob);
 void dump_rcs(__u32 *rc);
 
+#define IDX_INFO_MAGIC 0x3D37CC37
+
+/* Index file transfer through the network. The server serializes the index into
+ * a byte stream which is sent to the client via a bulk transfer.
+ * The same structure is used in both the request and the reply of
+ * OBD_IDX_READ. */
+struct idx_info {
+       /* request & reply: IDX_INFO_MAGIC */
+       __u32           ii_magic;
+
+       /* request: II_FL_NOHASH may be set by the client
+        * reply: see idx_info_flags below */
+       __u32           ii_flags;
+
+       /* request & reply: number of lu_idxpage (to be) transferred */
+       __u16           ii_count;
+       __u16           ii_pad0;
+
+       /* request: requested attributes passed down to the iterator API */
+       __u32           ii_attrs;
+
+       /* request & reply: index file identifier (FID) */
+       struct lu_fid   ii_fid;
+
+       /* reply: version of the index file before starting to walk the index.
+        * Please note that the version can be modified at any time during the
+        * transfer */
+       __u64           ii_version;
+
+       /* request: ii_hash_start is the hash to start with
+        * reply: ii_hash_start is the hash of the first entry of the first
+        *        lu_idxpage and ii_hash_end is the hash of the entry to read
+        *        next if any (II_END_OFF once all entries have been read) */
+       __u64           ii_hash_start;
+       __u64           ii_hash_end;
+
+       /* reply: size of keys in lu_idxpages, minimal one if II_FL_VARKEY is
+        * set */
+       __u16           ii_keysize;
+
+       /* reply: size of records in lu_idxpages, minimal one if II_FL_VARREC
+        * is set */
+       __u16           ii_recsize;
+
+       /* padding; lustre_swab_idx_info() does not swab the ii_pad* fields */
+       __u32           ii_pad1;
+       __u64           ii_pad2;
+       __u64           ii_pad3;
+};
+extern void lustre_swab_idx_info(struct idx_info *ii);
+
+#define II_END_OFF     MDS_DIR_END_OFF /* all entries have been read */
+
+/* List of flags used in idx_info::ii_flags.
+ * dt_index_read() keeps only II_FL_NOHASH from the incoming flags and sets the
+ * other bits itself from the features of the index. */
+enum idx_info_flags {
+       II_FL_NOHASH    = 1 << 0, /* client doesn't care about hash value */
+       II_FL_VARKEY    = 1 << 1, /* keys can be of variable size */
+       II_FL_VARREC    = 1 << 2, /* records can be of variable size */
+       II_FL_NONUNQ    = 1 << 3, /* index supports non-unique keys */
+};
+
+#define LIP_MAGIC 0x8A6D6B6C
+
+/* 4KB (= LU_PAGE_SIZE) container gathering key/record pairs */
+struct lu_idxpage {
+       /* 16-byte header */
+       __u32   lip_magic; /* LIP_MAGIC */
+       __u16   lip_flags;
+       __u16   lip_nr;   /* number of entries in the container */
+       __u64   lip_pad0; /* additional padding for future use */
+       /* NOTE(review): dt_index_page_build() stores lip_magic and lip_nr
+        * without any byte-order conversion; whether the receiver swabs the
+        * header is not visible here -- confirm. */
+
+       /* key/record pairs are stored in the remaining 4080 bytes.
+        * depending upon the flags in idx_info::ii_flags, each key/record
+        * pair might be preceded by:
+        * - a hash value
+        * - the key size (II_FL_VARKEY is set)
+        * - the record size (II_FL_VARREC is set)
+        *
+        * For the time being, we only support fixed-size key & record. */
+       char    lip_entries[0];
+};
+
+#define LIP_HDR_SIZE (offsetof(struct lu_idxpage, lip_entries))
+
+/* Gather all possible types associated with a 4KB container, so that generic
+ * code can step from one container to the next with plain pointer arithmetic
+ * on a union lu_page pointer. */
+union lu_page {
+       struct lu_dirpage       lp_dir; /* for MDS_READPAGE */
+       struct lu_idxpage       lp_idx; /* for OBD_IDX_READ */
+       char                    lp_array[LU_PAGE_SIZE];
+};
+
 /* this will be used when OBD_CONNECT_CHANGE_QS is set */
 struct qunit_data {
         /**
index f2ec5dd..6a7e265 100644 (file)
@@ -138,6 +138,7 @@ void req_layout_fini(void);
 extern struct req_format RQF_OBD_PING;
 extern struct req_format RQF_OBD_SET_INFO;
 extern struct req_format RQF_SEC_CTX;
+extern struct req_format RQF_OBD_IDX_READ;
 /* MGS req_format */
 extern struct req_format RQF_MGS_TARGET_REG;
 extern struct req_format RQF_MGS_SET_INFO;
@@ -241,6 +242,7 @@ extern struct req_msg_field RMF_SETINFO_KEY;
 extern struct req_msg_field RMF_GETINFO_VAL;
 extern struct req_msg_field RMF_GETINFO_VALLEN;
 extern struct req_msg_field RMF_GETINFO_KEY;
+extern struct req_msg_field RMF_IDX_INFO;
 
 /*
  * connection handle received in MDS_CONNECT request.
index f87ecab..13ca2c0 100644 (file)
@@ -374,6 +374,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_OBD_DQACQ               0x604
 #define OBD_FAIL_OBD_LLOG_SETUP          0x605
 #define OBD_FAIL_OBD_LOG_CANCEL_REP      0x606
+#define OBD_FAIL_OBD_IDX_READ_NET        0x607
 
 #define OBD_FAIL_TGT_REPLY_NET           0x700
 #define OBD_FAIL_TGT_CONN_RACE           0x701
index 05d7b24..1b9efeb 100644 (file)
@@ -2599,17 +2599,17 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
-                              struct lu_dirpage *dp, int nob,
-                              const struct dt_it_ops *iops, struct dt_it *it,
-                              __u32 attr)
-{
-        void                   *area = dp;
-        int                     result;
-        __u64                   hash = 0;
-        struct lu_dirent       *ent;
-        struct lu_dirent       *last = NULL;
-        int                     first = 1;
+static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
+                             int nob, const struct dt_it_ops *iops,
+                             struct dt_it *it, __u32 attr, void *arg)
+{
+       struct lu_dirpage       *dp = &lp->lp_dir;
+       void                    *area = dp;
+       int                      result;
+       __u64                    hash = 0;
+       struct lu_dirent        *ent;
+       struct lu_dirent        *last = NULL;
+       int                      first = 1;
 
         memset(area, 0, sizeof (*dp));
         area += sizeof (*dp);
@@ -2666,115 +2666,14 @@ out:
                         dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
                 last->lde_reclen = 0; /* end mark */
         }
+       if (result > 0)
+               /* end of directory */
+               dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+       if (result < 0)
+               CWARN("build page failed: %d!\n", result);
         return result;
 }
 
-static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
-                          const struct lu_rdpg *rdpg)
-{
-        struct dt_it      *it;
-        struct dt_object  *next = mdd_object_child(obj);
-        const struct dt_it_ops  *iops;
-        struct page       *pg;
-        struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
-        int i;
-        int nlupgs = 0;
-        int rc;
-        int nob;
-
-        LASSERT(rdpg->rp_pages != NULL);
-        LASSERT(next->do_index_ops != NULL);
-
-        if (rdpg->rp_count <= 0)
-                return -EFAULT;
-
-        /*
-         * iterate through directory and fill pages from @rdpg
-         */
-        iops = &next->do_index_ops->dio_it;
-        it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
-        if (IS_ERR(it))
-                return PTR_ERR(it);
-
-        rc = iops->load(env, it, rdpg->rp_hash);
-
-        if (rc == 0) {
-                /*
-                 * Iterator didn't find record with exactly the key requested.
-                 *
-                 * It is currently either
-                 *
-                 *     - positioned above record with key less than
-                 *     requested---skip it.
-                 *
-                 *     - or not positioned at all (is in IAM_IT_SKEWED
-                 *     state)---position it on the next item.
-                 */
-                rc = iops->next(env, it);
-        } else if (rc > 0)
-                rc = 0;
-
-        /*
-         * At this point and across for-loop:
-         *
-         *  rc == 0 -> ok, proceed.
-         *  rc >  0 -> end of directory.
-         *  rc <  0 -> error.
-         */
-        for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
-             i++, nob -= CFS_PAGE_SIZE) {
-                struct lu_dirpage *dp;
-
-                LASSERT(i < rdpg->rp_npages);
-                pg = rdpg->rp_pages[i];
-                dp = cfs_kmap(pg);
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-repeat:
-#endif
-                rc = mdd_dir_page_build(env, mdd, dp,
-                                        min_t(int, nob, LU_PAGE_SIZE),
-                                        iops, it, rdpg->rp_attrs);
-                if (rc > 0) {
-                        /*
-                         * end of directory.
-                         */
-                        dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
-                        nlupgs++;
-                } else if (rc < 0) {
-                        CWARN("build page failed: %d!\n", rc);
-                } else {
-                        nlupgs++;
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
-                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
-                        if ((unsigned long)dp & ~CFS_PAGE_MASK)
-                                goto repeat;
-#endif
-                }
-                cfs_kunmap(pg);
-        }
-        if (rc >= 0) {
-                struct lu_dirpage *dp;
-
-                dp = cfs_kmap(rdpg->rp_pages[0]);
-                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
-                if (nlupgs == 0) {
-                        /*
-                         * No pages were processed, mark this for first page
-                         * and send back.
-                         */
-                        dp->ldp_flags  = cpu_to_le32(LDF_EMPTY);
-                        nlupgs = 1;
-                }
-                cfs_kunmap(rdpg->rp_pages[0]);
-
-                rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
-        }
-        iops->put(env, it);
-        iops->fini(env, it);
-
-        return rc;
-}
-
 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                  const struct lu_rdpg *rdpg)
 {
@@ -2818,9 +2717,25 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                 GOTO(out_unlock, rc = LU_PAGE_SIZE);
         }
 
-        rc = __mdd_readpage(env, mdd_obj, rdpg);
+       rc = dt_index_walk(env, mdd_object_child(mdd_obj), rdpg,
+                          mdd_dir_page_build, NULL);
+       if (rc >= 0) {
+               struct lu_dirpage       *dp;
+
+               dp = cfs_kmap(rdpg->rp_pages[0]);
+               dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
+               if (rc == 0) {
+                       /*
+                        * No pages were processed, mark this for first page
+                        * and send back.
+                        */
+                       dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
+                       rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
+               }
+               cfs_kunmap(rdpg->rp_pages[0]);
+       }
 
-        EXIT;
+       GOTO(out_unlock, rc);
 out_unlock:
         mdd_read_unlock(env, mdd_obj);
         return rc;
index c43b0c8..65c6c10 100644 (file)
@@ -1959,6 +1959,89 @@ static int mdt_obd_ping(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
+/*
+ * OBD_IDX_READ handler
+ *
+ * Validates the idx_info packed in the request, allocates enough pages to
+ * hold the requested number of 4KB containers, lets dt_index_read() fill
+ * them and the reply idx_info, then hands the pages to mdt_sendpage().
+ *
+ * \param info - is the per-request MDT thread context
+ *
+ * \retval result of mdt_sendpage() once the containers have been built
+ * \retval -ve errno on failure; malformed requests and reply packing failures
+ *         are wrapped with err_serious()
+ */
+static int mdt_obd_idx_read(struct mdt_thread_info *info)
+{
+       struct mdt_device       *mdt = info->mti_mdt;
+       struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
+       struct idx_info         *req_ii, *rep_ii;
+       int                      rc, i;
+       ENTRY;
+
+       /* start from a clean descriptor: the cleanup code under "out" relies
+        * on rp_pages being NULL as long as nothing has been allocated */
+       memset(rdpg, 0, sizeof(*rdpg));
+       req_capsule_set(info->mti_pill, &RQF_OBD_IDX_READ);
+
+       /* extract idx_info buffer from request & reply */
+       req_ii = req_capsule_client_get(info->mti_pill, &RMF_IDX_INFO);
+       if (req_ii == NULL || req_ii->ii_magic != IDX_INFO_MAGIC)
+               RETURN(err_serious(-EPROTO));
+
+       rc = req_capsule_server_pack(info->mti_pill);
+       if (rc)
+               RETURN(err_serious(rc));
+
+       rep_ii = req_capsule_server_get(info->mti_pill, &RMF_IDX_INFO);
+       if (rep_ii == NULL)
+               RETURN(err_serious(-EFAULT));
+       rep_ii->ii_magic = IDX_INFO_MAGIC;
+
+       /* extract hash to start with */
+       rdpg->rp_hash = req_ii->ii_hash_start;
+
+       /* extract requested attributes */
+       rdpg->rp_attrs = req_ii->ii_attrs;
+
+       /* check that fid packed in request is valid and supported */
+       if (!fid_is_sane(&req_ii->ii_fid))
+               RETURN(-EINVAL);
+       rep_ii->ii_fid = req_ii->ii_fid;
+
+       /* copy flags */
+       rep_ii->ii_flags = req_ii->ii_flags;
+
+       /* compute number of pages to allocate, ii_count is the number of 4KB
+        * containers. ii_count is a __u16, so the test below only catches a
+        * count of zero. The transfer size is capped to PTLRPC_MAX_BRW_SIZE. */
+       if (req_ii->ii_count <= 0)
+               GOTO(out, rc = -EFAULT);
+       rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
+                              PTLRPC_MAX_BRW_SIZE);
+       rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT;
+
+       /* allocate pages to store the containers */
+       OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
+       if (rdpg->rp_pages == NULL)
+               GOTO(out, rc = -ENOMEM);
+       for (i = 0; i < rdpg->rp_npages; i++) {
+               rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
+               if (rdpg->rp_pages[i] == NULL)
+                       GOTO(out, rc = -ENOMEM);
+       }
+
+       /* populate pages with key/record pairs */
+       rc = dt_index_read(info->mti_env, mdt->mdt_bottom, rep_ii, rdpg);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       LASSERTF(rc <= rdpg->rp_count, "dt_index_read() returned more than "
+                "asked %d > %d\n", rc, rdpg->rp_count);
+
+       /* send pages to client */
+       rc = mdt_sendpage(info, rdpg, rc);
+
+       GOTO(out, rc);
+out:
+       /* release whatever was allocated; slots that were never filled are
+        * skipped (presumably OBD_ALLOC returns zeroed memory -- confirm) */
+       if (rdpg->rp_pages) {
+               for (i = 0; i < rdpg->rp_npages; i++)
+                       if (rdpg->rp_pages[i])
+                               cfs_free_page(rdpg->rp_pages[i]);
+               OBD_FREE(rdpg->rp_pages,
+                        rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
+       }
+       return rc;
+}
+
 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
 {
         return err_serious(-EOPNOTSUPP);
@@ -2970,6 +3053,7 @@ static int mdt_msg_check_version(struct lustre_msg *msg)
         case SEC_CTX_INIT:
         case SEC_CTX_INIT_CONT:
         case SEC_CTX_FINI:
+       case OBD_IDX_READ:
                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
@@ -6138,7 +6222,8 @@ DEF_MDT_HNDL_F(0,                         QUOTACTL,     mdt_quotactl_handle)
 static struct mdt_handler mdt_obd_ops[] = {
         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
-        DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
+       DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback),
+       DEF_OBD_HNDL(0, IDX_READ,       mdt_obd_idx_read)
 };
 
 #define DEF_DLM_HNDL_0(flags, name, fn)                   \
@@ -6229,6 +6314,11 @@ static struct mdt_opc_slice mdt_readpage_handlers[] = {
                 .mos_opc_end   = MDS_LAST_OPC,
                 .mos_hs        = mdt_readpage_ops
         },
+       {
+               .mos_opc_start = OBD_FIRST_OPC,
+               .mos_opc_end   = OBD_LAST_OPC,
+               .mos_hs        = mdt_obd_ops
+       },
         {
                 .mos_hs        = NULL
         }
index 11a8984..51305b5 100644 (file)
@@ -598,3 +598,323 @@ const struct dt_index_features dt_quota_slv_features = {
        .dif_ptrsize            = 4
 };
 EXPORT_SYMBOL(dt_quota_slv_features);
+
+/* Pick the dt_index_features descriptor matching the FID sequence \a seq of an
+ * index object, after checking that the file type found in \a mode is the one
+ * expected for that kind of index. This is used by OBD_IDX_READ RPC.
+ * An ERR_PTR() is returned when the type does not match or when the sequence
+ * is not supported. */
+static inline const struct dt_index_features *dt_index_feat_select(__u64 seq,
+                                                                  __u32 mode)
+{
+       const struct dt_index_features  *feat;
+       int                              type_ok;
+       int                              err;
+
+       if (seq == FID_SEQ_QUOTA_GLB) {
+               /* global quota index, should be a regular file */
+               feat    = &dt_quota_glb_features;
+               type_ok = S_ISREG(mode);
+               err     = -ENOENT;
+       } else if (seq == FID_SEQ_QUOTA) {
+               /* quota slave index, should be a regular file */
+               feat    = &dt_quota_slv_features;
+               type_ok = S_ISREG(mode);
+               err     = -ENOENT;
+       } else if (seq >= FID_SEQ_NORMAL) {
+               /* object is part of the namespace, we can only deal with
+                * directories there */
+               feat    = &dt_directory_features;
+               type_ok = S_ISDIR(mode);
+               err     = -ENOTDIR;
+       } else {
+               /* no index type is associated with this sequence */
+               return ERR_PTR(-EOPNOTSUPP);
+       }
+
+       if (!type_ok)
+               return ERR_PTR(err);
+
+       return feat;
+}
+
+/*
+ * Fill a lu_idxpage with key/record pairs read for transfer via OBD_IDX_READ
+ * RPC
+ *
+ * \param env - is the environment passed by the caller
+ * \param lp  - is a pointer to the lu_page to fill
+ * \param nob - is the maximum number of bytes that should be copied
+ * \param iops - is the index operation vector associated with the index object
+ * \param it   - is a pointer to the current iterator
+ * \param attr - is the index attribute to pass to iops->rec()
+ * \param arg  - is a pointer to the idx_info structure
+ *
+ * \retval 0 when the container is full and the iterator still points to an
+ *         entry that did not fit
+ * \retval +ve when iops->next() reported that there are no more entries
+ * \retval -EINVAL when not even one entry fits in \a nob bytes
+ * \retval other -ve errno returned by iops->rec() or iops->next()
+ */
+static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
+                              int nob, const struct dt_it_ops *iops,
+                              struct dt_it *it, __u32 attr, void *arg)
+{
+       struct idx_info         *ii = (struct idx_info *)arg;
+       struct lu_idxpage       *lip = &lp->lp_idx;
+       char                    *entry;
+       int                      rc, size;
+       ENTRY;
+
+       /* no support for variable key & record size for now */
+       LASSERT((ii->ii_flags & II_FL_VARKEY) == 0);
+       LASSERT((ii->ii_flags & II_FL_VARREC) == 0);
+
+       /* initialize the header of the new container */
+       memset(lip, 0, LIP_HDR_SIZE);
+       lip->lip_magic = LIP_MAGIC;
+       nob           -= LIP_HDR_SIZE;
+
+       /* compute size needed to store a key/record pair */
+       size = ii->ii_recsize + ii->ii_keysize;
+       if ((ii->ii_flags & II_FL_NOHASH) == 0)
+               /* add hash if the client wants it */
+               size += sizeof(__u64);
+
+       entry = lip->lip_entries;
+       do {
+               char            *tmp_entry = entry;
+               struct dt_key   *key;
+               __u64            hash;
+
+               /* fetch 64-bit hash value; it is recorded before checking the
+                * remaining space so that, when the container is full,
+                * ii_hash_end is the hash of the entry to read next */
+               hash = iops->store(env, it);
+               ii->ii_hash_end = hash;
+
+               if (nob < size) {
+                       /* an empty container means that a single entry is
+                        * larger than the space we were given */
+                       if (lip->lip_nr == 0)
+                               GOTO(out, rc = -EINVAL);
+                       GOTO(out, rc = 0);
+               }
+
+               if ((ii->ii_flags & II_FL_NOHASH) == 0) {
+                       /* client wants the 64-bit hash value associated with
+                        * each record */
+                       memcpy(tmp_entry, &hash, sizeof(hash));
+                       tmp_entry += sizeof(hash);
+               }
+
+               /* then the key value */
+               LASSERT(iops->key_size(env, it) == ii->ii_keysize);
+               key = iops->key(env, it);
+               memcpy(tmp_entry, key, ii->ii_keysize);
+               tmp_entry += ii->ii_keysize;
+
+               /* and finally the record; on -ESTALE the entry is dropped:
+                * neither lip_nr nor the write position is advanced */
+               rc = iops->rec(env, it, (struct dt_rec *)tmp_entry, attr);
+               if (rc != -ESTALE) {
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       /* hash/key/record successfully copied! */
+                       lip->lip_nr++;
+                       /* very first entry of the very first container */
+                       if (unlikely(lip->lip_nr == 1 && ii->ii_count == 0))
+                               ii->ii_hash_start = hash;
+                       entry = tmp_entry + ii->ii_recsize;
+                       nob -= size;
+               }
+
+               /* move on to the next record */
+               do {
+                       rc = iops->next(env, it);
+               } while (rc == -ESTALE);
+
+       } while (rc == 0);
+
+       GOTO(out, rc);
+out:
+       if (rc >= 0 && lip->lip_nr > 0)
+               /* one more container */
+               ii->ii_count++;
+       if (rc > 0)
+               /* no more entries */
+               ii->ii_hash_end = II_END_OFF;
+       return rc;
+}
+
+/*
+ * Walk index and fill lu_page containers with key/record pairs
+ *
+ * \param env - is the environment passed by the caller
+ * \param obj - is the index object to parse
+ * \param rdpg - is the lu_rdpg descriptor associated with the transfer
+ * \param filler - is the callback function responsible for filling a lu_page
+ *                 with key/record pairs in the format wanted by the caller
+ * \param arg    - is an opaque argument passed to the filler function
+ *
+ * \retval sum (in bytes) of all filled lu_pages, capped to rdpg->rp_count
+ * \retval -EFAULT if rdpg->rp_count is zero
+ * \retval -ve errno on failure of the iterator or of the filler
+ */
+int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
+                 const struct lu_rdpg *rdpg, dt_index_page_build_t filler,
+                 void *arg)
+{
+       struct dt_it            *it;
+       const struct dt_it_ops  *iops;
+       unsigned int             pageidx, nob, nlupgs = 0;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(rdpg->rp_pages != NULL);
+       LASSERT(obj->do_index_ops != NULL);
+
+       nob = rdpg->rp_count;
+       if (nob == 0)
+               RETURN(-EFAULT);
+
+       /* Iterate through index and fill containers from @rdpg */
+       iops = &obj->do_index_ops->dio_it;
+       LASSERT(iops != NULL);
+       it = iops->init(env, obj, rdpg->rp_attrs, BYPASS_CAPA);
+       if (IS_ERR(it))
+               RETURN(PTR_ERR(it));
+
+       rc = iops->load(env, it, rdpg->rp_hash);
+       if (rc == 0) {
+               /*
+                * Iterator didn't find record with exactly the key requested.
+                *
+                * It is currently either
+                *
+                *     - positioned above record with key less than
+                *     requested---skip it.
+                *     - or not positioned at all (is in IAM_IT_SKEWED
+                *     state)---position it on the next item.
+                */
+               rc = iops->next(env, it);
+       } else if (rc > 0) {
+               rc = 0;
+       }
+
+       /* Fill containers one after the other. There might be multiple
+        * containers per physical page.
+        *
+        * At this point and across for-loop:
+        *  rc == 0 -> ok, proceed.
+        *  rc >  0 -> end of index.
+        *  rc <  0 -> error. */
+       for (pageidx = 0; rc == 0 && nob > 0; pageidx++) {
+               union lu_page   *lp;
+               int              i;
+
+               LASSERT(pageidx < rdpg->rp_npages);
+               lp = cfs_kmap(rdpg->rp_pages[pageidx]);
+
+               /* fill lu pages; stop as soon as the byte budget is exhausted,
+                * even in the middle of a physical page */
+               for (i = 0; i < LU_PAGE_COUNT && nob > 0; i++, lp++) {
+                       unsigned int    chunk;
+
+                       chunk = min_t(unsigned int, nob, LU_PAGE_SIZE);
+                       rc = filler(env, lp, chunk, iops, it, rdpg->rp_attrs,
+                                   arg);
+                       if (rc < 0)
+                               break;
+                       /* one more lu_page */
+                       nlupgs++;
+                       if (rc > 0)
+                               /* end of index */
+                               break;
+                       /* nob is unsigned: only subtract what was offered to
+                        * the filler so that it cannot wrap around */
+                       nob -= chunk;
+               }
+               /* unmap the page mapped above; the container index i must not
+                * be used to index rp_pages */
+               cfs_kunmap(rdpg->rp_pages[pageidx]);
+       }
+
+       iops->put(env, it);
+       iops->fini(env, it);
+
+       if (rc >= 0)
+               rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(dt_index_walk);
+
+/**
+ * Walk key/record pairs of an index and copy them into 4KB containers to be
+ * transferred over the network. This is the common handler for OBD_IDX_READ
+ * RPC processing.
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the dt_device storing the index
+ * \param ii  - is the idx_info structure packed by the client in the
+ *              OBD_IDX_READ request
+ * \param rdpg - is the lu_rdpg descriptor
+ *
+ * \retval on success, return sum (in bytes) of all filled containers
+ * \retval -EFAULT if rdpg->rp_count is null or not a multiple of LU_PAGE_SIZE
+ * \retval -EPERM if the FID designates a local file
+ * \retval -EOPNOTSUPP for directories and for indexes with variable-size keys
+ *         or records
+ * \retval appropriate error otherwise.
+ */
+int dt_index_read(const struct lu_env *env, struct dt_device *dev,
+                  struct idx_info *ii, const struct lu_rdpg *rdpg)
+{
+       const struct dt_index_features  *feat;
+       struct dt_object                *obj;
+       int                              rc;
+       ENTRY;
+
+       /* rp_count shouldn't be null and should be a multiple of the container
+        * size; either violation is enough to reject the request */
+       if (rdpg->rp_count <= 0 || (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0)
+               RETURN(-EFAULT);
+
+       if (fid_seq(&ii->ii_fid) < FID_SEQ_SPECIAL)
+               /* block access to local files */
+               RETURN(-EPERM);
+
+       if (fid_seq(&ii->ii_fid) >= FID_SEQ_NORMAL)
+               /* we don't support directory transfer via OBD_IDX_READ for the
+                * time being */
+               RETURN(-EOPNOTSUPP);
+
+       /* lookup index object subject to the transfer */
+       obj = dt_locate(env, dev, &ii->ii_fid);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+       if (dt_object_exists(obj) == 0)
+               GOTO(out, rc = -ENOENT);
+
+       /* fetch index features associated with index object */
+       feat = dt_index_feat_select(fid_seq(&ii->ii_fid),
+                                   lu_object_attr(&obj->do_lu));
+       if (IS_ERR(feat))
+               GOTO(out, rc = PTR_ERR(feat));
+
+       /* load index feature if not done already */
+       if (obj->do_index_ops == NULL) {
+               rc = obj->do_ops->do_index_try(env, obj, feat);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       /* fill ii_flags with supported index features; II_FL_NOHASH is the
+        * only flag taken over from the caller */
+       ii->ii_flags &= II_FL_NOHASH;
+
+       ii->ii_keysize = feat->dif_keysize_max;
+       if ((feat->dif_flags & DT_IND_VARKEY) != 0) {
+               /* key size is variable */
+               ii->ii_flags |= II_FL_VARKEY;
+               /* we don't support variable key size for the time being */
+               GOTO(out, rc = -EOPNOTSUPP);
+       }
+
+       ii->ii_recsize = feat->dif_recsize_max;
+       if ((feat->dif_flags & DT_IND_VARREC) != 0) {
+               /* record size is variable */
+               ii->ii_flags |= II_FL_VARREC;
+               /* we don't support variable record size for the time being */
+               GOTO(out, rc = -EOPNOTSUPP);
+       }
+
+       if ((feat->dif_flags & DT_IND_NONUNQ) != 0)
+               /* key isn't necessarily unique */
+               ii->ii_flags |= II_FL_NONUNQ;
+
+       dt_read_lock(env, obj, 0);
+       /* fetch object version before walking the index */
+       ii->ii_version = dt_version_get(env, obj);
+
+       /* walk the index and fill lu_idxpages with key/record pairs */
+       rc = dt_index_walk(env, obj, rdpg, dt_index_page_build, ii);
+       dt_read_unlock(env, obj);
+
+       if (rc == 0) {
+               /* index is empty */
+               LASSERT(ii->ii_count == 0);
+               ii->ii_hash_end = II_END_OFF;
+       }
+
+       GOTO(out, rc);
+out:
+       /* drop the reference taken by dt_locate() */
+       lu_object_put(env, &obj->do_lu);
+       return rc;
+}
+EXPORT_SYMBOL(dt_index_read);
index 78f8751..0d0e106 100644 (file)
@@ -497,6 +497,16 @@ static const struct req_msg_field *llog_origin_handle_next_block_server[] = {
         &RMF_EADATA
 };
 
+/* OBD_IDX_READ request: ptlrpc body followed by the idx_info of the client */
+static const struct req_msg_field *obd_idx_read_client[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_IDX_INFO
+};
+
+/* OBD_IDX_READ reply: ptlrpc body followed by the idx_info filled by the
+ * server */
+static const struct req_msg_field *obd_idx_read_server[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_IDX_INFO
+};
+
 static const struct req_msg_field *ost_body_only[] = {
         &RMF_PTLRPC_BODY,
         &RMF_OST_BODY
@@ -564,6 +574,7 @@ static const struct req_msg_field *ost_get_fiemap_server[] = {
 static struct req_format *req_formats[] = {
         &RQF_OBD_PING,
         &RQF_OBD_SET_INFO,
+       &RQF_OBD_IDX_READ,
         &RQF_SEC_CTX,
         &RQF_MGS_TARGET_REG,
         &RQF_MGS_SET_INFO,
@@ -966,6 +977,11 @@ struct req_msg_field RMF_FIEMAP_VAL =
         DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap, NULL);
 EXPORT_SYMBOL(RMF_FIEMAP_VAL);
 
+/* Message field carrying a struct idx_info; it is declared with
+ * sizeof(struct idx_info) and with lustre_swab_idx_info() as swabber */
+struct req_msg_field RMF_IDX_INFO =
+       DEFINE_MSGF("idx_info", 0, sizeof(struct idx_info),
+                   lustre_swab_idx_info, NULL);
+EXPORT_SYMBOL(RMF_IDX_INFO);
+
 /*
  * Request formats.
  */
@@ -1004,6 +1020,12 @@ struct req_format RQF_OBD_SET_INFO =
         DEFINE_REQ_FMT0("OBD_SET_INFO", obd_set_info_client, empty);
 EXPORT_SYMBOL(RQF_OBD_SET_INFO);
 
+/* OBD_IDX_READ request format: read an index file through the network.
+ * The client-side and server-side field lists are identical, namely
+ * RMF_PTLRPC_BODY followed by RMF_IDX_INFO. */
+struct req_format RQF_OBD_IDX_READ =
+       DEFINE_REQ_FMT0("OBD_IDX_READ",
+                       obd_idx_read_client, obd_idx_read_server);
+EXPORT_SYMBOL(RQF_OBD_IDX_READ);
+
 struct req_format RQF_SEC_CTX =
         DEFINE_REQ_FMT0("SEC_CTX", empty, empty);
 EXPORT_SYMBOL(RQF_SEC_CTX);
index 50a981f..0b87bc9 100644 (file)
@@ -111,6 +111,7 @@ struct ll_rpc_opcode {
         { OBD_PING,         "obd_ping" },
         { OBD_LOG_CANCEL,   "llog_origin_handle_cancel" },
         { OBD_QC_CALLBACK,  "obd_quota_callback" },
+       { OBD_IDX_READ,     "dt_index_read" },
         { LLOG_ORIGIN_HANDLE_CREATE,     "llog_origin_handle_create" },
         { LLOG_ORIGIN_HANDLE_NEXT_BLOCK, "llog_origin_handle_next_block" },
         { LLOG_ORIGIN_HANDLE_READ_HEADER,"llog_origin_handle_read_header" },
index 98c53d9..dd46499 100644 (file)
@@ -2023,6 +2023,20 @@ void lustre_swab_fiemap(struct ll_user_fiemap *fiemap)
 }
 EXPORT_SYMBOL(lustre_swab_fiemap);
 
+void lustre_swab_idx_info(struct idx_info *ii)
+{      /* Swab each multi-byte member of *ii with the helper that matches
+        * its width (16, 32 or 64 bits). The ii_pad0..ii_pad3 members are
+        * not touched by this routine. */
+       __swab32s(&ii->ii_magic);
+       __swab32s(&ii->ii_flags);
+       __swab16s(&ii->ii_count);
+       __swab32s(&ii->ii_attrs);
+       lustre_swab_lu_fid(&ii->ii_fid);        /* FID uses its own helper */
+       __swab64s(&ii->ii_version);
+       __swab64s(&ii->ii_hash_start);
+       __swab64s(&ii->ii_hash_end);
+       __swab16s(&ii->ii_keysize);
+       __swab16s(&ii->ii_recsize);
+}      /* NOTE(review): unlike lustre_swab_fiemap() just above, no
+        * EXPORT_SYMBOL follows - confirm no other module needs it */
+
 void lustre_swab_mdt_rec_reint (struct mdt_rec_reint *rr)
 {
         __swab32s (&rr->rr_opcode);
index d876a22..315e99a 100644 (file)
@@ -822,6 +822,7 @@ void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
         case OST_READ:
         case MDS_READPAGE:
         case MGS_CONFIG_READ:
+       case OBD_IDX_READ:
                 req->rq_bulk_read = 1;
                 break;
         case OST_WRITE:
index 5f6766f..667b83a 100644 (file)
@@ -346,7 +346,9 @@ void lustre_assert_wire_constants(void)
                  (long long)OBD_LOG_CANCEL);
         LASSERTF(OBD_QC_CALLBACK == 402, "found %lld\n",
                  (long long)OBD_QC_CALLBACK);
-        LASSERTF(OBD_LAST_OPC == 403, "found %lld\n",
+        LASSERTF(OBD_IDX_READ == 403, "found %lld\n",
+                 (long long)OBD_IDX_READ);
+        LASSERTF(OBD_LAST_OPC == 404, "found %lld\n",
                  (long long)OBD_LAST_OPC);
         LASSERTF(QUOTA_DQACQ == 601, "found %lld\n",
                  (long long)QUOTA_DQACQ);
@@ -575,6 +577,8 @@ void lustre_assert_wire_constants(void)
                  (long long)LDF_COLLIDE);
         LASSERTF(LU_PAGE_SIZE == 4096, "found %lld\n",
                  (long long)LU_PAGE_SIZE);
+       LASSERTF((int)sizeof(union lu_page) == 4096, "found %lld\n",
+                (long long)(int)sizeof(union lu_page));
 
         /* Checks for struct lustre_handle */
         LASSERTF((int)sizeof(struct lustre_handle) == 8, "found %lld\n",
@@ -1674,6 +1678,98 @@ void lustre_assert_wire_constants(void)
         LASSERTF((int)sizeof(((struct lquota_slv_rec *)0)->qsr_granted) == 8, "found %lld\n",
                  (long long)(int)sizeof(((struct lquota_slv_rec *)0)->qsr_granted));
 
+        /* Checks for struct idx_info */
+        LASSERTF((int)sizeof(struct idx_info) == 80, "found %lld\n",
+                 (long long)(int)sizeof(struct idx_info));
+        LASSERTF((int)offsetof(struct idx_info, ii_magic) == 0, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_magic));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_magic) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_magic));
+        LASSERTF((int)offsetof(struct idx_info, ii_flags) == 4, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_flags));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_flags) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_flags));
+        LASSERTF((int)offsetof(struct idx_info, ii_count) == 8, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_count));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_count) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_count));
+        LASSERTF((int)offsetof(struct idx_info, ii_pad0) == 10, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_pad0));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad0) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_pad0));
+        LASSERTF((int)offsetof(struct idx_info, ii_attrs) == 12, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_attrs));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_attrs) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_attrs));
+        LASSERTF((int)offsetof(struct idx_info, ii_fid) == 16, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_fid));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_fid) == 16, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_fid));
+        LASSERTF((int)offsetof(struct idx_info, ii_version) == 32, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_version));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_version) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_version));
+        LASSERTF((int)offsetof(struct idx_info, ii_hash_start) == 40, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_hash_start));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_hash_start) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_hash_start));
+        LASSERTF((int)offsetof(struct idx_info, ii_hash_end) == 48, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_hash_end));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_hash_end) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_hash_end));
+        LASSERTF((int)offsetof(struct idx_info, ii_keysize) == 56, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_keysize));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_keysize) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_keysize));
+        LASSERTF((int)offsetof(struct idx_info, ii_recsize) == 58, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_recsize));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_recsize) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_recsize));
+        LASSERTF((int)offsetof(struct idx_info, ii_pad1) == 60, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_pad1));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad1) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_pad1));
+        LASSERTF((int)offsetof(struct idx_info, ii_pad2) == 64, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_pad2));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad2) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_pad2));
+        LASSERTF((int)offsetof(struct idx_info, ii_pad3) == 72, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_pad3));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad3) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_pad3));
+        CLASSERT(IDX_INFO_MAGIC == 0x3D37CC37);
+
+        /* Checks for struct lu_idxpage */
+        LASSERTF((int)sizeof(struct lu_idxpage) == 16, "found %lld\n",
+                 (long long)(int)sizeof(struct lu_idxpage));
+        LASSERTF((int)offsetof(struct lu_idxpage, lip_magic) == 0, "found %lld\n",
+                 (long long)(int)offsetof(struct lu_idxpage, lip_magic));
+        LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_magic) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_magic));
+        LASSERTF((int)offsetof(struct lu_idxpage, lip_flags) == 4, "found %lld\n",
+                 (long long)(int)offsetof(struct lu_idxpage, lip_flags));
+        LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_flags) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_flags));
+        LASSERTF((int)offsetof(struct lu_idxpage, lip_nr) == 6, "found %lld\n",
+                 (long long)(int)offsetof(struct lu_idxpage, lip_nr));
+        LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_nr) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_nr));
+        LASSERTF((int)offsetof(struct lu_idxpage, lip_pad0) == 8, "found %lld\n",
+                 (long long)(int)offsetof(struct lu_idxpage, lip_pad0));
+        LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_pad0) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_pad0));
+        CLASSERT(LIP_MAGIC == 0x8A6D6B6C);
+        LASSERTF(LIP_HDR_SIZE == 16, "found %lld\n",
+                 (long long)LIP_HDR_SIZE);
+        LASSERTF(II_FL_NOHASH == 1, "found %lld\n",
+                 (long long)II_FL_NOHASH);
+        LASSERTF(II_FL_VARKEY == 2, "found %lld\n",
+                 (long long)II_FL_VARKEY);
+        LASSERTF(II_FL_VARREC == 4, "found %lld\n",
+                 (long long)II_FL_VARREC);
+        LASSERTF(II_FL_NONUNQ == 8, "found %lld\n",
+                 (long long)II_FL_NONUNQ);
+
         /* Checks for struct niobuf_remote */
         LASSERTF((int)sizeof(struct niobuf_remote) == 16, "found %lld\n",
                  (long long)(int)sizeof(struct niobuf_remote));
index 914253f..2c1264d 100644 (file)
@@ -74,6 +74,7 @@
 #define lustre_swab_ost_body NULL
 #define lustre_swab_ost_last_id NULL
 #define lustre_swab_fiemap NULL
+#define lustre_swab_idx_info NULL /* stubbed to NULL like the other lustre_swab_* names here */
 #define lustre_swab_qdata NULL
 #define lustre_swab_quota_body NULL
 #define lustre_swab_lvb NULL
index b8d2ef3..2910754 100644 (file)
@@ -291,6 +291,7 @@ check_lu_dirpage(void)
         CHECK_VALUE(LDF_EMPTY);
         CHECK_VALUE(LDF_COLLIDE);
         CHECK_VALUE(LU_PAGE_SIZE);
+       CHECK_UNION(lu_page);
 }
 
 static void
@@ -757,6 +758,43 @@ check_obd_quotactl(void)
 }
 
 static void
+check_obd_idx_read(void)
+{      /* Wire checks for the OBD_IDX_READ structures: struct idx_info
+        * first, then struct lu_idxpage and the related constants.
+        * Presumably this is what produces the "Checks for struct idx_info"
+        * and "Checks for struct lu_idxpage" assertions in wiretest.c -
+        * TODO confirm against the CHECK_* macro definitions. */
+       BLANK_LINE();
+       CHECK_STRUCT(idx_info);
+       CHECK_MEMBER(idx_info, ii_magic);
+       CHECK_MEMBER(idx_info, ii_flags);
+       CHECK_MEMBER(idx_info, ii_count);
+       CHECK_MEMBER(idx_info, ii_pad0);
+       CHECK_MEMBER(idx_info, ii_attrs);
+       CHECK_MEMBER(idx_info, ii_fid);
+       CHECK_MEMBER(idx_info, ii_version);
+       CHECK_MEMBER(idx_info, ii_hash_start);
+       CHECK_MEMBER(idx_info, ii_hash_end);
+       CHECK_MEMBER(idx_info, ii_keysize);
+       CHECK_MEMBER(idx_info, ii_recsize);
+       CHECK_MEMBER(idx_info, ii_pad1);
+       CHECK_MEMBER(idx_info, ii_pad2);
+       CHECK_MEMBER(idx_info, ii_pad3);
+       CHECK_CDEFINE(IDX_INFO_MAGIC);  /* magic checked via CHECK_CDEFINE,
+                                        * not CHECK_VALUE */
+
+       BLANK_LINE();
+       CHECK_STRUCT(lu_idxpage);
+       CHECK_MEMBER(lu_idxpage, lip_magic);
+       CHECK_MEMBER(lu_idxpage, lip_flags);
+       CHECK_MEMBER(lu_idxpage, lip_nr);
+       CHECK_MEMBER(lu_idxpage, lip_pad0);
+
+       CHECK_CDEFINE(LIP_MAGIC);
+       CHECK_VALUE(LIP_HDR_SIZE);
+
+       CHECK_VALUE(II_FL_NOHASH);      /* the four II_FL_* flag values */
+       CHECK_VALUE(II_FL_VARKEY);
+       CHECK_VALUE(II_FL_VARREC);
+       CHECK_VALUE(II_FL_NONUNQ);
+}
+
+static void
 check_niobuf_remote(void)
 {
         BLANK_LINE();
@@ -2042,6 +2080,7 @@ main(int argc, char **argv)
         CHECK_VALUE(OBD_PING);
         CHECK_VALUE(OBD_LOG_CANCEL);
         CHECK_VALUE(OBD_QC_CALLBACK);
+       CHECK_VALUE(OBD_IDX_READ);
         CHECK_VALUE(OBD_LAST_OPC);
 
         CHECK_VALUE(QUOTA_DQACQ);
@@ -2081,6 +2120,7 @@ main(int argc, char **argv)
         check_obd_statfs();
         check_obd_ioobj();
         check_obd_quotactl();
+       check_obd_idx_read();
         check_niobuf_remote();
         check_ost_body();
         check_ll_fid();
index 7de9139..f5fd813 100644 (file)
@@ -354,7 +354,9 @@ void lustre_assert_wire_constants(void)
                  (long long)OBD_LOG_CANCEL);
         LASSERTF(OBD_QC_CALLBACK == 402, "found %lld\n",
                  (long long)OBD_QC_CALLBACK);
-        LASSERTF(OBD_LAST_OPC == 403, "found %lld\n",
+        LASSERTF(OBD_IDX_READ == 403, "found %lld\n",
+                 (long long)OBD_IDX_READ);
+        LASSERTF(OBD_LAST_OPC == 404, "found %lld\n",
                  (long long)OBD_LAST_OPC);
         LASSERTF(QUOTA_DQACQ == 601, "found %lld\n",
                  (long long)QUOTA_DQACQ);
@@ -583,6 +585,8 @@ void lustre_assert_wire_constants(void)
                  (long long)LDF_COLLIDE);
         LASSERTF(LU_PAGE_SIZE == 4096, "found %lld\n",
                  (long long)LU_PAGE_SIZE);
+       LASSERTF((int)sizeof(union lu_page) == 4096, "found %lld\n",
+                (long long)(int)sizeof(union lu_page));
 
         /* Checks for struct lustre_handle */
         LASSERTF((int)sizeof(struct lustre_handle) == 8, "found %lld\n",
@@ -1682,6 +1686,98 @@ void lustre_assert_wire_constants(void)
         LASSERTF((int)sizeof(((struct lquota_slv_rec *)0)->qsr_granted) == 8, "found %lld\n",
                  (long long)(int)sizeof(((struct lquota_slv_rec *)0)->qsr_granted));
 
+        /* Checks for struct idx_info */
+        LASSERTF((int)sizeof(struct idx_info) == 80, "found %lld\n",
+                 (long long)(int)sizeof(struct idx_info));
+        LASSERTF((int)offsetof(struct idx_info, ii_magic) == 0, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_magic));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_magic) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_magic));
+        LASSERTF((int)offsetof(struct idx_info, ii_flags) == 4, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_flags));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_flags) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_flags));
+        LASSERTF((int)offsetof(struct idx_info, ii_count) == 8, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_count));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_count) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_count));
+        LASSERTF((int)offsetof(struct idx_info, ii_pad0) == 10, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_pad0));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad0) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_pad0));
+        LASSERTF((int)offsetof(struct idx_info, ii_attrs) == 12, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_attrs));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_attrs) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_attrs));
+        LASSERTF((int)offsetof(struct idx_info, ii_fid) == 16, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_fid));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_fid) == 16, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_fid));
+        LASSERTF((int)offsetof(struct idx_info, ii_version) == 32, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_version));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_version) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_version));
+        LASSERTF((int)offsetof(struct idx_info, ii_hash_start) == 40, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_hash_start));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_hash_start) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_hash_start));
+        LASSERTF((int)offsetof(struct idx_info, ii_hash_end) == 48, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_hash_end));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_hash_end) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_hash_end));
+        LASSERTF((int)offsetof(struct idx_info, ii_keysize) == 56, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_keysize));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_keysize) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_keysize));
+        LASSERTF((int)offsetof(struct idx_info, ii_recsize) == 58, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_recsize));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_recsize) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_recsize));
+        LASSERTF((int)offsetof(struct idx_info, ii_pad1) == 60, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_pad1));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad1) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_pad1));
+        LASSERTF((int)offsetof(struct idx_info, ii_pad2) == 64, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_pad2));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad2) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_pad2));
+        LASSERTF((int)offsetof(struct idx_info, ii_pad3) == 72, "found %lld\n",
+                 (long long)(int)offsetof(struct idx_info, ii_pad3));
+        LASSERTF((int)sizeof(((struct idx_info *)0)->ii_pad3) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct idx_info *)0)->ii_pad3));
+        CLASSERT(IDX_INFO_MAGIC == 0x3D37CC37);
+
+        /* Checks for struct lu_idxpage */
+        LASSERTF((int)sizeof(struct lu_idxpage) == 16, "found %lld\n",
+                 (long long)(int)sizeof(struct lu_idxpage));
+        LASSERTF((int)offsetof(struct lu_idxpage, lip_magic) == 0, "found %lld\n",
+                 (long long)(int)offsetof(struct lu_idxpage, lip_magic));
+        LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_magic) == 4, "found %lld\n",
+                 (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_magic));
+        LASSERTF((int)offsetof(struct lu_idxpage, lip_flags) == 4, "found %lld\n",
+                 (long long)(int)offsetof(struct lu_idxpage, lip_flags));
+        LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_flags) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_flags));
+        LASSERTF((int)offsetof(struct lu_idxpage, lip_nr) == 6, "found %lld\n",
+                 (long long)(int)offsetof(struct lu_idxpage, lip_nr));
+        LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_nr) == 2, "found %lld\n",
+                 (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_nr));
+        LASSERTF((int)offsetof(struct lu_idxpage, lip_pad0) == 8, "found %lld\n",
+                 (long long)(int)offsetof(struct lu_idxpage, lip_pad0));
+        LASSERTF((int)sizeof(((struct lu_idxpage *)0)->lip_pad0) == 8, "found %lld\n",
+                 (long long)(int)sizeof(((struct lu_idxpage *)0)->lip_pad0));
+        CLASSERT(LIP_MAGIC == 0x8A6D6B6C);
+        LASSERTF(LIP_HDR_SIZE == 16, "found %lld\n",
+                 (long long)LIP_HDR_SIZE);
+        LASSERTF(II_FL_NOHASH == 1, "found %lld\n",
+                 (long long)II_FL_NOHASH);
+        LASSERTF(II_FL_VARKEY == 2, "found %lld\n",
+                 (long long)II_FL_VARKEY);
+        LASSERTF(II_FL_VARREC == 4, "found %lld\n",
+                 (long long)II_FL_VARREC);
+        LASSERTF(II_FL_NONUNQ == 8, "found %lld\n",
+                 (long long)II_FL_NONUNQ);
+
         /* Checks for struct niobuf_remote */
         LASSERTF((int)sizeof(struct niobuf_remote) == 16, "found %lld\n",
                  (long long)(int)sizeof(struct niobuf_remote));