#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>
-void ptlrpc_init_client(struct recovd_obd *recovd,
- int (*recover)(struct ptlrpc_client *recover),
- int req_portal,
- int rep_portal, struct ptlrpc_client *cl)
+void ptlrpc_init_client(int req_portal, int rep_portal, struct ptlrpc_client *cl,
+ struct ptlrpc_connection *conn)
{
memset(cl, 0, sizeof(*cl));
- cl->cli_recover = recover;
- if (recovd)
- recovd_cli_manage(recovd, cl);
+ /* Some things, like the LDLM, can call us without a connection.
+ * I don't like it one bit.
+ */
+ if (conn) {
+ cl->cli_connection = conn;
+ list_add(&cl->cli_client_chain, &conn->c_clients);
+ }
cl->cli_obd = NULL;
cl->cli_request_portal = req_portal;
cl->cli_reply_portal = rep_portal;
- INIT_LIST_HEAD(&cl->cli_delayed_head);
- INIT_LIST_HEAD(&cl->cli_sending_head);
- INIT_LIST_HEAD(&cl->cli_dying_head);
- spin_lock_init(&cl->cli_lock);
sema_init(&cl->cli_rpc_sem, 32);
}
}
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
- struct ptlrpc_connection *conn,
int opcode, int count, int *lengths,
char **bufs)
{
struct ptlrpc_request *request;
+ struct ptlrpc_connection *conn = cl->cli_connection;
int rc;
ENTRY;
}
clobd = &export->exp_obd->u.cli;
- req = ptlrpc_prep_req(clobd->cl_client, clobd->cl_conn,
- opcode, count, lengths, bufs);
+ req = ptlrpc_prep_req(clobd->cl_client, opcode, count, lengths, bufs);
ptlrpc_hdl2req(req, &clobd->cl_exporth);
return req;
}
if (request->rq_reqmsg != NULL)
OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
- if (request->rq_client) {
- spin_lock(&request->rq_client->cli_lock);
+ if (request->rq_connection) {
+ spin_lock(&request->rq_connection->c_lock);
list_del_init(&request->rq_list);
- spin_unlock(&request->rq_client->cli_lock);
+ spin_unlock(&request->rq_connection->c_lock);
}
ptlrpc_put_connection(request->rq_connection);
}
/* caller must lock cli */
-void ptlrpc_free_committed(struct ptlrpc_client *cli)
+void ptlrpc_free_committed(struct ptlrpc_connection *conn)
{
struct list_head *tmp, *saved;
struct ptlrpc_request *req;
- list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
+ list_for_each_safe(tmp, saved, &conn->c_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) {
}
/* not yet committed */
- if (req->rq_transno > cli->cli_last_committed)
+ if (req->rq_transno > conn->c_last_committed)
break;
CDEBUG(D_INFO, "Marking request xid %Ld as committed ("
"transno=%Lu, last_committed=%Lu\n",
(long long)req->rq_xid, (long long)req->rq_transno,
- (long long)cli->cli_last_committed);
+ (long long)conn->c_last_committed);
if (atomic_dec_and_test(&req->rq_refcount)) {
/* we do this to prevent free_req deadlock */
list_del_init(&req->rq_list);
ptlrpc_free_req(req);
} else {
list_del_init(&req->rq_list);
- list_add(&req->rq_list, &cli->cli_dying_head);
+ list_add(&req->rq_list, &conn->c_dying_head);
}
}
{
struct list_head *tmp, *saved;
struct ptlrpc_request *req;
+ struct ptlrpc_connection *conn = cli->cli_connection;
ENTRY;
- spin_lock(&cli->cli_lock);
- list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
+ if (!conn) {
+ EXIT;
+ return;
+ }
+
+ spin_lock(&conn->c_lock);
+ list_for_each_safe(tmp, saved, &conn->c_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (req->rq_client != cli)
+ continue;
CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
list_del_init(&req->rq_list);
req->rq_client = NULL;
ptlrpc_free_req(req);
}
- list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
+ list_for_each_safe(tmp, saved, &conn->c_dying_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (req->rq_client != cli)
+ continue;
CERROR("Request %p is on the dying list at cleanup!\n", req);
list_del_init(&req->rq_list);
req->rq_client = NULL;
ptlrpc_free_req(req);
}
- spin_unlock(&cli->cli_lock);
+ spin_unlock(&conn->c_lock);
EXIT;
return;
req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
req->rq_flags |= PTL_RPC_FL_TIMEOUT;
/* Activate the recovd for this client, if there is one. */
- if (req->rq_client && req->rq_client->cli_recovd)
- recovd_cli_fail(req->rq_client);
+ if (req->rq_client && req->rq_client->cli_connection &&
+ req->rq_client->cli_connection->c_recovd)
+ recovd_conn_fail(req->rq_client->cli_connection);
/* If this request is for recovery or other primordial tasks,
* don't go back to sleep.
int rc = 0;
struct l_wait_info lwi;
struct ptlrpc_client *cli = req->rq_client;
+ struct ptlrpc_connection *conn = cli->cli_connection;
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
CERROR("process %d waiting for recovery (%d > %d)\n",
current->pid, req->rq_level, req->rq_connection->c_level);
- spin_lock(&cli->cli_lock);
+ spin_lock(&conn->c_lock);
list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list, &cli->cli_delayed_head);
- spin_unlock(&cli->cli_lock);
+ list_add_tail(&req->rq_list, &conn->c_delayed_head);
+ spin_unlock(&conn->c_lock);
lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(req->rq_wait_for_rep,
req->rq_level <= req->rq_connection->c_level,
&lwi);
- spin_lock(&cli->cli_lock);
+ spin_lock(&conn->c_lock);
list_del_init(&req->rq_list);
- spin_unlock(&cli->cli_lock);
+ spin_unlock(&conn->c_lock);
if (rc)
RETURN(rc);
RETURN(-rc);
}
- spin_lock(&cli->cli_lock);
+ spin_lock(&conn->c_lock);
list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list, &cli->cli_sending_head);
- spin_unlock(&cli->cli_lock);
+ list_add_tail(&req->rq_list, &conn->c_sending_head);
+ spin_unlock(&conn->c_lock);
CDEBUG(D_OTHER, "-- sleeping\n");
lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
req->rq_replen, req->rq_repmsg->status);
- spin_lock(&cli->cli_lock);
- cli->cli_last_xid = req->rq_repmsg->last_xid;
- cli->cli_last_committed = req->rq_repmsg->last_committed;
- ptlrpc_free_committed(cli);
- spin_unlock(&cli->cli_lock);
+ spin_lock(&conn->c_lock);
+ conn->c_last_xid = req->rq_repmsg->last_xid;
+ conn->c_last_committed = req->rq_repmsg->last_committed;
+ ptlrpc_free_committed(conn);
+ spin_unlock(&conn->c_lock);
EXIT;
out: