+static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
+ size_t nob, const struct dt_it_ops *iops,
+ struct dt_it *it, __u32 attr, void *arg)
+{
+ struct idx_info *ii = (struct idx_info *)arg;
+ struct lu_idxpage *lip = &lp->lp_idx;
+ char *entry;
+ size_t size = ii->ii_keysize + ii->ii_recsize;
+ int rc;
+ ENTRY;
+
+ if (nob < LIP_HDR_SIZE)
+ return -EINVAL;
+
+ /* initialize the header of the new container */
+ memset(lip, 0, LIP_HDR_SIZE);
+ lip->lip_magic = LIP_MAGIC;
+ nob -= LIP_HDR_SIZE;
+
+ entry = lip->lip_entries;
+ do {
+ char *tmp_entry = entry;
+ struct dt_key *key;
+ __u64 hash;
+ enum nodemap_idx_type key_type;
+
+ /* fetch 64-bit hash value */
+ hash = iops->store(env, it);
+ ii->ii_hash_end = hash;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
+ if (lip->lip_nr != 0)
+ GOTO(out, rc = 0);
+ }
+
+ if (nob < size) {
+ if (lip->lip_nr == 0)
+ GOTO(out, rc = -EINVAL);
+ GOTO(out, rc = 0);
+ }
+
+ key = iops->key(env, it);
+ key_type = nodemap_get_key_type((struct nodemap_key *)key);
+
+ /* on the first pass, get only the cluster types. On second
+ * pass, get all the rest */
+ if ((ii->ii_attrs == NM_READ_CLUSTERS &&
+ key_type == NODEMAP_CLUSTER_IDX) ||
+ (ii->ii_attrs == NM_READ_ATTRIBUTES &&
+ key_type != NODEMAP_CLUSTER_IDX &&
+ key_type != NODEMAP_EMPTY_IDX)) {
+ memcpy(tmp_entry, key, ii->ii_keysize);
+ tmp_entry += ii->ii_keysize;
+
+ /* and finally the record */
+ rc = iops->rec(env, it, (struct dt_rec *)tmp_entry,
+ attr);
+ if (rc != -ESTALE) {
+ if (rc != 0)
+ GOTO(out, rc);
+
+ /* hash/key/record successfully copied! */
+ lip->lip_nr++;
+ if (unlikely(lip->lip_nr == 1 &&
+ ii->ii_count == 0))
+ ii->ii_hash_start = hash;
+
+ entry = tmp_entry + ii->ii_recsize;
+ nob -= size;
+ }
+ }
+
+ /* move on to the next record */
+ do {
+ rc = iops->next(env, it);
+ } while (rc == -ESTALE);
+
+ /* move to second pass */
+ if (rc > 0 && ii->ii_attrs == NM_READ_CLUSTERS) {
+ ii->ii_attrs = NM_READ_ATTRIBUTES;
+ rc = iops->load(env, it, 0);
+ if (rc == 0)
+ rc = iops->next(env, it);
+ else if (rc > 0)
+ rc = 0;
+ else
+ GOTO(out, rc);
+ }
+
+ } while (rc == 0);
+
+ GOTO(out, rc);
+out:
+ if (rc >= 0 && lip->lip_nr > 0)
+ /* one more container */
+ ii->ii_count++;
+ if (rc > 0)
+ /* no more entries */
+ ii->ii_hash_end = II_END_OFF;
+ return rc;
+}
+
+