From: vitaly Date: Thu, 21 Feb 2008 13:25:21 +0000 (+0000) Subject: Branch HEAD X-Git-Tag: v1_7_0_51~222 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=218d9b7631b25e9e2ea4e2c4e8b752aa13749c18 Branch HEAD b=14149 i=shadow i=huanghua fixes for ELC bugs appeared after moving to capsule --- diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 1a9768d..748fe49 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -883,6 +883,7 @@ int lustre_pack_reply_v2(struct ptlrpc_request *req, int count, int lustre_shrink_msg(struct lustre_msg *msg, int segment, unsigned int newlen, int move_data); void lustre_free_reply_state(struct ptlrpc_reply_state *rs); +int lustre_msg_hdr_size(__u32 magic, int count); int lustre_msg_size(__u32 magic, int count, int *lengths); int lustre_msg_size_v2(int count, int *lengths); int lustre_packed_msg_size(struct lustre_msg *msg); diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index eb5cde5b..4c88a6b 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -91,6 +91,9 @@ void req_capsule_set_size(struct req_capsule *pill, int req_capsule_get_size(const struct req_capsule *pill, const struct req_msg_field *field, enum req_location loc); +int req_capsule_msg_size(const struct req_capsule *pill, enum req_location loc); +int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt, + enum req_location loc); void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt); int req_capsule_has_field(const struct req_capsule *pill, diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 9ccf69c..fae7632 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -486,28 +486,31 @@ cleanup: /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into * a single page on the send/receive side. XXX: 512 should be changed * to more adequate value. */ -static inline int ldlm_req_handles_avail(struct obd_export *exp, - int *size, int bufcount, - int bufoff, int off) +static inline int ldlm_req_handles_avail(int req_size, int off) { - int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); - int old_size = size[bufoff]; + int avail; - size[bufoff] = sizeof(struct ldlm_request); - avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, - bufcount, size); + avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512) - req_size; avail /= sizeof(struct lustre_handle); avail += LDLM_LOCKREQ_HANDLES - off; - size[bufoff] = old_size; return avail; } -static inline int ldlm_cancel_handles_avail(struct obd_export *exp) +static inline int ldlm_capsule_handles_avail(struct req_capsule *pill, + enum req_location loc, + int off) { - int size[2] = { sizeof(struct ptlrpc_body), - sizeof(struct ldlm_request) }; - return ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0); + int size = req_capsule_msg_size(pill, loc); + return ldlm_req_handles_avail(size, off); +} + +static inline int ldlm_format_handles_avail(struct obd_import *imp, + const struct req_format *fmt, + enum req_location loc, int off) +{ + int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc); + return ldlm_req_handles_avail(size, off); } /* Cancel lru locks and pack them into the enqueue request. Pack there the given @@ -520,17 +523,17 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, struct req_capsule *pill = &req->rq_pill; struct ldlm_request *dlm = NULL; int flags, avail, to_free, bufcount, pack = 0; + CFS_LIST_HEAD(head); int rc; ENTRY; - - LASSERT(cancels != NULL); - + if (cancels == NULL) + cancels = &head; if (exp_connect_cancelset(exp)) { /* Estimate the amount of available space in the request. */ bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT); - avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT], - bufcount, bufcount - 1, canceloff); + avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff); + flags = ns_connect_lru_resize(ns) ? LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED; to_free = !ns_connect_lru_resize(ns) && @@ -546,8 +549,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, pack = count; else pack = avail; - req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT, - ldlm_request_bufsize(count, opc)); + req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT, + ldlm_request_bufsize(pack, opc)); } rc = ptlrpc_request_pack(req, version, opc); @@ -576,10 +579,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, RETURN(0); } -int ldlm_prep_enqueue_req(struct obd_export *exp, - struct ptlrpc_request *req, - struct list_head *cancels, - int count) +int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req, + struct list_head *cancels, int count) { return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, LDLM_ENQUEUE_CANCEL_OFF, cancels, count); @@ -923,9 +924,14 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE)) RETURN(count); + free = ldlm_format_handles_avail(class_exp2cliimp(exp), + &RQF_LDLM_CANCEL, RCL_CLIENT, 0); + if (count > free) + count = free; + while (1) { int bufcount; - struct req_capsule *pill; + imp = class_exp2cliimp(exp); if (imp == NULL || imp->imp_invalid) { CDEBUG(D_DLMTRACE, @@ -937,14 +943,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, if (req == NULL) GOTO(out, rc = -ENOMEM); - pill = &req->rq_pill; - bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT); - - free = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT], - bufcount, bufcount, 0); - if (count > free) - count = free; - + bufcount = req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT); req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT, ldlm_request_bufsize(count, LDLM_CANCEL)); @@ -1057,6 +1056,7 @@ EXPORT_SYMBOL(ldlm_cli_update_pool); int ldlm_cli_cancel(struct lustre_handle *lockh) { + struct obd_export *exp; int avail, flags, count = 1, rc = 0; struct ldlm_namespace *ns; struct ldlm_lock *lock; @@ -1080,8 +1080,12 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) * here and send them all as one LDLM_CANCEL rpc. */ LASSERT(list_empty(&lock->l_bl_ast)); list_add(&lock->l_bl_ast, &cancels); - if (exp_connect_cancelset(lock->l_conn_export)) { - avail = ldlm_cancel_handles_avail(lock->l_conn_export); + + exp = lock->l_conn_export; + if (exp_connect_cancelset(exp)) { + avail = ldlm_format_handles_avail(class_exp2cliimp(exp), + &RQF_LDLM_CANCEL, + RCL_CLIENT, 0); LASSERT(avail > 0); ns = lock->l_resource->lr_namespace; diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 343b918..932564d 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -307,19 +307,11 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, RCL_CLIENT, 0); } - if (exp_connect_cancelset(exp) && count) { - req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT, - ldlm_request_bufsize(count, LDLM_ENQUEUE)); - } - - rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE); + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc) { ptlrpc_request_free(req); - ldlm_lock_list_put(&cancels, l_bl_ast, count); return NULL; } - if (exp_connect_cancelset(exp) && req) - ldlm_cli_cancel_list(&cancels, count, req, 0); if (joinfile) { __u64 head_size = *(__u64 *)op_data->op_data; @@ -365,7 +357,7 @@ static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, op_data->op_namelen + 1); - rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE); + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); if (rc) { ptlrpc_request_free(req); RETURN(ERR_PTR(rc)); @@ -410,7 +402,7 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, op_data->op_namelen + 1); - rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE); + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); if (rc) { ptlrpc_request_free(req); RETURN(ERR_PTR(rc)); @@ -435,14 +427,19 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp) { struct ptlrpc_request *req; + int rc; ENTRY; - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), - &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION, - LDLM_ENQUEUE); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); if (req == NULL) RETURN(ERR_PTR(-ENOMEM)); + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + ptlrpc_request_set_replen(req); RETURN(req); } diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 9a81e2c..8a49a72 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -138,7 +138,6 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, rc = mdc_prep_elc_req(exp, req, &cancels, count); if (rc) { - ldlm_lock_list_put(&cancels, l_bl_ast, count); ptlrpc_request_free(req); RETURN(rc); } @@ -233,7 +232,6 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data, rc = mdc_prep_elc_req(exp, req, &cancels, count); if (rc) { - ldlm_lock_list_put(&cancels, l_bl_ast, count); ptlrpc_request_free(req); RETURN(rc); } @@ -306,7 +304,6 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data, rc = mdc_prep_elc_req(exp, req, &cancels, count); if (rc) { - ldlm_lock_list_put(&cancels, l_bl_ast, count); ptlrpc_request_free(req); RETURN(rc); } @@ -359,7 +356,6 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data, rc = mdc_prep_elc_req(exp, req, &cancels, count); if (rc) { - ldlm_lock_list_put(&cancels, l_bl_ast, count); ptlrpc_request_free(req); RETURN(rc); } @@ -420,7 +416,6 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data, rc = mdc_prep_elc_req(exp, req, &cancels, count); if (rc) { - ldlm_lock_list_put(&cancels, l_bl_ast, count); ptlrpc_request_free(req); RETURN(rc); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 1f1c42b..f89cb19 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -694,7 +694,6 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 0, &cancels, count); if (rc) { - ldlm_lock_list_put(&cancels, l_bl_ast, count); ptlrpc_request_free(req); RETURN(rc); } diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index cfdf171..89ffe05 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -1435,6 +1435,31 @@ int req_capsule_get_size(const struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_get_size); +int req_capsule_msg_size(const struct req_capsule *pill, enum req_location loc) +{ + return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic, + pill->rc_fmt->rf_fields[loc].nr, + (int *)pill->rc_area[loc]); +} + +int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt, + enum req_location loc) +{ + int size, i = 0; + + size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr); + if (size < 0) + return size; + + if (magic == LUSTRE_MSG_MAGIC_V1) + i++; + + for (; i < fmt->rf_fields[loc].nr; ++i) + if (fmt->rf_fields[loc].d[i]->rmf_size != -1) + size += size_round(fmt->rf_fields[loc].d[i]->rmf_size); + return size; +} + #define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)] void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt) diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index e49eaa5..d108815 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -55,6 +55,20 @@ static inline int lustre_msg_hdr_size_v2(int count) return size_round(offsetof(struct lustre_msg_v2, lm_buflens[count])); } +int lustre_msg_hdr_size(__u32 magic, int count) +{ + switch (magic) { + case LUSTRE_MSG_MAGIC_V1: + return lustre_msg_hdr_size_v1(count - 1); + case LUSTRE_MSG_MAGIC_V2: + return lustre_msg_hdr_size_v2(count); + default: + LASSERTF(0, "incorrect message magic: %08x\n", magic); + return -EINVAL; + } +} +EXPORT_SYMBOL(lustre_msg_hdr_size); + int lustre_msg_swabbed(struct lustre_msg *msg) { return (msg->lm_magic == LUSTRE_MSG_MAGIC_V1_SWABBED) ||