ENTRY;
obj = lmv_obj_grab(obd, pid);
- if (!obj)
+ if (!obj) {
+ CERROR("Object "DFID" should be split\n",
+ PFID(pid));
RETURN(0);
+ }
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)op->name, op->namelen);
rpid = &obj->lo_inodes[mds].li_fid;
rc = lmv_fld_lookup(lmv, rpid, &mds);
+ lmv_obj_put(obj);
if (rc)
- GOTO(cleanup, rc);
+ RETURN(rc);
- rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, NULL);
- if (rc > 0) {
- LASSERT(fid_is_sane(fid));
- rc = fld_client_create(&lmv->lmv_fld,
- fid_seq(fid), mds, NULL);
- if (rc) {
- CERROR("Can't create fld entry, rc %d\n", rc);
- GOTO(cleanup, rc);
- }
- }
- if (rc == 0) {
- CDEBUG(D_INFO, "Allocate new fid "DFID" for split "
- "obj\n", PFID(fid));
+ rc = __lmv_fid_alloc(lmv, fid, mds);
+ if (rc) {
+ CERROR("Can't allocate new fid, rc %d\n",
+ rc);
+ RETURN(rc);
}
- EXIT;
-cleanup:
- lmv_obj_put(obj);
- return rc;
+ CDEBUG(D_INFO, "Allocate new fid "DFID" for split "
+ "obj\n", PFID(fid));
+
+ RETURN(rc);
}
/*
mdsno_t *mds)
{
struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_obj *obj;
int rc;
ENTRY;
* balanced, that is all sequences have more or less equal
* number of objects created.
*/
- if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
-#if 1
- *mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
- hint->ph_cname);
- rc = 0;
-#else
- /* Stress policy for tests - to use non-parent MDS */
- LASSERT(fid_is_sane(hint->ph_pfid));
- rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
+ obj = lmv_obj_grab(obd, hint->ph_pfid);
+ if (obj) {
+ /*
+ * If the dir got split, alloc fid according to its
+ * hash. No matter what we create, object create should
+ * go to correct MDS.
+ */
+ struct lu_fid *rpid;
+
+ *mds = raw_name2idx(obj->lo_hashtype,
+ obj->lo_objcount,
+ hint->ph_cname->name,
+ hint->ph_cname->len);
+ rpid = &obj->lo_inodes[*mds].li_fid;
+ rc = lmv_fld_lookup(lmv, rpid, mds);
+ lmv_obj_put(obj);
if (rc)
- RETURN(rc);
- *mds = (int)(*mds + 1) % lmv->desc.ld_tgt_count;
-
-#endif
- } else {
- struct lmv_obj *obj;
- LASSERT(fid_is_sane(hint->ph_pfid));
-
- obj = lmv_obj_grab(obd, hint->ph_pfid);
- if (obj) {
- /*
- * If the dir got split, alloc fid according to
- * its hash
- */
- struct lu_fid *rpid;
-
- *mds = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- hint->ph_cname->name,
- hint->ph_cname->len);
- rpid = &obj->lo_inodes[*mds].li_fid;
- lmv_obj_put(obj);
+ GOTO(exit, rc);
+ rc = 0;
- rc = lmv_fld_lookup(lmv, rpid, mds);
- if (rc)
- GOTO(exit, rc);
-
- CDEBUG(D_INODE, "The obj "DFID" has been"
- "split, got MDS at "LPU64" by name %s\n",
- PFID(hint->ph_pfid), *mds,
- hint->ph_cname->name);
+ CDEBUG(D_INODE, "The obj "DFID" has been split, got "
+ "MDS at "LPU64" by name %s\n",PFID(hint->ph_pfid),
+ *mds, hint->ph_cname->name);
+ } else {
+ if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
+ /* Default policy for directories. */
+ *mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
+ hint->ph_cname);
rc = 0;
} else {
- /* Default policy is to use parent MDS */
+ /*
+ * Default policy for others is to use parent
+ * MDS.
+ */
rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
}
-
}
} else {
/*
* yet!
*/
*mds = 0;
- rc = -EINVAL;
+ rc = -ENOSYS;
}
exit:
if (rc) {
RETURN(rc);
}
+int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
+ mdsno_t mds)
+{
+ struct lmv_tgt_desc *tgt = &lmv->tgts[mds];
+ int rc;
+ ENTRY;
+
+ /* New seq alloc and FLD setup should be atomic. */
+ down(&tgt->fid_sem);
+
+ /* Asking underlaying tgt layer to allocate new fid. */
+ rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
+ if (rc > 0) {
+ LASSERT(fid_is_sane(fid));
+
+ /* Client switches to new sequence, setup FLD. */
+ rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
+ mds, NULL);
+ if (rc) {
+ CERROR("Can't create fld entry, "
+ "rc %d\n", rc);
+ }
+ }
+
+ up(&tgt->fid_sem);
+ RETURN(rc);
+}
+
static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
struct lu_placement_hint *hint)
{
int rc;
ENTRY;
- LASSERT(fid != NULL);
LASSERT(hint != NULL);
+ LASSERT(fid != NULL);
rc = lmv_placement_policy(obd, hint, &mds);
if (rc) {
RETURN(rc);
}
- /* Asking underlaying tgt layer to allocate new fid. */
- rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, hint);
-
- /* Client switches to new sequence, setup fld. */
- if (rc > 0) {
- LASSERT(fid_is_sane(fid));
-
- rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
- mds, NULL);
- if (rc) {
- CERROR("Can't create fld entry, rc %d\n", rc);
- RETURN(rc);
- }
+ rc = __lmv_fid_alloc(lmv, fid, mds);
+ if (rc) {
+ CERROR("Can't alloc new fid, rc %d\n",
+ rc);
+ RETURN(rc);
}
RETURN(rc);
if (lmv->tgts == NULL)
RETURN(-ENOMEM);
- for (i = 0; i < LMV_MAX_TGT_COUNT; i++)
+ for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
+ sema_init(&lmv->tgts[i].fid_sem, 1);
lmv->tgts[i].idx = i;
+ }
lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);