/*
* XXX for now. Tags in lu_device_type->ldt_something are needed.
*/
- return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
+ return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
}
static struct mdt_device *mdt_dev(struct lu_device *d)
{
LASSERT(lu_device_is_mdt(d));
- return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
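+        /* container_of0() passes NULL and ERR_PTR() through unchanged */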
+ return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
}
static int mdt_connect(struct mdt_thread_info *info,
LASSERT(info->mti_dlm_req != NULL);
info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
- return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
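+        /* enqueue the lock in this MDT's namespace */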
+ return ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
+ req, info->mti_dlm_req, &cbs);
}
static int mdt_convert(struct mdt_thread_info *info,
static struct mdt_object *mdt_obj(struct lu_object *o)
{
LASSERT(lu_device_is_mdt(o->lo_dev));
- return container_of(o, struct mdt_object, mot_obj.mo_lu);
+ return container_of0(o, struct mdt_object, mot_obj.mo_lu);
}
struct mdt_object *mdt_object_find(struct lu_context *ctxt,
RETURN(result);
}
+/*
+ * MDT handler function called by the ptlrpc service thread when a request
+ * arrives.
+ *
+ * XXX common "target" functionality should be factored into a separate
+ * module shared by mdt, ost and stand-alone services like fld.
+ */
static int mdt_handle(struct ptlrpc_request *req)
{
int result;
result = mdt_handle0(req, info);
mdt_thread_info_fini(info);
- return result;
+ RETURN(result);
}
static int mdt_intent_policy(struct ldlm_namespace *ns,
RETURN(ELDLM_LOCK_ABORTED);
}
-struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
- svc_handler_t h, char *name,
- struct proc_dir_entry *proc_entry,
- svcreq_printfn_t prntfn)
-{
- return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
- c->psc_max_req_size, c->psc_max_reply_size,
- c->psc_req_portal, c->psc_rep_portal,
- c->psc_watchdog_timeout,
- h, name, proc_entry,
- prntfn, c->psc_num_threads);
-}
-
static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
const char *name, void *buf, int size, int mode)
{
.smo_write = mdt_seq_mgr_write
};
-/* device init/fini methods */
-
-static int mdt_fld(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
-{
- struct lu_site *ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
- struct md_fld mf, *p, *reply;
- int size = sizeof(*reply);
- __u32 *opt;
- int rc;
- ENTRY;
-
- rc = lustre_pack_reply(req, 1, &size, NULL);
- if (rc)
- RETURN(rc);
-
- opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
- p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
- mf = *p;
-
- rc = fld_handle(ls->ls_fld, *opt, &mf);
- if (rc)
- RETURN(rc);
-
- reply = lustre_msg_buf(req->rq_repmsg, 0, size);
- *reply = mf;
- RETURN(rc);
-}
-
-struct dt_device *md2_bottom_dev(struct mdt_device *m)
-{
- /*FIXME: get dt device here*/
- RETURN (NULL);
-}
+/*
+ * FLD wrappers
+ */
static int mdt_fld_init(struct mdt_device *m)
{
int rc;
ENTRY;
- dt = md2_bottom_dev(m);
-
ls = m->mdt_md_dev.md_lu_dev.ld_site;
OBD_ALLOC_PTR(ls->ls_fld);
- if (!ls->ls_fld)
- RETURN(-ENOMEM);
-
- rc = fld_server_init(ls->ls_fld, dt);
+ if (ls->ls_fld != NULL)
+ rc = fld_server_init(ls->ls_fld, m->mdt_bottom);
+ else
+ rc = -ENOMEM;
RETURN(rc);
}
RETURN(rc);
}
+/* device init/fini methods */
+
static void mdt_stop_ptlrpc_service(struct mdt_device *m)
{
if (m->mdt_service != NULL) {
ptlrpc_unregister_service(m->mdt_service);
m->mdt_service = NULL;
}
- if (m->mdt_fld_service != NULL) {
- ptlrpc_unregister_service(m->mdt_fld_service);
- m->mdt_fld_service = NULL;
- }
}
static int mdt_start_ptlrpc_service(struct mdt_device *m)
{
int rc;
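+        /* ptlrpc service configuration for the MDT request service */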
+ struct ptlrpc_service_conf conf = {
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = MDS_MAXREQSIZE,
+ .psc_max_reply_size = MDS_MAXREPSIZE,
+ .psc_req_portal = MDS_REQUEST_PORTAL,
+ .psc_rep_portal = MDC_REPLY_PORTAL,
+ .psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT,
+ /*
+ * We'd like to have a mechanism to set this on a per-device
+ * basis, but alas...
+ */
+ .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
+ MDT_MAX_THREADS)
+ };
+
ENTRY;
- m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
- m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
- m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
- m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
- m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
- m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
- m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
- /*
- * We'd like to have a mechanism to set this on a per-device basis,
- * but alas...
- */
- m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
- MDT_MIN_THREADS),
- MDT_MAX_THREADS);
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"mdt_ldlm_client", &m->mdt_ldlm_client);
m->mdt_service =
- ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
- LUSTRE_MDT0_NAME,
+ ptlrpc_init_svc_conf(&conf, mdt_handle, LUSTRE_MDT0_NAME,
m->mdt_md_dev.md_lu_dev.ld_proc_entry,
NULL);
if (m->mdt_service == NULL)
if (rc)
GOTO(err_mdt_svc, rc);
- /*start mdt fld service */
-
- m->mdt_service_conf.psc_req_portal = MDS_FLD_PORTAL;
-
- m->mdt_fld_service =
- ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
- LUSTRE_FLD0_NAME,
- m->mdt_md_dev.md_lu_dev.ld_proc_entry,
- NULL);
- if (m->mdt_fld_service == NULL)
- RETURN(-ENOMEM);
-
- rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, LUSTRE_FLD0_NAME);
- if (rc)
- GOTO(err_fld_svc, rc);
-
RETURN(rc);
-err_fld_svc:
- ptlrpc_unregister_service(m->mdt_fld_service);
- m->mdt_fld_service = NULL;
err_mdt_svc:
ptlrpc_unregister_service(m->mdt_service);
m->mdt_service = NULL;
RETURN(rc);
}
-static void mdt_stack_fini(struct lu_device *d)
+static void mdt_stack_fini(struct mdt_device *m)
{
+ struct lu_device *d = md2lu_dev(m->mdt_child);
+
/* goes through all stack */
while (d != NULL) {
struct lu_device *n;
struct obd_type *type;
struct lu_device_type *ldt = d->ld_type;
-
+
lu_device_put(d);
-
+
/* each fini() returns next device in stack of layers
* * so we can avoid the recursion */
n = ldt->ldt_ops->ldto_device_fini(d);
ldt->ldt_ops->ldto_device_free(d);
-
- type = ldt->obd_type;
+
+ type = ldt->ldt_obd_type;
type->typ_refcnt--;
class_put_type(type);
/* switch to the next device in the layer */
d = n;
}
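+        /* the device stack has been freed; clear the stale child reference */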
+ m->mdt_child = NULL;
}
static struct lu_device *mdt_layer_setup(const char *typename,
}
ldt = type->typ_lu;
- ldt->obd_type = type;
+ ldt->ldt_obd_type = type;
if (ldt == NULL) {
CERROR("type: '%s'\n", typename);
GOTO(out_type, rc = -EINVAL);
if (IS_ERR(tmp)) {
RETURN (PTR_ERR(tmp));
}
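+        /* save the bottom dt device for mdt_fld_init()/fld_server_init() */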
+ m->mdt_bottom = lu2dt_dev(tmp);
d = tmp;
tmp = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
if (IS_ERR(tmp)) {
/* process setup config */
tmp = &m->mdt_md_dev.md_lu_dev;
rc = tmp->ld_ops->ldo_process_config(tmp, cfg);
-
+
out:
/* fini from last known good lu_device */
if (rc)
- mdt_stack_fini(d);
-
+ mdt_stack_fini(m);
+
return rc;
}
static void mdt_fini(struct mdt_device *m)
{
struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
-
+
ENTRY;
mdt_stop_ptlrpc_service(m);
/* finish the stack */
- mdt_stack_fini(md2lu_dev(m->mdt_child));
+ mdt_stack_fini(m);
if (d->ld_site != NULL) {
lu_site_fini(d->ld_site);
rc = mdt_start_ptlrpc_service(m);
if (rc)
GOTO(err_free_fld, rc);
+
RETURN(0);
err_free_fld:
OBD_FREE_PTR(s);
RETURN(rc);
}
+
/* used by MGS to process specific configurations */
static int mdt_process_config(struct lu_device *d, struct lustre_cfg *cfg)
{
struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
int err;
ENTRY;
- switch(cfg->lcfg_command) {
+
+ switch (cfg->lcfg_command) {
/* all MDT specific commands should be here */
default:
/* others are passed further */
err = next->ld_ops->ldo_process_config(next, cfg);
}
-
RETURN(err);
}
.ldo_process_config = mdt_process_config
};
+/* MDT version of mds_connect_internal(): negotiate the connect flags. */
+static int mdt_connect0(struct mdt_device *mdt,
+ struct obd_export *exp, struct obd_connect_data *data)
+{
+ if (data != NULL) {
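+                /* keep only the connect flags this MDT supports */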
+ data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
+ data->ocd_ibits_known &= MDS_INODELOCK_FULL;
+
+                /* If no known bits are set (which should not happen, as every
+                   client should support at least the LOOKUP and UPDATE bits),
+                   revert to compat mode with plain locks. */
+ if (!data->ocd_ibits_known &&
+ data->ocd_connect_flags & OBD_CONNECT_IBITS)
+ data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
+
+ if (!mdt->mdt_opts.mo_acl)
+ data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
+
+ if (!mdt->mdt_opts.mo_user_xattr)
+ data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
+
+ exp->exp_connect_flags = data->ocd_connect_flags;
+ data->ocd_version = LUSTRE_VERSION_CODE;
+ exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
+ }
+
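+        /* ACLs are enabled locally, so refuse clients without ACL support */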
+ if (mdt->mdt_opts.mo_acl &&
+ ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
+ CWARN("%s: MDS requires ACL support but client does not\n",
+ mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
+ return -EBADE;
+ }
+ return 0;
+}
+
/* mds_connect copy */
static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
struct obd_uuid *cluuid,
RETURN(rc);
exp = class_conn2export(conn);
- LASSERT(exp);
+ LASSERT(exp != NULL);
med = &exp->exp_mds_data;
- OBD_ALLOC_PTR(mcd);
- if (!mcd)
- GOTO(out, rc = -ENOMEM);
-
- memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
- med->med_mcd = mcd;
-
-out:
- if (rc) {
+ rc = mdt_connect0(mdt, exp, data);
+ if (rc == 0) {
+ OBD_ALLOC_PTR(mcd);
+ if (mcd != NULL) {
+                        memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
+ med->med_mcd = mcd;
+ } else
+ rc = -ENOMEM;
+ }
+ if (rc)
class_disconnect(exp);
- } else {
+ else
class_export_put(exp);
- }
RETURN(rc);
}
mdt_num_threads = MDT_NUM_THREADS;
lprocfs_init_vars(mdt, &lvars);
- return class_register_type(&mdt_obd_device_ops, NULL,
- lvars.module_vars, LUSTRE_MDT0_NAME,
+ return class_register_type(&mdt_obd_device_ops, NULL,
+ lvars.module_vars, LUSTRE_MDT0_NAME,
&mdt_device_type);
}
DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
DEF_MDT_HNDL(0, PIN, mdt_pin),
DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
- DEF_MDT_HNDL(0, FLD, mdt_fld),
DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
};