__u32 cli_request_portal;
__u32 cli_reply_portal;
- struct semaphore cli_rpc_sem;
- struct list_head cli_sending_head;
+ struct semaphore cli_rpc_sem; /* limits outstanding requests */
+
+ struct list_head cli_sending_head; /* lists protected by ha_mgr lock */
struct list_head cli_sent_head;
struct list_head cli_ha_item;
+
struct connmgr_obd *cli_ha_mgr;
};
/* rpc/niobuf.c */
int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *);
int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal);
-int ptl_send_buf(struct ptlrpc_request *, struct ptlrpc_connection *,
- int portal);
int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
struct ptlrpc_connection *u, int opcode,
int count, int *lengths, char **bufs);
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
void ptlrpc_free_req(struct ptlrpc_request *request);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *);
int ptlrpc_check_status(struct ptlrpc_request *req, int err);
struct ldlm_request *body;
struct ldlm_reply *reply;
struct ldlm_lock *lock;
- struct ptlrpc_request *req;
- int rc, size[2] = {sizeof(*body), lock->l_data_len};
- char *bufs[2] = {NULL, lock->l_data};
+ struct ptlrpc_request *req = NULL;
+ int rc, size[2] = {sizeof(*body), 0};
+ char *bufs[2] = {NULL, NULL};
ENTRY;
lock = ldlm_handle2object(lockh);
if (is_local_conn(lock->l_connection))
GOTO(local, 0);
+ size[1] = lock->l_data_len;
+ bufs[1] = lock->l_data;
req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
bufs);
if (!req)
{
struct ldlm_request *body;
struct ldlm_lock *lock;
- struct ptlrpc_request *req;
- int rc, size[2] = {sizeof(*body), lock->l_data_len};
- char *bufs[2] = {NULL, lock->l_data};
+ struct ptlrpc_request *req = NULL;
+ int rc, size[2] = {sizeof(*body), 0};
+ char *bufs[2] = {NULL, NULL};
ENTRY;
lock = ldlm_handle2object(lockh);
if (is_local_conn(lock->l_connection))
GOTO(local, 0);
+ size[1] = lock->l_data_len;
+ bufs[1] = lock->l_data;
req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
bufs);
if (!req)
out:
*request = req;
- if (bulk != NULL)
- OBD_FREE(bulk, sizeof(*bulk));
+ ptlrpc_free_bulk(bulk);
return rc;
}
cleanup_buf:
OBD_FREE(buf, PAGE_SIZE);
cleanup_bulk:
- OBD_FREE(bulk, sizeof(*bulk));
+ ptlrpc_free_bulk(bulk);
out:
return rc;
}
rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
if (rc != 0) {
CERROR("send_bulk failed: %d\n", rc);
+ ptlrpc_free_bulk(bulk);
LBUG();
return rc;
}
ptlrpc_check_bulk_sent(bulk));
if (bulk->b_flags == PTL_RPC_INTR) {
- EXIT;
- /* FIXME: hey hey, we leak here. */
- return -EINTR;
+ ptlrpc_free_bulk(bulk);
+ RETURN(-EINTR);
}
- OBD_FREE(bulk, sizeof(*bulk));
+ ptlrpc_free_bulk(bulk);
}
return 0;
return bulk;
}
+void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk)
+{
+ if (bulk == NULL)
+ return;
+
+ ptlrpc_put_connection(bulk->b_connection);
+
+ OBD_FREE(bulk, sizeof(*bulk));
+}
+
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
struct ptlrpc_connection *conn,
int opcode, int count, int *lengths,
request->rq_reqmsg->token = conn->c_remote_token;
request->rq_reqmsg->opc = HTON__u32(opcode);
request->rq_reqmsg->type = HTON__u32(request->rq_type);
+ INIT_LIST_HEAD(&request->rq_list);
spin_lock(&conn->c_lock);
request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out);
if (request->rq_repmsg != NULL)
OBD_FREE(request->rq_repmsg, request->rq_replen);
+ spin_lock(&request->rq_client->cli_ha_mgr->mgr_lock);
+ list_del(&request->rq_list);
+ spin_unlock(&request->rq_client->cli_ha_mgr->mgr_lock);
+
ptlrpc_put_connection(request->rq_connection);
OBD_FREE(request, sizeof(*request));
GOTO(out, rc = 1);
}
- if (CURRENT_TIME - req->rq_time >= 3) {
- CERROR("-- REQ TIMEOUT --\n");
+ if (CURRENT_TIME - req->rq_time >= 3) {
+ CERROR("-- REQ TIMEOUT --\n");
if (req->rq_client && req->rq_client->cli_ha_mgr)
- connmgr_cli_fail(req->rq_client);
+ connmgr_cli_fail(req->rq_client);
return 0;
}
ENTRY;
if (ev->type == PTL_EVENT_SENT) {
+ spin_lock(&req->rq_client->cli_ha_mgr->mgr_lock);
list_del(&req->rq_list);
list_add(&req->rq_list, &cl->cli_sent_head);
+ spin_unlock(&req->rq_client->cli_ha_mgr->mgr_lock);
} else {
// XXX make sure we understand all events, including ACK's
CERROR("Unknown event %d\n", ev->type);
RETURN(0);
}
-int ptl_send_buf(struct ptlrpc_request *request, struct ptlrpc_connection *conn,
- int portal)
+static int ptl_send_buf(struct ptlrpc_request *request,
+ struct ptlrpc_connection *conn, int portal)
{
int rc;
ptl_process_id_t remote_id;
request->rq_replen, request->rq_reqmsg->xid,
request->rq_client->cli_request_portal);
+ spin_lock(&request->rq_client->cli_ha_mgr->mgr_lock);
list_add(&request->rq_list, &request->rq_client->cli_sending_head);
+ spin_unlock(&request->rq_client->cli_ha_mgr->mgr_lock);
+
rc = ptl_send_buf(request, request->rq_connection,
request->rq_client->cli_request_portal);
RETURN(rc);
if (request.rq_reqmsg->conn) {
request.rq_connection =
(void *)(unsigned long)request.rq_reqmsg->conn;
- if (request.rq_reqmsg->token != request.rq_connection->c_token)
+ if (request.rq_reqmsg->token !=
+ request.rq_connection->c_token) {
+ struct ptlrpc_connection *tmp;
+ tmp = ptlrpc_get_connection(&peer);
+ CERROR("rq_reqmsg->conn: %p\n", request.rq_connection);
+ CERROR("real connection: %p\n", tmp);
+ CERROR("rq_reqmsg->token: %Lu\n",
+ request.rq_reqmsg->token);
+ CERROR("real token : %Lu\n", tmp->c_token);
LBUG();
+ }
ptlrpc_connection_addref(request.rq_connection);
} else {
request.rq_connection = ptlrpc_get_connection(&peer);