Whamcloud - gitweb
LU-5092 nodemap: transfer nodemaps between MGS, MDTs, and OSTs
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
index cd396a9..b61e97d 100644 (file)
@@ -504,6 +504,9 @@ static int nodemap_process_keyrec(struct nodemap_config *config,
        type = nm_idx_get_type(nodemap_id);
        nodemap_id = nm_idx_set_type(nodemap_id, 0);
 
+       CDEBUG(D_INFO, "found config entry, nm_id %d type %d\n",
+              nodemap_id, type);
+
        /* find the correct nodemap in the load list */
        if (type == NODEMAP_RANGE_IDX || type == NODEMAP_UIDMAP_IDX ||
            type == NODEMAP_GIDMAP_IDX) {
@@ -706,7 +709,8 @@ out:
                        rc = PTR_ERR(new_config->nmc_default_nodemap);
                } else {
                        rc = nodemap_idx_nodemap_add_update(
-                                       new_config->nmc_default_nodemap, 0);
+                                       new_config->nmc_default_nodemap,
+                                       NM_ADD);
                        nodemap_putref(new_config->nmc_default_nodemap);
                }
        }
@@ -793,3 +797,156 @@ void nm_config_file_deregister(const struct lu_env *env,
        EXIT;
 }
 EXPORT_SYMBOL(nm_config_file_deregister);
+
+int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
+                             struct lu_nodemap **recent_nodemap)
+{
+       struct nodemap_key *key;
+       union nodemap_rec *rec;
+       char *entry;
+       int j;
+       int k;
+       int rc = 0;
+       int size = dt_nodemap_features.dif_keysize_max +
+                  dt_nodemap_features.dif_recsize_max;
+
+       for (j = 0; j < LU_PAGE_COUNT; j++) {
+               if (lip->lp_idx.lip_magic != LIP_MAGIC)
+                       return -EINVAL;
+
+               /* get and process keys and records from page */
+               for (k = 0; k < lip->lp_idx.lip_nr; k++) {
+                       entry = lip->lp_idx.lip_entries + k * size;
+                       key = (struct nodemap_key *)entry;
+
+                       entry += dt_nodemap_features.dif_keysize_max;
+                       rec = (union nodemap_rec *)entry;
+
+                       rc = nodemap_process_keyrec(config, key, rec,
+                                                   recent_nodemap);
+                       if (rc < 0)
+                               return rc;
+               }
+               lip++;
+       }
+       return 0;
+}
+EXPORT_SYMBOL(nodemap_process_idx_pages);
+
+int nodemap_index_read(struct lu_env *env,
+                      struct nm_config_file *ncf,
+                      struct idx_info *ii,
+                      const struct lu_rdpg *rdpg)
+{
+       struct dt_object        *nodemap_idx = ncf->ncf_obj;
+       int                      rc = 0;
+
+       ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
+       ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
+
+       dt_read_lock(env, nodemap_idx, 0);
+       rc = dt_index_walk(env, nodemap_idx, rdpg, NULL, ii);
+       CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
+
+       dt_read_unlock(env, nodemap_idx);
+       return rc;
+}
+EXPORT_SYMBOL(nodemap_index_read);
+
+/**
+ * Returns the current nodemap configuration to MGC by walking the nodemap
+ * config index and storing it in the response buffer.
+ *
+ * \param      req             incoming MGS_CONFIG_READ request
+ * \retval     0               success
+ * \retval     -EINVAL         malformed request
+ * \retval     -ENOTCONN       client evicted/reconnected already
+ * \retval     -ETIMEDOUT      client timeout or network error
+ * \retval     -ENOMEM
+ */
+int nodemap_get_config_req(struct obd_device *mgs_obd,
+                          struct ptlrpc_request *req)
+{
+       struct mgs_config_body *body;
+       struct mgs_config_res *res;
+       struct lu_rdpg rdpg;
+       struct idx_info nodemap_ii;
+       struct ptlrpc_bulk_desc *desc;
+       struct l_wait_info lwi;
+       int i;
+       int page_count;
+       int bytes = 0;
+       int rc = 0;
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
+       if (!body)
+               RETURN(-EINVAL);
+
+       if (body->mcb_type != CONFIG_T_NODEMAP)
+               RETURN(-EINVAL);
+
+       rdpg.rp_count = (body->mcb_units << body->mcb_bits);
+       rdpg.rp_npages = (rdpg.rp_count + PAGE_CACHE_SIZE - 1) >>
+               PAGE_CACHE_SHIFT;
+       if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
+               RETURN(-EINVAL);
+
+       CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
+              body->mcb_name, rdpg.rp_count);
+
+       /* allocate pages to store the containers */
+       OBD_ALLOC(rdpg.rp_pages, sizeof(*rdpg.rp_pages) * rdpg.rp_npages);
+       if (rdpg.rp_pages == NULL)
+               RETURN(-ENOMEM);
+       for (i = 0; i < rdpg.rp_npages; i++) {
+               rdpg.rp_pages[i] = alloc_page(GFP_IOFS);
+               if (rdpg.rp_pages[i] == NULL)
+                       GOTO(out, rc = -ENOMEM);
+       }
+
+       rdpg.rp_hash = body->mcb_offset;
+       nodemap_ii.ii_magic = IDX_INFO_MAGIC;
+       nodemap_ii.ii_flags = II_FL_NOHASH;
+
+       bytes = nodemap_index_read(req->rq_svc_thread->t_env,
+                                  mgs_obd->u.obt.obt_nodemap_config_file,
+                                  &nodemap_ii, &rdpg);
+       if (bytes < 0)
+               GOTO(out, rc = bytes);
+
+       res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
+       if (res == NULL)
+               GOTO(out, rc = -EINVAL);
+       res->mcr_offset = nodemap_ii.ii_hash_end;
+       res->mcr_size = bytes;
+
+       page_count = (bytes + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
+       LASSERT(page_count <= rdpg.rp_count);
+       desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
+                                   PTLRPC_BULK_PUT_SOURCE |
+                                       PTLRPC_BULK_BUF_KIOV,
+                                   MGS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
+       if (desc == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       for (i = 0; i < page_count && bytes > 0; i++) {
+               ptlrpc_prep_bulk_page_pin(desc, rdpg.rp_pages[i], 0,
+                                         min_t(int, bytes, PAGE_CACHE_SIZE));
+               bytes -= PAGE_CACHE_SIZE;
+       }
+
+       rc = target_bulk_io(req->rq_export, desc, &lwi);
+       ptlrpc_free_bulk(desc);
+
+out:
+       if (rdpg.rp_pages != NULL) {
+               for (i = 0; i < rdpg.rp_npages; i++)
+                       if (rdpg.rp_pages[i] != NULL)
+                               __free_page(rdpg.rp_pages[i]);
+               OBD_FREE(rdpg.rp_pages,
+                        rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
+       }
+       return rc;
+}
+EXPORT_SYMBOL(nodemap_get_config_req);