From 1ef040b59bd9d7f127ca491b1f89f2477e6553b2 Mon Sep 17 00:00:00 2001 From: rread Date: Sat, 17 Jul 2004 00:57:13 +0000 Subject: [PATCH] b=1451 Don't allow client to reconnect if an RPC is already in progress for that client. Added an extra counter for this, because the export refcount is also held by locks and who knows what else. --- lustre/include/linux/lustre_export.h | 1 + lustre/include/linux/obd_class.h | 16 ++++++++++++++++ lustre/ldlm/ldlm_lib.c | 29 ++++++++++++++++++++++++----- lustre/obdclass/genops.c | 1 + lustre/ptlrpc/service.c | 7 ++++++- 5 files changed, 48 insertions(+), 6 deletions(-) diff --git a/lustre/include/linux/lustre_export.h b/lustre/include/linux/lustre_export.h index 5570094..8cc24b9 100644 --- a/lustre/include/linux/lustre_export.h +++ b/lustre/include/linux/lustre_export.h @@ -64,6 +64,7 @@ struct filter_export_data { struct obd_export { struct portals_handle exp_handle; atomic_t exp_refcount; + atomic_t exp_rpc_count; struct obd_uuid exp_client_uuid; struct list_head exp_obd_chain; struct obd_device *exp_obd; diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 85edc8c..d7e847d 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -103,6 +103,22 @@ struct lustre_profile { struct lustre_profile *class_get_profile(char * prof); void class_del_profile(char *prof); +#define class_export_rpc_get(exp) \ +({ \ + atomic_inc(&(exp)->exp_rpc_count); \ + CDEBUG(D_INFO, "RPC GETting export %p : new rpc_count %d\n", \ + (exp), atomic_read(&(exp)->exp_rpc_count)); \ + class_export_get(exp); \ +}) + +#define class_export_rpc_put(exp) \ +({ \ + atomic_dec(&(exp)->exp_rpc_count); \ + CDEBUG(D_INFO, "RPC PUTting export %p : new rpc_count %d\n", \ + (exp), atomic_read(&(exp)->exp_rpc_count)); \ + class_export_put(exp); \ +}) + #define class_export_get(exp) \ ({ \ struct obd_export *exp_ = exp; \ diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 7c636f3..7d28ef0 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -375,6 +375,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) int rc = 0, abort_recovery; unsigned long flags; int initial_conn = 0; + char peer_str[PTL_NALFMT_SIZE]; ENTRY; OBD_RACE(OBD_FAIL_TGT_CONN_RACE); @@ -461,7 +462,22 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) /* If we found an export, we already unlocked. */ if (!export) { spin_unlock(&target->obd_dev_lock); - } else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) { + } else if (req->rq_export == NULL && + atomic_read(&export->exp_rpc_count) > 0) { + CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n", + target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), + export, atomic_read(&export->exp_refcount)); + GOTO(out, rc = -EBUSY); + } else if (req->rq_export != NULL && + atomic_read(&export->exp_rpc_count) > 1) { + CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n", + target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), + export, atomic_read(&export->exp_rpc_count)); + GOTO(out, rc = -EBUSY); + } + else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) { CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n", cluuid.uuid); GOTO(out, rc = -EALREADY); @@ -469,7 +485,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) /* Tell the client if we're in recovery. */ /* If this is the first client, start the recovery timer */ - CWARN("%s: connection from %s %s\n", target->obd_name, cluuid.uuid, + CWARN("%s: connection from %s@%s %s\n", target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), target->obd_recovering ? "(recovering)" : ""); if (target->obd_recovering) { lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING); @@ -482,8 +499,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) #endif if (export == NULL) { if (target->obd_recovering) { - CERROR("denying connection for new client %s: " + CERROR("denying connection for new client %s@%s: " "%d clients in recovery for %lds\n", cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), target->obd_recoverable_clients, (target->obd_recovery_timer.expires-jiffies)/HZ); rc = -EBUSY; @@ -522,8 +540,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) if (initial_conn) { req->rq_repmsg->conn_cnt = export->exp_conn_cnt + 1; } else if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) { - CERROR("%s: already connected at a higher conn_cnt: %d > %d\n", - cluuid.uuid, export->exp_conn_cnt, + CERROR("%s@%s: already connected at a higher conn_cnt: %d > %d\n", + cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str), + export->exp_conn_cnt, req->rq_reqmsg->conn_cnt); spin_unlock_irqrestore(&export->exp_lock, flags); GOTO(out, rc = -EALREADY); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index b3f43ba..c355425 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -462,6 +462,7 @@ struct obd_export *class_new_export(struct obd_device *obd) export->exp_conn_cnt = 0; atomic_set(&export->exp_refcount, 2); + atomic_set(&export->exp_rpc_count, 0); export->exp_obd = obd; INIT_LIST_HEAD(&export->exp_outstanding_replies); /* XXX this should be in LDLM init */ diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index d8271f2..36bed9b 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -357,6 +357,7 @@ ptlrpc_server_free_request(struct ptlrpc_service *svc, struct ptlrpc_request *re static int ptlrpc_server_handle_request (struct ptlrpc_service *svc) { + struct obd_export *export = NULL; struct ptlrpc_request *request; unsigned long flags; struct timeval work_start; @@ -440,7 +441,8 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc) request->rq_export->exp_conn_cnt); goto put_conn; } - + + export = class_export_rpc_get(request->rq_export); request->rq_export->exp_last_request_time = LTIME_S(CURRENT_TIME); } @@ -468,6 +470,9 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc) ptlrpc_peernid2str(&request->rq_peer, str), request->rq_reqmsg->opc); + if (export != NULL) + class_export_rpc_put(export); + put_conn: if (request->rq_export != NULL) class_export_put(request->rq_export); -- 1.8.3.1