From: nikita Date: Thu, 6 Apr 2006 13:47:08 +0000 (+0000) Subject: mdt prototype: 0. add handlers for dlm requests; 1. add fid_build_res_name() and... X-Git-Tag: v1_8_0_110~486^2~2090 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=ea9017e9545ba4b5c57bc3cab67b84221738ce38;p=fs%2Flustre-release.git mdt prototype: 0. add handlers for dlm requests; 1. add fid_build_res_name() and fid_res_name_eq() (should go into separate file, ultimately); 2. add some stubs for interoperability code --- diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 7d03b63..9d632eb 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -444,6 +444,11 @@ int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data); /* ldlm_extent.c */ __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms); +struct ldlm_callback_suite { + ldlm_completion_callback lcs_completion; + ldlm_blocking_callback lcs_blocking; + ldlm_glimpse_callback lcs_glimpse; +}; /* ldlm_lockd.c */ int ldlm_server_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, @@ -452,7 +457,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data); int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data); int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback, ldlm_blocking_callback, ldlm_glimpse_callback); +int ldlm_handle_enqueue0(struct ptlrpc_request *req, + struct ldlm_request *dlm_req, + struct ldlm_callback_suite *cbs); int ldlm_handle_convert(struct ptlrpc_request *req); +int ldlm_handle_convert0(struct ptlrpc_request *req, + struct ldlm_request *dlm_req); int ldlm_handle_cancel(struct ptlrpc_request *req); int ldlm_del_waiting_lock(struct ldlm_lock *lock); int ldlm_get_ref(void); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 25e042c..a6da2a9 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -671,14 +671,12 @@ find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl) * Main server-side entry point into LDLM. This is called by ptlrpc service * threads to carry out client lock enqueueing requests. */ -int ldlm_handle_enqueue(struct ptlrpc_request *req, - ldlm_completion_callback completion_callback, - ldlm_blocking_callback blocking_callback, - ldlm_glimpse_callback glimpse_callback) +int ldlm_handle_enqueue0(struct ptlrpc_request *req, + struct ldlm_request *dlm_req, + struct ldlm_callback_suite *cbs) { struct obd_device *obddev = req->rq_export->exp_obd; struct ldlm_reply *dlm_rep; - struct ldlm_request *dlm_req; int rc = 0, size[2] = {sizeof(*dlm_rep)}; __u32 flags; ldlm_error_t err = ELDLM_OK; @@ -688,14 +686,6 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, LDLM_DEBUG_NOLOCK("server-side enqueue handler START"); - dlm_req = lustre_swab_reqbuf (req, MDS_REQ_INTENT_LOCKREQ_OFF, - sizeof (*dlm_req), - lustre_swab_ldlm_request); - if (dlm_req == NULL) { - CERROR ("Can't unpack dlm_req\n"); - GOTO(out, rc = -EFAULT); - } - flags = dlm_req->lock_flags; LASSERT(req->rq_export); @@ -759,8 +749,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, dlm_req->lock_desc.l_resource.lr_name, dlm_req->lock_desc.l_resource.lr_type, dlm_req->lock_desc.l_req_mode, - blocking_callback, completion_callback, - glimpse_callback, NULL, 0); + cbs->lcs_blocking, cbs->lcs_completion, + cbs->lcs_glimpse, NULL, 0); if (!lock) GOTO(out, rc = -ENOMEM); @@ -918,20 +908,38 @@ existing_lock: return rc; } -int ldlm_handle_convert(struct ptlrpc_request *req) +int ldlm_handle_enqueue(struct ptlrpc_request *req, + ldlm_completion_callback completion_callback, + ldlm_blocking_callback blocking_callback, + ldlm_glimpse_callback glimpse_callback) { + int rc; struct ldlm_request *dlm_req; - struct ldlm_reply *dlm_rep; - struct ldlm_lock *lock; - int rc, size = sizeof(*dlm_rep); - ENTRY; + struct ldlm_callback_suite cbs = { + .lcs_completion = completion_callback, + .lcs_blocking = blocking_callback, + .lcs_glimpse = glimpse_callback + }; - dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req), - lustre_swab_ldlm_request); + + dlm_req = lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF, + sizeof *dlm_req, lustre_swab_ldlm_request); if (dlm_req == NULL) { + rc = ldlm_handle_enqueue0(req, dlm_req, &cbs); + } else { CERROR ("Can't unpack dlm_req\n"); - RETURN (-EFAULT); + rc = -EFAULT; } + return rc; +} + +int ldlm_handle_convert0(struct ptlrpc_request *req, + struct ldlm_request *dlm_req) +{ + struct ldlm_reply *dlm_rep; + struct ldlm_lock *lock; + int rc, size = sizeof(*dlm_rep); + ENTRY; rc = lustre_pack_reply(req, 1, &size, NULL); if (rc) { @@ -977,6 +985,22 @@ int ldlm_handle_convert(struct ptlrpc_request *req) RETURN(0); } +int ldlm_handle_convert(struct ptlrpc_request *req) +{ + int rc; + struct ldlm_request *dlm_req; + + dlm_req = lustre_swab_reqbuf(req, 0, sizeof *dlm_req, + lustre_swab_ldlm_request); + if (dlm_req != NULL) { + rc = ldlm_handle_convert0(req, dlm_req); + } else { + CERROR ("Can't unpack dlm_req\n"); + rc = -EFAULT; + } + return rc; +} + int ldlm_handle_cancel(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; @@ -1758,8 +1782,10 @@ EXPORT_SYMBOL(ldlm_server_blocking_ast); EXPORT_SYMBOL(ldlm_server_completion_ast); EXPORT_SYMBOL(ldlm_server_glimpse_ast); EXPORT_SYMBOL(ldlm_handle_enqueue); +EXPORT_SYMBOL(ldlm_handle_enqueue0); EXPORT_SYMBOL(ldlm_handle_cancel); EXPORT_SYMBOL(ldlm_handle_convert); +EXPORT_SYMBOL(ldlm_handle_convert0); EXPORT_SYMBOL(ldlm_del_waiting_lock); EXPORT_SYMBOL(ldlm_get_ref); EXPORT_SYMBOL(ldlm_put_ref); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 349737f..670c7aa 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -188,12 +188,80 @@ static int mdt_handle_quotactl(struct mdt_thread_info *info, return -EOPNOTSUPP; } +/* + * DLM handlers. + */ + +static struct ldlm_callback_suite cbs = { + .lcs_completion = ldlm_server_completion_ast, + .lcs_blocking = ldlm_server_blocking_ast, + .lcs_glimpse = NULL +}; + +static int mdt_enqueue(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + /* + * info->mti_dlm_req already contains swapped and (if necessary) + * converted dlm request. + */ + LASSERT(info->mti_dlm_req); + + info->mti_fail_id = OBD_FAIL_LDLM_REPLY; + return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs); +} + +static int mdt_convert(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + LASSERT(info->mti_dlm_req); + return ldlm_handle_convert0(req, info->mti_dlm_req); +} + +static int mdt_bl_callback(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + CERROR("bl callbacks should not happen on MDS\n"); + LBUG(); + return -EOPNOTSUPP; +} + +static int mdt_cp_callback(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + CERROR("cp callbacks should not happen on MDS\n"); + LBUG(); + return -EOPNOTSUPP; +} + +/* + * Build (DLM) resource name from fid. + */ +struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f, + struct ldlm_res_id *name) +{ + memset(name, 0, sizeof *name); + /* we use fid_num() whoch includes also object version instread of raw + * fid_oid(). */ + name->name[0] = fid_seq(f); + name->name[1] = fid_num(f); + return name; +} + +/* + * Return true if resource is for object identified by fid. + */ +int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name) +{ + return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f); +} + /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */ int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f, struct lustre_handle *lh, ldlm_mode_t mode, ldlm_policy_data_t *policy) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; int flags = 0, rc; ENTRY; @@ -201,16 +269,11 @@ int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f, LASSERT(lh != NULL); LASSERT(f != NULL); - /* we use fid_num() whoch includes also object version instread of raw - * fid_oid(). */ - res_id.name[0] = fid_seq(f); - res_id.name[1] = fid_num(f); - /* FIXME: is that correct to have @flags=0 here? */ - rc = ldlm_cli_enqueue(NULL, NULL, ns, res_id, LDLM_IBITS, policy, - mode, &flags, ldlm_blocking_ast, - ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, lh); + rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id), + LDLM_IBITS, policy, mode, &flags, + ldlm_blocking_ast, ldlm_completion_ast, NULL, + NULL, NULL, 0, NULL, lh); RETURN (rc == ELDLM_OK ? 0 : -EIO); } @@ -227,8 +290,7 @@ void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f, LBUG(); } - LASSERT(fid_seq(f) == lock->l_resource->lr_name.name[0] && - fid_num(f) == lock->l_resource->lr_name.name[1]); + LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name)); ldlm_lock_decref(lh, mode); EXIT; @@ -325,7 +387,12 @@ enum mdt_handler_flags { /* * struct mdt_body is passed in the 0-th incoming buffer. */ - HABEO_CORPUS = (1 << 0) + HABEO_CORPUS = (1 << 0), + /* + * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th + * incoming buffer. + */ + HABEO_CLAVIS = (1 << 1) }; struct mdt_opc_slice { @@ -360,6 +427,19 @@ static inline __u64 req_exp_last_xid(struct ptlrpc_request *req) return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid; } +static int mdt_lock_resname_compat(struct mdt_device *m, + struct ldlm_request *req) +{ + /* XXX something... later. */ + return 0; +} + +static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep) +{ + /* XXX something... later. */ + return 0; +} + /* * Invoke handler for this request opc. Also do necessary preprocessing * (according to handler ->mh_flags), and post-processing (setting of @@ -371,6 +451,7 @@ static int mdt_req_handle(struct mdt_thread_info *info, { int result; int off; + int lock_conv; ENTRY; @@ -384,19 +465,40 @@ static int mdt_req_handle(struct mdt_thread_info *info, OBD_FAIL_RETURN(h->mh_fail_id, 0); off = MDS_REQ_REC_OFF + shift; + lock_conv = + h->mh_flags & HABEO_CLAVIS && + info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME; + result = 0; if (h->mh_flags & HABEO_CORPUS) { - info->mti_body = lustre_swab_reqbuf(req, off, - sizeof *info->mti_body, - lustre_swab_mdt_body); - if (info->mti_body == NULL) { + struct mdt_body *body; + + body = info->mti_body = + lustre_swab_reqbuf(req, off, sizeof *info->mti_body, + lustre_swab_mdt_body); + if (body != NULL) { + info->mti_object = mdt_object_find(info->mti_mdt, + &body->fid1); + if (IS_ERR(info->mti_object)) + result = PTR_ERR(info->mti_object); + } else { CERROR("Can't unpack body\n"); - result = req->rq_status = -EFAULT; + result = -EFAULT; + } + } else if (lock_conv) { + struct ldlm_request *dlm; + + LASSERT(shift == 0); + dlm = info->mti_dlm_req = + lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF, + sizeof *dlm, + lustre_swab_ldlm_request); + if (dlm != NULL) + result = mdt_lock_resname_compat(info->mti_mdt, dlm); + else { + CERROR("Can't unpack dlm request\n"); + result = -EFAULT; } - info->mti_object = mdt_object_find(info->mti_mdt, - &info->mti_body->fid1); - if (IS_ERR(info->mti_object)) - result = PTR_ERR(info->mti_object); } if (result == 0) /* @@ -414,6 +516,14 @@ static int mdt_req_handle(struct mdt_thread_info *info, LASSERT(current->journal_info == NULL); + if (lock_conv) { + struct ldlm_reply *rep; + + rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep); + if (rep != NULL) + result = mdt_lock_reply_compat(info->mti_mdt, rep); + } + /* If we're DISCONNECTing, the mds_export_data is already freed */ if (result == 0 && h->mh_opc != MDS_DISCONNECT) { req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req)); @@ -559,8 +669,8 @@ static int mdt_filter_recovery_request(struct ptlrpc_request *req, /* * Handle recovery. Return: - * +ve: continue request processing; - * -ve: abort immediately with given (negated) error code; + * +1: continue request processing; + * -ve: abort immediately with the given error code; * 0: send reply with error code in req->rq_status; */ static int mdt_recovery(struct ptlrpc_request *req) @@ -642,8 +752,9 @@ static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info) static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) { - struct lustre_msg *msg; - int result; + struct mdt_handler *h; + struct lustre_msg *msg; + int result; ENTRY; @@ -653,27 +764,25 @@ static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) msg = req->rq_reqmsg; result = mds_msg_check_version(msg); - if (result != 0) { - CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n"); - RETURN(result); - } - - result = mdt_recovery(req); - if (result > 0) { - struct mdt_handler *h; - - h = mdt_handler_find(msg->opc); - if (h != NULL) - result = mdt_req_handle(info, h, req, 0); - else { - req->rq_status = -ENOTSUPP; - result = ptlrpc_error(req); - RETURN(result); + if (result == 0) { + result = mdt_recovery(req); + switch (result) { + case +1: + h = mdt_handler_find(msg->opc); + if (h != NULL) + result = mdt_req_handle(info, h, req, 0); + else { + req->rq_status = -ENOTSUPP; + result = ptlrpc_error(req); + break; + } + /* fall through */ + case 0: + result = mdt_reply(req, info); } - } else if (result < 0) - RETURN(result); - - RETURN(mdt_reply(req, info)); + } else + CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n"); + RETURN(result); } static struct mdt_device *mdt_dev(struct lu_device *d) @@ -1009,16 +1118,17 @@ static void __exit mdt_mod_exit(void) } -#define DEF_HNDL(prefix, base, flags, opc, fn) \ -[prefix ## _ ## opc - prefix ## _ ## base] = { \ - .mh_name = #opc, \ - .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \ - .mh_opc = prefix ## _ ## opc, \ - .mh_flags = flags, \ - .mh_act = fn \ +#define DEF_HNDL(prefix, base, suffix, flags, opc, fn) \ +[prefix ## _ ## opc - prefix ## _ ## base] = { \ + .mh_name = #opc, \ + .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \ + .mh_opc = prefix ## _ ## opc, \ + .mh_flags = flags, \ + .mh_act = fn \ } -#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn) +#define DEF_MDT_HNDL(flags, name, fn) \ + DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn) static struct mdt_handler mdt_mds_ops[] = { DEF_MDT_HNDL(0, CONNECT, mdt_connect), @@ -1037,13 +1147,20 @@ static struct mdt_handler mdt_mds_ops[] = { DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync), DEF_MDT_HNDL(0, SET_INFO, mdt_set_info), DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck), - DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl), + DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl) }; static struct mdt_handler mdt_obd_ops[] = { }; +#define DEF_DLM_HNDL(flags, name, fn) \ + DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn) + static struct mdt_handler mdt_dlm_ops[] = { + DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE, mdt_enqueue), + DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT, mdt_convert), + DEF_DLM_HNDL(0, BL_CALLBACK, mdt_bl_callback), + DEF_DLM_HNDL(0, CP_CALLBACK, mdt_cp_callback) }; static struct mdt_handler mdt_llog_ops[] = { diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index d57e315..cd4ae32 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -84,6 +84,19 @@ struct mdt_device { struct ptlrpc_client mdt_ldlm_client; /* underlying device */ struct md_device *mdt_child; + /* + * Device flags, taken from enum mdt_flags. No locking (so far) is + * necessary. + */ + unsigned long mdt_flags; +}; + +enum mdt_flags { + /* + * This mdt works with legacy clients with different resource name + * encoding (pre-fid, etc.). + */ + MDT_CL_COMPAT_RESNAME = 1 << 0, }; struct md_object { @@ -161,6 +174,10 @@ struct mdt_thread_info { */ struct mdt_body *mti_body; /* + * Lock request for "habeo clavis" operations. + */ + struct ldlm_request *mti_dlm_req; + /* * Host object. This is released at the end of mdt_handler(). */ struct mdt_object *mti_object;