+static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
+ placement_policy_t placement)
+{
+ switch (placement) {
+ case PLACEMENT_CHAR_POLICY:
+ return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
+ op_data->op_name,
+ op_data->op_namelen);
+ case PLACEMENT_NID_POLICY:
+ return lmv_nid_policy(lmv);
+
+ default:
+ break;
+ }
+
+ CERROR("Unsupported placement policy %x\n", placement);
+ return -EINVAL;
+}
+
+/**
+ * This is _inode_ placement policy function (not name).
+ */
+static int lmv_placement_policy(struct obd_device *obd,
+ struct md_op_data *op_data,
+ mdsno_t *mds)
+{
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_object *obj;
+ int rc;
+ ENTRY;
+
+ LASSERT(mds != NULL);
+
+ if (lmv->desc.ld_tgt_count == 1) {
+ *mds = 0;
+ RETURN(0);
+ }
+
+ /*
+ * Allocate new fid on target according to operation type and parent
+ * home mds.
+ */
+ obj = lmv_object_find(obd, &op_data->op_fid1);
+ if (obj != NULL || op_data->op_name == NULL ||
+ op_data->op_opc != LUSTRE_OPC_MKDIR) {
+ /*
+ * Allocate fid for non-dir or for null name or for case parent
+ * dir is split.
+ */
+ if (obj) {
+ lmv_object_put(obj);
+
+ /*
+ * If we have this flag turned on, and we see that
+ * parent dir is split, this means, that caller did not
+ * notice split yet. This is race and we would like to
+ * let caller know that.
+ */
+ if (op_data->op_bias & MDS_CHECK_SPLIT)
+ RETURN(-ERESTART);
+ }
+
+ /*
+ * Allocate new fid on same mds where parent fid is located and
+ * where operation will be sent. In case of split dir, ->op_fid1
+ * and ->op_mds here will contain fid and mds of slave directory
+ * object (assigned by caller).
+ */
+ *mds = op_data->op_mds;
+ rc = 0;
+ } else {
+ /*
+ * Parent directory is not split and we want to create a
+ * directory in it. Let's calculate where to place it according
+ * to operation data @op_data.
+ */
+ *mds = lmv_choose_mds(lmv, op_data, lmv->lmv_placement);
+ rc = 0;
+ }
+
+ if (rc) {
+ CERROR("Can't choose MDS, err = %d\n", rc);
+ } else {
+ LASSERT(*mds < lmv->desc.ld_tgt_count);
+ }
+
+ RETURN(rc);
+}
+
+int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
+ mdsno_t mds)
+{
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
+
+ tgt = lmv_get_target(lmv, mds);
+
+ /*
+ * New seq alloc and FLD setup should be atomic. Otherwise we may find
+ * on server that seq in new allocated fid is not yet known.
+ */
+ cfs_mutex_lock(&tgt->ltd_fid_mutex);
+
+ if (!tgt->ltd_active)
+ GOTO(out, rc = -ENODEV);
+
+ /*
+ * Asking underlaying tgt layer to allocate new fid.
+ */
+ rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
+ if (rc > 0) {
+ LASSERT(fid_is_sane(fid));
+ rc = 0;
+ }
+
+ EXIT;
+out:
+ cfs_mutex_unlock(&tgt->ltd_fid_mutex);
+ return rc;
+}
+
+int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct md_op_data *op_data)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct lmv_obd *lmv = &obd->u.lmv;
+ mdsno_t mds = 0;
+ int rc;
+ ENTRY;
+
+ LASSERT(op_data != NULL);
+ LASSERT(fid != NULL);
+
+ rc = lmv_placement_policy(obd, op_data, &mds);
+ if (rc) {
+ CERROR("Can't get target for allocating fid, "
+ "rc %d\n", rc);
+ RETURN(rc);
+ }
+
+ rc = __lmv_fid_alloc(lmv, fid, mds);
+ if (rc) {
+ CERROR("Can't alloc new fid, rc %d\n", rc);
+ RETURN(rc);
+ }
+
+ RETURN(rc);
+}
+
+static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
+{
+ ENTRY;
+ LASSERT(exp != NULL && fid != NULL);
+ if (lmv_object_delete(exp, fid)) {
+ CDEBUG(D_INODE, "Object "DFID" is destroyed.\n",
+ PFID(fid));
+ }
+ RETURN(0);
+}
+
+static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lprocfs_static_vars lvars;
+ struct lmv_desc *desc;
+ int rc;
+ int i = 0;
+ ENTRY;
+
+ if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
+ CERROR("LMV setup requires a descriptor\n");
+ RETURN(-EINVAL);
+ }
+
+ desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
+ if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
+ CERROR("Lmv descriptor size wrong: %d > %d\n",
+ (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
+ RETURN(-EINVAL);
+ }
+
+ lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
+
+ OBD_ALLOC(lmv->tgts, lmv->tgts_size);
+ if (lmv->tgts == NULL)
+ RETURN(-ENOMEM);
+
+ for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
+ cfs_mutex_init(&lmv->tgts[i].ltd_fid_mutex);
+ lmv->tgts[i].ltd_idx = i;
+ }
+
+ lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
+
+ OBD_ALLOC(lmv->datas, lmv->datas_size);
+ if (lmv->datas == NULL)
+ GOTO(out_free_tgts, rc = -ENOMEM);
+
+ obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
+ lmv->desc.ld_tgt_count = 0;
+ lmv->desc.ld_active_tgt_count = 0;
+ lmv->max_cookiesize = 0;
+ lmv->max_def_easize = 0;
+ lmv->max_easize = 0;
+ lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
+
+ cfs_spin_lock_init(&lmv->lmv_lock);
+ cfs_mutex_init(&lmv->init_mutex);
+
+ rc = lmv_object_setup(obd);
+ if (rc) {
+ CERROR("Can't setup LMV object manager, error %d.\n", rc);
+ GOTO(out_free_datas, rc);
+ }
+
+ lprocfs_lmv_init_vars(&lvars);
+ lprocfs_obd_setup(obd, lvars.obd_vars);
+#ifdef LPROCFS
+ {
+ rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
+ 0444, &lmv_proc_target_fops, obd);
+ if (rc)
+ CWARN("%s: error adding LMV target_obd file: rc = %d\n",
+ obd->obd_name, rc);
+ }
+#endif
+ rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
+ LUSTRE_CLI_FLD_HASH_DHT);
+ if (rc) {
+ CERROR("Can't init FLD, err %d\n", rc);
+ GOTO(out_free_datas, rc);
+ }
+
+ RETURN(0);
+
+out_free_datas:
+ OBD_FREE(lmv->datas, lmv->datas_size);
+ lmv->datas = NULL;
+out_free_tgts:
+ OBD_FREE(lmv->tgts, lmv->tgts_size);
+ lmv->tgts = NULL;
+ return rc;
+}
+
+static int lmv_cleanup(struct obd_device *obd)
+{
+ struct lmv_obd *lmv = &obd->u.lmv;
+ ENTRY;
+
+ fld_client_fini(&lmv->lmv_fld);
+ lmv_object_cleanup(obd);
+ OBD_FREE(lmv->datas, lmv->datas_size);
+ OBD_FREE(lmv->tgts, lmv->tgts_size);
+
+ RETURN(0);
+}
+
+static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
+{
+ struct lustre_cfg *lcfg = buf;
+ struct obd_uuid tgt_uuid;
+ int rc;
+ ENTRY;
+
+ switch(lcfg->lcfg_command) {
+ case LCFG_ADD_MDC:
+ if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
+ GOTO(out, rc = -EINVAL);
+
+ obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
+ rc = lmv_add_target(obd, &tgt_uuid);
+ GOTO(out, rc);