+EXPORT_SYMBOL(nm_config_file_deregister_tgt);
+
+int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
+ struct lu_nodemap **recent_nodemap)
+{
+ struct nodemap_key *key;
+ union nodemap_rec *rec;
+ char *entry;
+ int j;
+ int k;
+ int rc = 0;
+ int size = dt_nodemap_features.dif_keysize_max +
+ dt_nodemap_features.dif_recsize_max;
+ ENTRY;
+
+ for (j = 0; j < LU_PAGE_COUNT; j++) {
+ if (lip->lp_idx.lip_magic != LIP_MAGIC)
+ return -EINVAL;
+
+ /* get and process keys and records from page */
+ for (k = 0; k < lip->lp_idx.lip_nr; k++) {
+ entry = lip->lp_idx.lip_entries + k * size;
+ key = (struct nodemap_key *)entry;
+
+ entry += dt_nodemap_features.dif_keysize_max;
+ rec = (union nodemap_rec *)entry;
+
+ rc = nodemap_process_keyrec(config, key, rec,
+ recent_nodemap);
+ if (rc < 0)
+ return rc;
+ }
+ lip++;
+ }
+
+ EXIT;
+ return 0;
+}
+EXPORT_SYMBOL(nodemap_process_idx_pages);
+
+static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
+ size_t nob, const struct dt_it_ops *iops,
+ struct dt_it *it, __u32 attr, void *arg)
+{
+ struct idx_info *ii = (struct idx_info *)arg;
+ struct lu_idxpage *lip = &lp->lp_idx;
+ char *entry;
+ size_t size = ii->ii_keysize + ii->ii_recsize;
+ int rc;
+ ENTRY;
+
+ if (nob < LIP_HDR_SIZE)
+ return -EINVAL;
+
+ /* initialize the header of the new container */
+ memset(lip, 0, LIP_HDR_SIZE);
+ lip->lip_magic = LIP_MAGIC;
+ nob -= LIP_HDR_SIZE;
+
+ entry = lip->lip_entries;
+ do {
+ char *tmp_entry = entry;
+ struct dt_key *key;
+ __u64 hash;
+ enum nodemap_idx_type key_type;
+
+ /* fetch 64-bit hash value */
+ hash = iops->store(env, it);
+ ii->ii_hash_end = hash;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
+ if (lip->lip_nr != 0)
+ GOTO(out, rc = 0);
+ }
+
+ if (nob < size) {
+ if (lip->lip_nr == 0)
+ GOTO(out, rc = -EINVAL);
+ GOTO(out, rc = 0);
+ }
+
+ key = iops->key(env, it);
+ key_type = nodemap_get_key_type((struct nodemap_key *)key);
+
+ /* on the first pass, get only the cluster types. On second
+ * pass, get all the rest */
+ if ((ii->ii_attrs == NM_READ_CLUSTERS &&
+ key_type == NODEMAP_CLUSTER_IDX) ||
+ (ii->ii_attrs == NM_READ_ATTRIBUTES &&
+ key_type != NODEMAP_CLUSTER_IDX &&
+ key_type != NODEMAP_EMPTY_IDX)) {
+ memcpy(tmp_entry, key, ii->ii_keysize);
+ tmp_entry += ii->ii_keysize;
+
+ /* and finally the record */
+ rc = iops->rec(env, it, (struct dt_rec *)tmp_entry,
+ attr);
+ if (rc != -ESTALE) {
+ if (rc != 0)
+ GOTO(out, rc);
+
+ /* hash/key/record successfully copied! */
+ lip->lip_nr++;
+ if (unlikely(lip->lip_nr == 1 &&
+ ii->ii_count == 0))
+ ii->ii_hash_start = hash;
+
+ entry = tmp_entry + ii->ii_recsize;
+ nob -= size;
+ }
+ }
+
+ /* move on to the next record */
+ do {
+ rc = iops->next(env, it);
+ } while (rc == -ESTALE);
+
+ /* move to second pass */
+ if (rc > 0 && ii->ii_attrs == NM_READ_CLUSTERS) {
+ ii->ii_attrs = NM_READ_ATTRIBUTES;
+ rc = iops->load(env, it, 0);
+ if (rc == 0)
+ rc = iops->next(env, it);
+ else if (rc > 0)
+ rc = 0;
+ else
+ GOTO(out, rc);
+ }
+
+ } while (rc == 0);
+
+ GOTO(out, rc);
+out:
+ if (rc >= 0 && lip->lip_nr > 0)
+ /* one more container */
+ ii->ii_count++;
+ if (rc > 0)
+ /* no more entries */
+ ii->ii_hash_end = II_END_OFF;
+ return rc;
+}
+
+
+int nodemap_index_read(struct lu_env *env,
+ struct nm_config_file *ncf,
+ struct idx_info *ii,
+ const struct lu_rdpg *rdpg)
+{
+ struct dt_object *nodemap_idx = ncf->ncf_obj;
+ __u64 version;
+ int rc = 0;
+
+ ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
+ ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
+
+ dt_read_lock(env, nodemap_idx, 0);
+ version = dt_version_get(env, nodemap_idx);
+ if (rdpg->rp_hash != 0 && ii->ii_version != version) {
+ CDEBUG(D_INFO, "nodemap config changed inflight, old %llu, new %llu\n",
+ ii->ii_version,
+ version);
+ ii->ii_hash_end = 0;
+ } else {
+ rc = dt_index_walk(env, nodemap_idx, rdpg, nodemap_page_build,
+ ii);
+ CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
+ }
+
+ if (rc >= 0)
+ ii->ii_version = version;
+
+ dt_read_unlock(env, nodemap_idx);
+ return rc;
+}
+EXPORT_SYMBOL(nodemap_index_read);
+
+/**
+ * Returns the current nodemap configuration to MGC by walking the nodemap
+ * config index and storing it in the response buffer.
+ *
+ * \param req incoming MGS_CONFIG_READ request
+ * \retval 0 success
+ * \retval -EINVAL malformed request
+ * \retval -ENOTCONN client evicted/reconnected already
+ * \retval -ETIMEDOUT client timeout or network error
+ * \retval -ENOMEM
+ */
+int nodemap_get_config_req(struct obd_device *mgs_obd,
+ struct ptlrpc_request *req)
+{
+ const struct ptlrpc_bulk_frag_ops *frag_ops = &ptlrpc_bulk_kiov_pin_ops;
+ struct mgs_config_body *body;
+ struct mgs_config_res *res;
+ struct lu_rdpg rdpg;
+ struct idx_info nodemap_ii;
+ struct ptlrpc_bulk_desc *desc;
+ struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
+ int i;
+ int page_count;
+ int bytes = 0;
+ int rc = 0;
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
+ if (!body)
+ RETURN(-EINVAL);
+
+ if (body->mcb_type != CONFIG_T_NODEMAP)
+ RETURN(-EINVAL);
+
+ rdpg.rp_count = (body->mcb_units << body->mcb_bits);
+ rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >>
+ PAGE_SHIFT;
+ if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
+ RETURN(-EINVAL);
+
+ CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
+ body->mcb_name, rdpg.rp_count);
+
+ /* allocate pages to store the containers */
+ OBD_ALLOC(rdpg.rp_pages, sizeof(*rdpg.rp_pages) * rdpg.rp_npages);
+ if (rdpg.rp_pages == NULL)
+ RETURN(-ENOMEM);
+ for (i = 0; i < rdpg.rp_npages; i++) {
+ rdpg.rp_pages[i] = alloc_page(GFP_NOFS);
+ if (rdpg.rp_pages[i] == NULL)
+ GOTO(out, rc = -ENOMEM);
+ }
+
+ rdpg.rp_hash = body->mcb_offset;
+ nodemap_ii.ii_magic = IDX_INFO_MAGIC;
+ nodemap_ii.ii_flags = II_FL_NOHASH;
+ nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
+ nodemap_ii.ii_attrs = body->mcb_nm_cur_pass;
+
+ bytes = nodemap_index_read(req->rq_svc_thread->t_env,
+ mgs_obd->u.obt.obt_nodemap_config_file,
+ &nodemap_ii, &rdpg);
+ if (bytes < 0)
+ GOTO(out, rc = bytes);
+
+ rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
+
+ res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
+ if (res == NULL)
+ GOTO(out, rc = -EINVAL);
+ res->mcr_offset = nodemap_ii.ii_hash_end;
+ res->mcr_nm_cur_pass = nodemap_ii.ii_attrs;
+
+ page_count = (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
+ LASSERT(page_count <= rdpg.rp_count);
+ desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
+ PTLRPC_BULK_PUT_SOURCE |
+ PTLRPC_BULK_BUF_KIOV,
+ MGS_BULK_PORTAL, frag_ops);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ for (i = 0; i < page_count && bytes > 0; i++) {
+ frag_ops->add_kiov_frag(desc, rdpg.rp_pages[i], 0,
+ min_t(int, bytes, PAGE_SIZE));
+ bytes -= PAGE_SIZE;
+ }
+
+ rc = target_bulk_io(req->rq_export, desc);
+ ptlrpc_free_bulk(desc);
+
+out:
+ if (rdpg.rp_pages != NULL) {
+ for (i = 0; i < rdpg.rp_npages; i++)
+ if (rdpg.rp_pages[i] != NULL)
+ __free_page(rdpg.rp_pages[i]);
+ OBD_FREE(rdpg.rp_pages,
+ rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
+ }
+ return rc;
+}
+EXPORT_SYMBOL(nodemap_get_config_req);