* Main server-side entry point into LDLM. This is called by ptlrpc service
* threads to carry out client lock enqueueing requests.
*/
-int ldlm_handle_enqueue(struct ptlrpc_request *req,
- ldlm_completion_callback completion_callback,
- ldlm_blocking_callback blocking_callback,
- ldlm_glimpse_callback glimpse_callback)
+int ldlm_handle_enqueue0(struct ptlrpc_request *req,
+ struct ldlm_request *dlm_req,
+ struct ldlm_callback_suite *cbs)
{
struct obd_device *obddev = req->rq_export->exp_obd;
struct ldlm_reply *dlm_rep;
- struct ldlm_request *dlm_req;
int rc = 0, size[2] = {sizeof(*dlm_rep)};
__u32 flags;
ldlm_error_t err = ELDLM_OK;
LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
- dlm_req = lustre_swab_reqbuf (req, MDS_REQ_INTENT_LOCKREQ_OFF,
- sizeof (*dlm_req),
- lustre_swab_ldlm_request);
- if (dlm_req == NULL) {
- CERROR ("Can't unpack dlm_req\n");
- GOTO(out, rc = -EFAULT);
- }
-
flags = dlm_req->lock_flags;
LASSERT(req->rq_export);
dlm_req->lock_desc.l_resource.lr_name,
dlm_req->lock_desc.l_resource.lr_type,
dlm_req->lock_desc.l_req_mode,
- blocking_callback, completion_callback,
- glimpse_callback, NULL, 0);
+ cbs->lcs_blocking, cbs->lcs_completion,
+ cbs->lcs_glimpse, NULL, 0);
if (!lock)
GOTO(out, rc = -ENOMEM);
return rc;
}
-int ldlm_handle_convert(struct ptlrpc_request *req)
+int ldlm_handle_enqueue(struct ptlrpc_request *req,
+ ldlm_completion_callback completion_callback,
+ ldlm_blocking_callback blocking_callback,
+ ldlm_glimpse_callback glimpse_callback)
{
+ int rc;
struct ldlm_request *dlm_req;
- struct ldlm_reply *dlm_rep;
- struct ldlm_lock *lock;
- int rc, size = sizeof(*dlm_rep);
- ENTRY;
+ struct ldlm_callback_suite cbs = {
+ .lcs_completion = completion_callback,
+ .lcs_blocking = blocking_callback,
+ .lcs_glimpse = glimpse_callback
+ };
- dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
- lustre_swab_ldlm_request);
+
+ dlm_req = lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ sizeof *dlm_req, lustre_swab_ldlm_request);
if (dlm_req == NULL) {
+ rc = ldlm_handle_enqueue0(req, dlm_req, &cbs);
+ } else {
CERROR ("Can't unpack dlm_req\n");
- RETURN (-EFAULT);
+ rc = -EFAULT;
}
+ return rc;
+}
+
+int ldlm_handle_convert0(struct ptlrpc_request *req,
+ struct ldlm_request *dlm_req)
+{
+ struct ldlm_reply *dlm_rep;
+ struct ldlm_lock *lock;
+ int rc, size = sizeof(*dlm_rep);
+ ENTRY;
rc = lustre_pack_reply(req, 1, &size, NULL);
if (rc) {
RETURN(0);
}
+int ldlm_handle_convert(struct ptlrpc_request *req)
+{
+ int rc;
+ struct ldlm_request *dlm_req;
+
+ dlm_req = lustre_swab_reqbuf(req, 0, sizeof *dlm_req,
+ lustre_swab_ldlm_request);
+ if (dlm_req != NULL) {
+ rc = ldlm_handle_convert0(req, dlm_req);
+ } else {
+ CERROR ("Can't unpack dlm_req\n");
+ rc = -EFAULT;
+ }
+ return rc;
+}
+
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
+EXPORT_SYMBOL(ldlm_handle_enqueue0);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
+EXPORT_SYMBOL(ldlm_handle_convert0);
EXPORT_SYMBOL(ldlm_del_waiting_lock);
EXPORT_SYMBOL(ldlm_get_ref);
EXPORT_SYMBOL(ldlm_put_ref);
return -EOPNOTSUPP;
}
+/*
+ * DLM handlers.
+ */
+
+static struct ldlm_callback_suite cbs = {
+ .lcs_completion = ldlm_server_completion_ast,
+ .lcs_blocking = ldlm_server_blocking_ast,
+ .lcs_glimpse = NULL
+};
+
+static int mdt_enqueue(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
+{
+ /*
+ * info->mti_dlm_req already contains swapped and (if necessary)
+ * converted dlm request.
+ */
+ LASSERT(info->mti_dlm_req);
+
+ info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
+ return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
+}
+
+static int mdt_convert(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
+{
+ LASSERT(info->mti_dlm_req);
+ return ldlm_handle_convert0(req, info->mti_dlm_req);
+}
+
+static int mdt_bl_callback(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
+{
+ CERROR("bl callbacks should not happen on MDS\n");
+ LBUG();
+ return -EOPNOTSUPP;
+}
+
+static int mdt_cp_callback(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
+{
+ CERROR("cp callbacks should not happen on MDS\n");
+ LBUG();
+ return -EOPNOTSUPP;
+}
+
+/*
+ * Build (DLM) resource name from fid.
+ */
+struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
+ struct ldlm_res_id *name)
+{
+ memset(name, 0, sizeof *name);
+ /* we use fid_num() whoch includes also object version instread of raw
+ * fid_oid(). */
+ name->name[0] = fid_seq(f);
+ name->name[1] = fid_num(f);
+ return name;
+}
+
+/*
+ * Return true if resource is for object identified by fid.
+ */
+int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
+{
+ return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
+}
+
/* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
struct lustre_handle *lh, ldlm_mode_t mode,
ldlm_policy_data_t *policy)
{
- struct ldlm_res_id res_id = { .name = {0} };
+ struct ldlm_res_id res_id;
int flags = 0, rc;
ENTRY;
LASSERT(lh != NULL);
LASSERT(f != NULL);
- /* we use fid_num() whoch includes also object version instread of raw
- * fid_oid(). */
- res_id.name[0] = fid_seq(f);
- res_id.name[1] = fid_num(f);
-
/* FIXME: is that correct to have @flags=0 here? */
- rc = ldlm_cli_enqueue(NULL, NULL, ns, res_id, LDLM_IBITS, policy,
- mode, &flags, ldlm_blocking_ast,
- ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, lh);
+ rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
+ LDLM_IBITS, policy, mode, &flags,
+ ldlm_blocking_ast, ldlm_completion_ast, NULL,
+ NULL, NULL, 0, NULL, lh);
RETURN (rc == ELDLM_OK ? 0 : -EIO);
}
LBUG();
}
- LASSERT(fid_seq(f) == lock->l_resource->lr_name.name[0] &&
- fid_num(f) == lock->l_resource->lr_name.name[1]);
+ LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
ldlm_lock_decref(lh, mode);
EXIT;
/*
* struct mdt_body is passed in the 0-th incoming buffer.
*/
- HABEO_CORPUS = (1 << 0)
+ HABEO_CORPUS = (1 << 0),
+ /*
+ * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
+ * incoming buffer.
+ */
+ HABEO_CLAVIS = (1 << 1)
};
struct mdt_opc_slice {
return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
}
+static int mdt_lock_resname_compat(struct mdt_device *m,
+ struct ldlm_request *req)
+{
+ /* XXX something... later. */
+ return 0;
+}
+
+static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
+{
+ /* XXX something... later. */
+ return 0;
+}
+
/*
* Invoke handler for this request opc. Also do necessary preprocessing
* (according to handler ->mh_flags), and post-processing (setting of
{
int result;
int off;
+ int lock_conv;
ENTRY;
OBD_FAIL_RETURN(h->mh_fail_id, 0);
off = MDS_REQ_REC_OFF + shift;
+ lock_conv =
+ h->mh_flags & HABEO_CLAVIS &&
+ info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME;
+
result = 0;
if (h->mh_flags & HABEO_CORPUS) {
- info->mti_body = lustre_swab_reqbuf(req, off,
- sizeof *info->mti_body,
- lustre_swab_mdt_body);
- if (info->mti_body == NULL) {
+ struct mdt_body *body;
+
+ body = info->mti_body =
+ lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
+ lustre_swab_mdt_body);
+ if (body != NULL) {
+ info->mti_object = mdt_object_find(info->mti_mdt,
+ &body->fid1);
+ if (IS_ERR(info->mti_object))
+ result = PTR_ERR(info->mti_object);
+ } else {
CERROR("Can't unpack body\n");
- result = req->rq_status = -EFAULT;
+ result = -EFAULT;
+ }
+ } else if (lock_conv) {
+ struct ldlm_request *dlm;
+
+ LASSERT(shift == 0);
+ dlm = info->mti_dlm_req =
+ lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ sizeof *dlm,
+ lustre_swab_ldlm_request);
+ if (dlm != NULL)
+ result = mdt_lock_resname_compat(info->mti_mdt, dlm);
+ else {
+ CERROR("Can't unpack dlm request\n");
+ result = -EFAULT;
}
- info->mti_object = mdt_object_find(info->mti_mdt,
- &info->mti_body->fid1);
- if (IS_ERR(info->mti_object))
- result = PTR_ERR(info->mti_object);
}
if (result == 0)
/*
LASSERT(current->journal_info == NULL);
+ if (lock_conv) {
+ struct ldlm_reply *rep;
+
+ rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
+ if (rep != NULL)
+ result = mdt_lock_reply_compat(info->mti_mdt, rep);
+ }
+
/* If we're DISCONNECTing, the mds_export_data is already freed */
if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
/*
* Handle recovery. Return:
- * +ve: continue request processing;
- * -ve: abort immediately with given (negated) error code;
+ * +1: continue request processing;
+ * -ve: abort immediately with the given error code;
* 0: send reply with error code in req->rq_status;
*/
static int mdt_recovery(struct ptlrpc_request *req)
static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
{
- struct lustre_msg *msg;
- int result;
+ struct mdt_handler *h;
+ struct lustre_msg *msg;
+ int result;
ENTRY;
msg = req->rq_reqmsg;
result = mds_msg_check_version(msg);
- if (result != 0) {
- CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
- RETURN(result);
- }
-
- result = mdt_recovery(req);
- if (result > 0) {
- struct mdt_handler *h;
-
- h = mdt_handler_find(msg->opc);
- if (h != NULL)
- result = mdt_req_handle(info, h, req, 0);
- else {
- req->rq_status = -ENOTSUPP;
- result = ptlrpc_error(req);
- RETURN(result);
+ if (result == 0) {
+ result = mdt_recovery(req);
+ switch (result) {
+ case +1:
+ h = mdt_handler_find(msg->opc);
+ if (h != NULL)
+ result = mdt_req_handle(info, h, req, 0);
+ else {
+ req->rq_status = -ENOTSUPP;
+ result = ptlrpc_error(req);
+ break;
+ }
+ /* fall through */
+ case 0:
+ result = mdt_reply(req, info);
}
- } else if (result < 0)
- RETURN(result);
-
- RETURN(mdt_reply(req, info));
+ } else
+ CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
+ RETURN(result);
}
static struct mdt_device *mdt_dev(struct lu_device *d)
}
-#define DEF_HNDL(prefix, base, flags, opc, fn) \
-[prefix ## _ ## opc - prefix ## _ ## base] = { \
- .mh_name = #opc, \
- .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \
- .mh_opc = prefix ## _ ## opc, \
- .mh_flags = flags, \
- .mh_act = fn \
+#define DEF_HNDL(prefix, base, suffix, flags, opc, fn) \
+[prefix ## _ ## opc - prefix ## _ ## base] = { \
+ .mh_name = #opc, \
+ .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
+ .mh_opc = prefix ## _ ## opc, \
+ .mh_flags = flags, \
+ .mh_act = fn \
}
-#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn)
+#define DEF_MDT_HNDL(flags, name, fn) \
+ DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
static struct mdt_handler mdt_mds_ops[] = {
DEF_MDT_HNDL(0, CONNECT, mdt_connect),
DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
DEF_MDT_HNDL(0, SET_INFO, mdt_set_info),
DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
- DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl),
+ DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
};
static struct mdt_handler mdt_obd_ops[] = {
};
+#define DEF_DLM_HNDL(flags, name, fn) \
+ DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
+
static struct mdt_handler mdt_dlm_ops[] = {
+ DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
+ DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT, mdt_convert),
+ DEF_DLM_HNDL(0, BL_CALLBACK, mdt_bl_callback),
+ DEF_DLM_HNDL(0, CP_CALLBACK, mdt_cp_callback)
};
static struct mdt_handler mdt_llog_ops[] = {