int rc = 0, i = 0;
LASSERT(count == cmm->cmm_tgt_count);
-
/* FIXME: this spin_lock maybe not proper,
* because fid_alloc may need RPC */
spin_lock(&cmm->cmm_tgt_guard);
list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
mc_linkage) {
+ if (cmm->cmm_local_num == mc->mc_num)
+ continue;
+
rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
- if (rc) {
+ if (rc < 0) {
spin_unlock(&cmm->cmm_tgt_guard);
RETURN(rc);
}
}
spin_unlock(&cmm->cmm_tgt_guard);
+ LASSERT(i + 1 == count);
+ if (rc == 1)
+ rc = 0;
RETURN(rc);
}
RETURN(PTR_ERR(obj));
OBD_ALLOC_PTR(spec);
+ spec->u.sp_pfid = fid;
rc = mo_object_create(ctx, md_object_next(&obj->cmo_obj),
spec, ma);
OBD_FREE_PTR(spec);
struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
ENTRY;
- lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
+ lmv_size = cmm_md_size(cmm->cmm_tgt_count);
/* This lmv will be free after finish splitting. */
OBD_ALLOC(lmv, lmv_size);
lmv->mea_master = -1;
lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
- lmv->mea_count = cmm->cmm_tgt_count + 1;
+ lmv->mea_count = cmm->cmm_tgt_count;
lmv->mea_ids[0] = *lf;
if (rc)
GOTO(cleanup, rc);
- for (i = 0; i < cmm->cmm_tgt_count; i ++) {
+ for (i = 1; i < cmm->cmm_tgt_count; i ++) {
rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
if (rc)
GOTO(cleanup, rc);
hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
for (i = 1; i < cmm->cmm_tgt_count; i++) {
struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
-
+
rdpg->rp_hash = i * hash_segement;
rdpg->rp_hash_end = rdpg->rp_hash + hash_segement;
- rc = cmm_split_entries(ctx, mo, rdpg, lf);
+ rc = cmm_remove_entries(ctx, mo, rdpg);
if (rc)
GOTO(cleanup, rc);
- rc = cmm_remove_entries(ctx, mo, rdpg);
+
+ rc = cmm_split_entries(ctx, mo, rdpg, lf);
if (rc)
GOTO(cleanup, rc);
}
#include <obd_support.h>
#include <lustre_lib.h>
#include <obd_class.h>
+#include <lustre_mdc.h>
#include "mdc_internal.h"
static struct md_object_operations mdc_mo_ops;
#ifdef HAVE_SPLIT_SUPPORT
int mdc_send_page(const struct lu_context *ctx, struct md_object *mo,
- const struct page *page)
+ struct page *page)
{
struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
int rc;
ENTRY;
+ kmap(page);
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ /* allocate new fid for each obj */
+ rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL);
+ if (rc) {
+ kunmap(page);
+ RETURN(rc);
+ }
+ }
+ kunmap(page);
rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
page);
RETURN(rc);