static int mdt_handle(struct ptlrpc_request *req);
static struct ptlrpc_thread_key mdt_thread_key;
+static int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
+ struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
+{
+ struct mdt_object *o;
+ struct mdt_object *child;
+ struct mdt_lock_handle *lh;
+
+ int result;
+
+ lh = &info->mti_lh[MDT_LH_PARENT];
+ lh->mlh_mode = LCK_PW;
+
+ o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
+ if (IS_ERR(o))
+ return PTR_ERR(o);
+
+ child = mdt_object_find(d, cfid);
+ if (!IS_ERR(child)) {
+ result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
+ mdt_object_child(child));
+ mdt_object_put(child);
+ } else
+ result = PTR_ERR(child);
+ mdt_object_unlock(d->mdt_namespace, o, lh);
+ mdt_object_put(o);
+ return result;
+}
+#if 0
+static int mdt_md_getattr(struct mdt_thread_info *info, struct lu_fid *fid,
+ struct md_object_attr *attr)
+{
+ struct mdt_device *d = info->mti_mdt;
+ struct mdt_object *o;
+ struct iattr
+ int result;
+
+ o = mdt_object_find(d, fid);
+ if (IS_ERR(o))
+ return PTR_ERR(o);
+
+ result = mdt_child_ops(d)->mdo_attr_get(mdt_object_child(o), name,
+ mdt_object_child(child));
+ mdt_object_put(child);
+ } else
+ result = PTR_ERR(child);
+ mdt_object_unlock(d->mdt_namespace, o, lh);
+ mdt_object_put(o);
+ return result;
+}
+#endif
static int mdt_getstatus(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
RETURN(result);
}
+static int mdt_statfs(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
+{
+ struct md_device *child = info->mti_mdt->mdt_child;
+ struct obd_statfs *osfs;
+ struct kstatfs sfs;
+ int result;
+ int size = sizeof(struct obd_statfs);
+
+ ENTRY;
+
+ result = lustre_pack_reply(req, 1, &size, NULL);
+ if (result)
+ CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
+ size);
+ else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
+ CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
+ result = -ENOMEM;
+ } else {
+ osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
+ /* XXX max_age optimisation is needed here. See mds_statfs */
+ result = child->md_ops->mdo_statfs(child, &sfs);
+ statfs_pack(osfs, &sfs);
+ }
+out:
+ RETURN(result);
+}
+#if 0
+static int mdt_getattr(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
+{
+ struct mdt_body *body;
+ int size = sizeof (*body);
+ struct md_obj_attr attr;
+ int result;
+
+ ENTRY;
+
+ result = lustre_pack_reply(req, 1, &size, NULL);
+ if (result)
+ CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
+ size);
+ else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
+ CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
+ result = -ENOMEM;
+ } else {
+ body = lustre_swab_reqbuf(req, offset, size,
+ lustre_swab_mdt_body);
+ result = mdt_md_getattr(info, body->fid1);
+ }
+out:
+ RETURN(result);
+}
+#else
+static int mdt_getattr(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
+{
+ return -EOPNOTSUPP;
+}
+#endif
static int mdt_connect(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
return -EOPNOTSUPP;
}
-static int mdt_getattr(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
-{
- return -EOPNOTSUPP;
-}
-
static int mdt_getattr_name(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
return -EOPNOTSUPP;
}
-static int mdt_statfs(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
-{
- return -EOPNOTSUPP;
-}
-
static int mdt_readpage(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
&mdt_thread_key);
+ ENTRY;
+
mdt_thread_info_init(info);
- info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+ /* it can be NULL while CONNECT */
+ if (req->rq_export)
+ info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
result = mdt_handle0(req, info);
-
mdt_thread_info_fini(info);
return result;
}
ldlm_namespace_free(m->mdt_namespace, 0);
m->mdt_namespace = NULL;
}
+ /* finish the stack */
+ if (m->mdt_child) {
+ struct lu_device *child = md2lu_dev(m->mdt_child);
+
+ child->ld_ops->ldo_device_fini(child);
+ }
LASSERT(atomic_read(&d->ld_ref) == 0);
md_device_fini(&m->mdt_md_dev);
struct lu_site *s;
char ns_name[48];
struct obd_device * obd = NULL;
+ char *top = lustre_cfg_string(cfg, 0);
char *child = lustre_cfg_string(cfg, 1);
ENTRY;
m->mdt_child = lu2md_dev(obd->obd_lu_dev);
} else {
CDEBUG(D_INFO, "Child device %s is not found\n", child);
+ return -EINVAL;
}
m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
NULL);
if (m->mdt_service == NULL)
return -ENOMEM;
-
+
+ /* init the stack */
+ if (m->mdt_child) {
+ struct lu_device *child = md2lu_dev(m->mdt_child);
+ int err;
+
+ if (child->ld_ops->ldo_device_init) {
+ err = child->ld_ops->ldo_device_init(child, top);
+ if (err)
+ return err;
+ }
+ }
return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
}
.ldo_object_print = mdt_object_print
};
-static struct md_object *mdt_object_child(struct mdt_object *o)
+/* mds_connect copy */
+static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
+ struct obd_uuid *cluuid,
+ struct obd_connect_data *data)
{
- return lu2md(lu_object_next(&o->mot_obj.mo_lu));
-}
-
-static inline struct md_device_operations *mdt_child_ops(struct mdt_device *d)
-{
- return d->mdt_child->md_ops;
-}
-
-static int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
- struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
-{
- struct mdt_object *o;
- struct mdt_object *child;
- struct mdt_lock_handle *lh;
+ struct obd_export *exp;
+ int rc, abort_recovery;
+ struct mds_export_data *med;
+ struct mds_client_data *mcd = NULL;
+
+ ENTRY;
- int result;
+ if (!conn || !obd || !cluuid)
+ RETURN(-EINVAL);
- lh = &info->mti_lh[MDT_LH_PARENT];
- lh->mlh_mode = LCK_PW;
+ /* Check for aborted recovery. */
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ abort_recovery = obd->obd_abort_recovery;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ if (abort_recovery)
+ target_abort_recovery(obd);
- o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
- if (IS_ERR(o))
- return PTR_ERR(o);
+ /* XXX There is a small race between checking the list and adding a
+ * new connection for the same UUID, but the real threat (list
+ * corruption when multiple different clients connect) is solved.
+ *
+ * There is a second race between adding the export to the list,
+ * and filling in the client data below. Hence skipping the case
+ * of NULL mcd above. We should already be controlling multiple
+ * connects at the client, and we can't hold the spinlock over
+ * memory allocations without risk of deadlocking.
+ */
+ rc = class_connect(conn, obd, cluuid);
+ if (rc)
+ RETURN(rc);
+ exp = class_conn2export(conn);
+ LASSERT(exp);
+ med = &exp->exp_mds_data;
+
+ OBD_ALLOC(mcd, sizeof(*mcd));
+ if (!mcd)
+ GOTO(out, rc = -ENOMEM);
+
+ memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
+ med->med_mcd = mcd;
+
+out:
+ if (rc) {
+ if (mcd) {
+ OBD_FREE(mcd, sizeof(*mcd));
+ med->med_mcd = NULL;
+ }
+ class_disconnect(exp);
+ } else {
+ class_export_put(exp);
+ }
- child = mdt_object_find(d, cfid);
- if (!IS_ERR(child)) {
- result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
- mdt_object_child(child));
- mdt_object_put(child);
- } else
- result = PTR_ERR(child);
- mdt_object_unlock(d->mdt_namespace, o, lh);
- mdt_object_put(o);
- return result;
+ RETURN(rc);
}
static struct obd_ops mdt_obd_device_ops = {
- .o_owner = THIS_MODULE
+ .o_owner = THIS_MODULE,
+ .o_connect = mdt_obd_connect
};
static struct lu_device *mdt_device_alloc(struct lu_device_type *t,