* fld/fld.c
*
* Copyright (C) 2006 Cluster File Systems, Inc.
+ * Author: WangDi <wangdi@clusterfs.com>
*
* This file is part of the Lustre file system, http://www.lustre.org
* Lustre is a trademark of Cluster File Systems, Inc.
#include <linux/lprocfs_status.h>
#include <linux/md_object.h>
+#include <linux/lustre_mdc.h>
#include "fld_internal.h"
/*XXX maybe these 2 items should go to sbi*/
md_fld.mf_seq = seq;
md_fld.mf_mds = mds_num;
- rc = obd_set_info(fld_exp, strlen("fld_create"), "fld_create",
- sizeof(struct md_fld), &md_fld);
+ rc = mdc_fld(fld_exp, &md_fld, FLD_CREATE);
fld_cache_insert(fld_cache, seq, mds_num);
RETURN(rc);
md_fld.mf_seq = seq;
md_fld.mf_mds = mds_num;
- rc = obd_set_info(fld_exp, strlen("fld_delete"), "fld_delete",
- sizeof(struct md_fld), &md_fld);
+ rc = mdc_fld(fld_exp, &md_fld, FLD_DELETE);
RETURN(rc);
}
vallen = sizeof(struct md_fld);
- rc = obd_get_info(fld_exp, strlen("fld_delete"), "fld_delete",
- &vallen, &md_fld);
-
+ rc = mdc_fld(fld_exp, &md_fld, FLD_GET);
+
*mds_num = md_fld.mf_mds;
RETURN(rc);
#define MGS_REQUEST_PORTAL 26
#define MGS_REPLY_PORTAL 27
#define OST_REQUEST_PORTAL 28
+#define MDS_FLD_PORTAL 29
#define SVC_KILLED 1
#define SVC_EVENT 2
extern void lustre_swab_ost_body (struct ost_body *b);
extern void lustre_swab_ost_last_id(obd_id *id);
+extern void lustre_swab_generic_32s(__u32 *val);
/* lock value block communicated between the filter and llite */
MDS_QUOTACTL = 48,
MDS_GETXATTR = 49,
MDS_SETXATTR = 50,
- MDS_GET_INFO = 51,
+ MDS_FLD = 51,
MDS_LAST_OPC
} mds_cmd_t;
struct ptlrpc_request **request);
int mdc_sync(struct obd_export *exp, struct lu_fid *fid,
struct ptlrpc_request **);
+
+int mdc_fld(struct obd_export *exp, struct md_fld *mf, __u32 fld_op);
+
int mdc_create_client(struct obd_uuid uuid, struct ptlrpc_client *cl);
int mdc_llog_process(struct obd_export *, char *logname, llog_cb_t, void *data);
int (*mdo_statfs)(struct lu_context *ctx,
struct md_device *m, struct kstatfs *sfs);
- int (*mdo_get_info)(struct lu_context *ctx, struct md_device *m,
- __u32 keylen, void *key, __u32 *vallen, void *val);
+ int (*mdo_fld)(struct lu_context *ctx, struct md_device *m,
+ __u32 opt, void *mf);
int (*mdo_set_info)(struct lu_context *ctx, struct md_device *m,
__u32 keylen, void *key, __u32 vallen, void *val);
lu_device_fini(&md->md_lu_dev);
}
+enum fld_op {
+ FLD_CREATE = 0,
+ FLD_DELETE = 1,
+ FLD_GET = 2
+};
#endif /* _LINUX_MD_OBJECT_H */
#define LUSTRE_CMM0_NAME "cmm0"
#define LUSTRE_MDD0_NAME "mdd0"
#define LUSTRE_OSD0_NAME "osd0"
+#define LUSTRE_FLD0_NAME "fld0"
#define LUSTRE_MDC_NAME "mdc"
#define OBD_FAIL_MDS_SETXATTR_WRITE 0x134
#define OBD_FAIL_MDS_SET_INFO_NET 0x135
#define OBD_FAIL_MDS_SET_INFO_PACK 0x136
-#define OBD_FAIL_MDS_GET_INFO_NET 0x137
-#define OBD_FAIL_MDS_GET_INFO_PACK 0x138
+#define OBD_FAIL_MDS_FLD_NET 0x137
+#define OBD_FAIL_MDS_FLD_PACK 0x138
#define OBD_FAIL_OST 0x200
#define OBD_FAIL_OST_CONNECT_NET 0x201
ptlrpc_req_finished(req);
RETURN(rc);
}
- if (KEY_IS("fld_create") || KEY_IS("fld_delete")) {
- struct ptlrpc_request *req;
- int size[2] = {keylen, vallen};
- char *bufs[2] = {key, val};
- ENTRY;
-
- req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION,
- MDS_SET_INFO, 2, size, bufs);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- req->rq_replen = lustre_msg_size(0, NULL);
- rc = ptlrpc_queue_wait(req);
- if (rc)
- GOTO(out_req, rc);
-out_req:
- ptlrpc_req_finished(req);
- RETURN(rc);
-
- }
RETURN(rc);
}
-int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key,
- __u32 *vallen, void *val)
+int mdc_fld(struct obd_export *exp, struct md_fld *mf, __u32 fld_op)
{
- int rc = -EINVAL;
-
- if (keylen == strlen("max_easize") &&
- memcmp(key, "max_easize", strlen("max_easize")) == 0) {
- int mdsize, *max_easize;
-
- if (*vallen != sizeof(int))
- RETURN(-EINVAL);
- mdsize = *(int*)val;
- if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
- exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
- max_easize = val;
- *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
- RETURN(0);
- }
- if (KEY_IS("fld_get")) {
- struct ptlrpc_request *req;
- int size[2] = {keylen, *vallen};
- char *bufs[2] = {key, val};
- struct md_fld *reply;
- ENTRY;
-
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_GET_INFO, 2, size, bufs);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- req->rq_replen = lustre_msg_size(1, (int *)vallen);
- rc = ptlrpc_queue_wait(req);
- if (rc)
- GOTO(out_req, rc);
+ struct ptlrpc_request *req;
+ struct md_fld *pmf;
+ int mf_size = sizeof(*mf);
+ __u32 *op;
+ int size[2] = {sizeof(op), mf_size}, rc;
+ ENTRY;
- reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
- lustre_swab_md_fld);
- if (reply == NULL) {
- CERROR("Can't unpack %s\n", (char *)key);
- GOTO(out_req, rc = -EPROTO);
- }
- *((struct md_fld *)val) = *reply;
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+ MDS_FLD, 2, size, NULL);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+ *op = fld_op;
+
+ pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
+ memcpy(pmf, mf, sizeof(*mf));
+
+ req->rq_replen = lustre_msg_size(1, &mf_size);
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out_req, rc);
+
+ pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf), lustre_swab_md_fld);
out_req:
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
+ ptlrpc_req_finished(req);
RETURN(rc);
}
.o_disconnect = client_disconnect_export,
.o_iocontrol = mdc_iocontrol,
.o_set_info = mdc_set_info,
- .o_get_info = mdc_get_info,
.o_statfs = mdc_statfs,
.o_pin = mdc_pin,
.o_unpin = mdc_unpin,
EXPORT_SYMBOL(mdc_init_ea_size);
EXPORT_SYMBOL(mdc_getxattr);
EXPORT_SYMBOL(mdc_setxattr);
+EXPORT_SYMBOL(mdc_fld);
module_init(mdc_init);
module_exit(mdc_exit);
return -ENOENT;
}
-static int mdd_get_info(struct lu_context *ctx, struct md_device *m,
- __u32 keylen, void *key, __u32 *vallen, void *val)
+static int mdd_fld(struct lu_context *ctx, struct md_device *m,
+ __u32 opts, void *mf)
{
- struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
+ struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
+ struct md_fld *pmf = mf;
int rc;
-
ENTRY;
- if (keylen >= strlen("fld_get") &&
- memcmp(key, "fld_get", 7) == 0) {
- struct md_fld *mf = val;
- rc = mdd_fld_get(mdd, mf->mf_seq, &mf->mf_mds);
- RETURN(rc);
- }
- RETURN(-EINVAL);
-}
-static int mdd_set_info(struct lu_context *ctx, struct md_device *m,
- __u32 keylen, void *key, __u32 vallen, void *val)
-{
- struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
- int rc;
- ENTRY;
-
- if (keylen >= strlen("fld_create") &&
- memcmp(key, "fld_create", 10) == 0) {
- struct md_fld *mf = val;
- rc = mdd_fld_create(mdd, mf->mf_seq, mf->mf_mds);
- RETURN(rc);
- }
-
- if (keylen >= strlen("fld_delete") &&
- memcmp(key, "fld_delete", 10) == 0) {
- struct md_fld *mf = val;
- rc = mdd_fld_delete(mdd, mf->mf_seq, mf->mf_mds);
- RETURN(rc);
- }
+ switch (opts) {
+ case FLD_CREATE:
+ rc = mdd_fld_create(mdd, pmf->mf_seq, pmf->mf_mds);
+ break;
+ case FLD_DELETE:
+ rc = mdd_fld_delete(mdd, pmf->mf_seq, pmf->mf_mds);
+ break;
+ case FLD_GET:
+ rc = mdd_fld_get(mdd, pmf->mf_seq, &pmf->mf_mds);
+ break;
+ default:
+ rc = -EINVAL;
+ break;
+ }
+ RETURN(rc);
- RETURN(-EINVAL);
}
struct md_device_operations mdd_ops = {
.mdo_root_get = mdd_root_get,
- .mdo_get_info = mdd_get_info,
- .mdo_set_info = mdd_set_info,
+ .mdo_fld = mdd_fld,
.mdo_config = mdd_config,
.mdo_statfs = mdd_statfs
};
RETURN(result);
}
-static int mdt_set_info(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
+static int mdt_fld(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
{
struct md_device *next = info->mti_mdt->mdt_child;
- char *key;
- int keylen, rc = 0;
+ struct md_fld mf, *p, *reply;
+ int size = sizeof(*reply);
+ __u32 *opt;
+ int rc;
ENTRY;
- key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
- if (key == NULL) {
- DEBUG_REQ(D_HA, req, "no set_info key");
- RETURN(-EFAULT);
- }
- keylen = req->rq_reqmsg->buflens[0];
-
- if (((keylen >= strlen("fld_create") &&
- memcmp(key, "fld_create", keylen) == 0)) ||
- ((keylen >= strlen("fld_delete") &&
- memcmp(key, "fld_delete", keylen) == 0))) {
- struct md_fld mf, *p;
- __u32 size = sizeof(struct md_fld);
-
- rc = lustre_pack_reply(req, 0, NULL, NULL);
- if (rc)
- RETURN(rc);
-
- p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
- mf = *p;
- rc = next->md_ops->mdo_get_info(info->mti_ctxt, next, keylen,
- key, &size, &mf);
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc)
RETURN(rc);
- }
- CDEBUG(D_IOCTL, "invalid key\n");
- RETURN(-EINVAL);
+ opt = lustre_swab_reqbuf(req, 0, sizeof(*opt),
+ lustre_swab_generic_32s);
-}
+ p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
+ mf = *p;
+ rc = next->md_ops->mdo_fld(info->mti_ctxt, next, *opt, &mf);
+ reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+ *reply = mf;
-static int mdt_get_info(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
-{
- struct md_device *next = info->mti_mdt->mdt_child;
- char *key;
- int keylen, rc = 0;
- ENTRY;
-
- key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
- if (key == NULL) {
- DEBUG_REQ(D_HA, req, "no set_info key");
- RETURN(-EFAULT);
- }
- keylen = req->rq_reqmsg->buflens[0];
-
- if (((keylen >= strlen("fld_get") &&
- memcmp(key, "fld_get", keylen) == 0))) {
- struct md_fld mf, *p, *reply;
- int size = sizeof(*reply);
-
- rc = lustre_pack_reply(req, 1, &size, NULL);
- if (rc)
- RETURN(rc);
- p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
- mf = *p;
- rc = next->md_ops->mdo_get_info(info->mti_ctxt, next, keylen,
- key, &size, &mf);
- reply = lustre_msg_buf(req->rq_repmsg, 0, size);
- *reply = mf;
- RETURN(rc);
- }
-
- CDEBUG(D_IOCTL, "invalid key\n");
- RETURN(-EINVAL);
+ RETURN(rc);
}
static struct lu_device_operations mdt_lu_ops;
.smo_write = mdt_seq_mgr_write
};
+static void mdt_stop_ptlrpc_service(struct mdt_device *m)
+{
+ if (m->mdt_service != NULL) {
+ ptlrpc_unregister_service(m->mdt_service);
+ m->mdt_service = NULL;
+ }
+ if (m->mdt_fld_service != NULL) {
+ ptlrpc_unregister_service(m->mdt_fld_service);
+ m->mdt_fld_service = NULL;
+ }
+}
+
static void mdt_fini(struct mdt_device *m)
{
struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
+ mdt_stop_ptlrpc_service(m);
if (d->ld_site != NULL) {
lu_site_fini(d->ld_site);
OBD_FREE_PTR(d->ld_site);
d->ld_site = NULL;
}
- if (m->mdt_service != NULL) {
- ptlrpc_unregister_service(m->mdt_service);
- m->mdt_service = NULL;
- }
if (m->mdt_namespace != NULL) {
ldlm_namespace_free(m->mdt_namespace, 0);
m->mdt_namespace = NULL;
md_device_fini(&m->mdt_md_dev);
}
+static int mdt_start_ptlrpc_service(struct mdt_device *m)
+{
+ int rc;
+ ENTRY;
+
+ m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
+ m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
+ m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
+ m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
+ m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
+ m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
+ m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
+ /*
+ * We'd like to have a mechanism to set this on a per-device basis,
+ * but alas...
+ */
+ m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
+ MDT_MIN_THREADS),
+ MDT_MAX_THREADS);
+
+ ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+ "mdt_ldlm_client", &m->mdt_ldlm_client);
+
+ m->mdt_service =
+ ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
+ LUSTRE_MDT0_NAME,
+ m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+ NULL);
+ if (m->mdt_service == NULL)
+ RETURN(-ENOMEM);
+
+ rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
+ if (rc)
+ GOTO(err_mdt_svc, rc);
+
+ /*start mdt fld service */
+
+ m->mdt_service_conf.psc_req_portal = MDS_FLD_PORTAL;
+
+ m->mdt_fld_service =
+ ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
+ LUSTRE_FLD0_NAME,
+ m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+ NULL);
+ if (m->mdt_fld_service == NULL)
+ RETURN(-ENOMEM);
+
+ rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, LUSTRE_FLD0_NAME);
+ if (rc)
+ GOTO(err_fld_svc, rc);
+
+ RETURN(rc);
+err_fld_svc:
+ ptlrpc_unregister_service(m->mdt_fld_service);
+ m->mdt_fld_service = NULL;
+err_mdt_svc:
+ ptlrpc_unregister_service(m->mdt_service);
+ m->mdt_service = NULL;
+
+ RETURN(rc);
+}
+
static int mdt_init0(struct mdt_device *m,
struct lu_device_type *t, struct lustre_cfg *cfg)
{
m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
- m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
- m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
- m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
- m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
- m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
- m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
- m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
- /*
- * We'd like to have a mechanism to set this on a per-device basis,
- * but alas...
- */
- m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
- MDT_MIN_THREADS),
- MDT_MAX_THREADS);
- snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
- m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
- if (m->mdt_namespace == NULL)
- GOTO(err_fini_site, rc = -ENOMEM);
-
- ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
-
- ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
- "mdt_ldlm_client", &m->mdt_ldlm_client);
-
- m->mdt_service =
- ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
- LUSTRE_MDT0_NAME,
- m->mdt_md_dev.md_lu_dev.ld_proc_entry,
- NULL);
- if (m->mdt_service == NULL)
- GOTO(err_free_ns, rc = -ENOMEM);
-
/* init the stack */
LASSERT(mdt_child->ld_type->ldt_ops->ldto_device_init != NULL);
rc = mdt_child->ld_type->ldt_ops->ldto_device_init(mdt_child, top);
if (rc) {
CERROR("can't init device stack, rc %d\n", rc);
- GOTO(err_free_svc, rc);
+ GOTO(err_fini_site, rc);
}
m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
if (rc)
GOTO(err_fini_ctx, rc);
- rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
- if (rc)
- GOTO(err_fini_ctx, rc);
-
lu_context_fini(&ctx);
- RETURN(0);
+
+ snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
+ m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+ if (m->mdt_namespace == NULL)
+ GOTO(err_fini_site, rc = -ENOMEM);
+ ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
+
+ rc = mdt_start_ptlrpc_service(m);
+ if (rc)
+ GOTO(err_free_ns, rc);
+ RETURN(0);
+err_free_ns:
+ ldlm_namespace_free(m->mdt_namespace, 0);
+ m->mdt_namespace = NULL;
err_fini_ctx:
lu_context_fini(&ctx);
err_fini_mgr:
m->mdt_seq_mgr = NULL;
err_fini_child:
mdt_child->ld_type->ldt_ops->ldto_device_fini(mdt_child);
-err_free_svc:
- ptlrpc_unregister_service(m->mdt_service);
- m->mdt_service = NULL;
-err_free_ns:
- ldlm_namespace_free(m->mdt_namespace, 0);
- m->mdt_namespace = NULL;
+
err_fini_site:
lu_site_fini(s);
OBD_FREE_PTR(s);
DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
DEF_MDT_HNDL(0, PIN, mdt_pin),
DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
- DEF_MDT_HNDL(0, SET_INFO, mdt_set_info),
- DEF_MDT_HNDL(0, GET_INFO, mdt_get_info),
+ DEF_MDT_HNDL(0, FLD, mdt_fld),
DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
};
/* super-class */
struct md_device mdt_md_dev;
struct ptlrpc_service *mdt_service;
+ struct ptlrpc_service *mdt_fld_service;
struct ptlrpc_service_conf mdt_service_conf;
/* DLM name-space for meta-data locks maintained by this server */
struct ldlm_namespace *mdt_namespace;
{ MDS_QUOTACTL, "mds_quotactl" },
{ MDS_GETXATTR, "mds_getxattr" },
{ MDS_SETXATTR, "mds_setxattr" },
- { MDS_GET_INFO, "mds_get_info" },
{ LDLM_ENQUEUE, "ldlm_enqueue" },
{ LDLM_CONVERT, "ldlm_convert" },
{ LDLM_CANCEL, "ldlm_cancel" },
__swab64s(id);
}
+void lustre_swab_generic_32s(__u32 *val)
+{
+ __swab32s(val);
+}
+
void lustre_swab_ost_lvb(struct ost_lvb *lvb)
{
__swab64s(&lvb->lvb_size);
EXPORT_SYMBOL(lustre_swab_qdata);
EXPORT_SYMBOL(lustre_swab_mgs_target_info);
EXPORT_SYMBOL(lustre_swab_md_fld);
+EXPORT_SYMBOL(lustre_swab_generic_32s);
/* recover.c */
EXPORT_SYMBOL(ptlrpc_run_recovery_over_upcall);