do { \
if (((mask) & (D_ERROR | D_EMERG | D_WARNING | D_CONSOLE)) || \
(libcfs_debug & (mask) && \
- libcfs_subsystem_debug & DEBUG_SUBSYSTEM)) \
- fprintf(stderr, "(%s:%d:%s()) " format, \
- __FILE__, __LINE__, __FUNCTION__, ## a); \
+ libcfs_subsystem_debug & DEBUG_SUBSYSTEM)) { \
+ libcfs_debug_msg(DEBUG_SUBSYSTEM, mask, __FILE__, \
+ __FUNCTION__, __LINE__, 0, format, ## a);\
+ } \
} while (0)
#define CDEBUG_LIMIT CDEBUG
# define RECALC_SIGPENDING recalc_sigpending()
# define CLEAR_SIGPENDING (current->sigpending = 0)
# define CURRENT_SECONDS CURRENT_TIME
+# define wait_event_interruptible_exclusive(wq, condition) \
+ wait_event_interruptible(wq, condition)
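+/* NB: this fallback is a non-exclusive wait, so wake-ups still rouse
+ * every sleeper; correct, just without the exclusive-wakeup optimisation */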
#else /* 2.4.x */
# define RECALC_SIGPENDING recalc_sigpending(current)
# define CLEAR_SIGPENDING (current->sigpending = 0)
# define CURRENT_SECONDS CURRENT_TIME
+# define wait_event_interruptible_exclusive(wq, condition) \
+ wait_event_interruptible(wq, condition)
#endif
* list_move - delete from one list and add as another's head
* @list: the entry to move
* @head: the head that will precede our entry
+ *
+ * This is not safe to use if @list is already on the same list as @head.
*/
static inline void list_move(struct list_head *list, struct list_head *head)
{
* list_move_tail - delete from one list and add as another's tail
* @list: the entry to move
* @head: the head that will follow our entry
+ *
+ * This is not safe to use if @list is already on the same list as @head.
*/
static inline void list_move_tail(struct list_head *list,
struct list_head *head)
#define hlist_head list_head
#define hlist_node list_head
+#define hlist_del_init list_del_init
#endif /* __linux__*/
unsigned int offset, unsigned int len);
int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
void lnet_return_credits_locked (lnet_msg_t *msg);
-int lnet_parse (lnet_ni_t *ni, lnet_hdr_t *hdr,
- lnet_nid_t fromnid, void *private);
+int lnet_parse (lnet_ni_t *ni, lnet_hdr_t *hdr,
+ lnet_nid_t fromnid, void *private, int rdma_req);
void lnet_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
unsigned int offset, unsigned int mlen, unsigned int rlen);
lnet_msg_t *lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *get_msg);
int msg_ack:1; /* ack on finalize (PUT) */
int msg_sending:1; /* outgoing message */
int msg_receiving:1; /* being received */
- int msg_recvaftersend:1; /* lnd_recv() outstanding */
int msg_delayed:1; /* had to Q for buffer or tx credit */
int msg_txcredit:1; /* taken an NI send credit */
int msg_peertxcredit:1; /* taken a peer send credit */
tx->tx_msgnob += len;
tx->tx_large_nob = 0;
-
- /* We've copied everything... */
- LASSERT(tx->tx_lntmsg == NULL);
- lnet_finalize(ni, lntmsg, 0);
} else {
/* stash payload pts to copy later */
tx->tx_large_nob = len;
tx->tx_large_frags.kiov = kiov;
else
tx->tx_large_frags.iov = iov;
-
- /* finalize later */
- tx->tx_lntmsg = lntmsg;
}
+
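+ /* stash lntmsg: finalized when this tx completes */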
+ LASSERT(tx->tx_lntmsg == NULL);
+ tx->tx_lntmsg = lntmsg;
spin_lock(&gmni->gmni_tx_lock);
{
gmnal_ni_t *gmni = tx->tx_gmni;
int wake_sched = 0;
+ lnet_msg_t *lnetmsg = tx->tx_lntmsg;
- LASSERT(tx->tx_lntmsg == NULL);
+ tx->tx_lntmsg = NULL;
spin_lock(&gmni->gmni_tx_lock);
gmnal_check_txqueues_locked(gmni);
spin_unlock(&gmni->gmni_tx_lock);
+
+ /* Delay finalize until tx is free */
+ if (lnetmsg != NULL)
+ lnet_finalize(gmni->gmni_ni, lnetmsg, 0);
}
void
tx->tx_msgnob += tx->tx_large_nob;
- /* We've copied everything... */
- lnet_finalize(gmni->gmni_ni, tx->tx_lntmsg, 0);
- tx->tx_lntmsg = NULL;
-
spin_lock(&gmni->gmni_tx_lock);
}
- LASSERT (tx->tx_lntmsg == NULL);
-
list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
}
rc = lnet_parse(gmni->gmni_ni,
&msg->gmm_u.immediate.gmim_hdr,
msg->gmm_srcnid,
- rx);
+ rx, 0);
}
if (rc < 0) /* parse failure */
{
struct list_head rx_list; /* queue for attention */
struct kib_conn *rx_conn; /* owning conn */
- int rx_responded; /* responded to peer? */
int rx_nob; /* # bytes received (-1 while posted) */
__u64 rx_hca_msg; /* pre-mapped buffer (hca vaddr) */
kib_msg_t *rx_msg; /* pre-mapped buffer (host vaddr) */
__u64 ibc_rxseq; /* rx sequence number */
atomic_t ibc_refcount; /* # users */
int ibc_state; /* what's happening */
- atomic_t ibc_nob; /* # bytes buffered */
int ibc_nsends_posted; /* # uncompleted sends */
int ibc_credits; /* # credits I have */
int ibc_outstanding_credits; /* # credits to return */
void
kibnal_tx_done (kib_tx_t *tx)
{
- int rc = tx->tx_status;
- int i;
+ lnet_msg_t *lntmsg[2];
+ int rc = tx->tx_status;
+ int i;
LASSERT (!in_interrupt());
LASSERT (!tx->tx_queued); /* mustn't be queued for sending */
#if IBNAL_USE_FMR
/* Handle unmapping if required */
#endif
- for (i = 0; i < 2; i++) {
- /* tx may have up to 2 ptlmsgs to finalise */
- if (tx->tx_lntmsg[i] == NULL)
- continue;
-
- lnet_finalize (kibnal_data.kib_ni, tx->tx_lntmsg[i], rc);
- tx->tx_lntmsg[i] = NULL;
- }
+ /* tx may have up to 2 lnet msgs to finalise */
+ lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+ lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
if (tx->tx_conn != NULL) {
kibnal_conn_decref(tx->tx_conn);
list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
spin_unlock(&kibnal_data.kib_tx_lock);
+
+ /* delay finalize until my descs have been freed */
+ for (i = 0; i < 2; i++) {
+ if (lntmsg[i] == NULL)
+ continue;
+
+ lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
+ }
}
kib_tx_t *
kibnal_check_sends(conn);
}
- /* clear flag so GET_REQ can see if it caused a REPLY */
- rx->rx_responded = 0;
-
switch (msg->ibm_type) {
default:
CERROR("Bad IBNAL message type %x from %s\n",
case IBNAL_MSG_IMMEDIATE:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 0);
repost = rc < 0; /* repost on error */
break;
case IBNAL_MSG_PUT_REQ:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 1);
repost = rc < 0; /* repost on error */
break;
case IBNAL_MSG_GET_REQ:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 1);
repost = rc < 0; /* repost on error */
break;
kibnal_launch_tx(tx, target.nid);
return 0;
- case LNET_MSG_REPLY: {
- /* reply's 'private' is the incoming receive */
- kib_rx_t *rx = private;
-
- LASSERT(routing || rx != NULL);
-
- if (!routing && rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
- /* Incoming message consistent with RDMA? */
- if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
- CERROR("REPLY to %s bad msg type %x!!!\n",
- libcfs_nid2str(target.nid),
- rx->rx_msg->ibm_type);
- return -EIO;
- }
-
- /* NB handle_rx() will send GET_NAK when I return to
- * it from here, unless I set rx_responded! */
-
- tx = kibnal_get_idle_tx();
- if (tx == NULL) {
- CERROR("Can't get tx for REPLY to %s\n",
- libcfs_nid2str(target.nid));
- return -ENOMEM;
- }
-
- if (payload_nob == 0)
- rc = 0;
- else if (payload_kiov == NULL)
- rc = kibnal_setup_rd_iov(
- tx, tx->tx_rd, 1,
- payload_niov, payload_iov,
- payload_offset, payload_nob);
- else
- rc = kibnal_setup_rd_kiov(
- tx, tx->tx_rd, 1,
- payload_niov, payload_kiov,
- payload_offset, payload_nob);
- if (rc != 0) {
- CERROR("Can't setup GET src for %s: %d\n",
- libcfs_nid2str(target.nid), rc);
- kibnal_tx_done(tx);
- return -EIO;
- }
-
- rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE,
- payload_nob,
- &rx->rx_msg->ibm_u.get.ibgm_rd,
- rx->rx_msg->ibm_u.get.ibgm_cookie);
- if (rc < 0) {
- CERROR("Can't setup rdma for GET from %s: %d\n",
- libcfs_nid2str(target.nid), rc);
- } else if (rc == 0) {
- /* No RDMA: local completion may happen now! */
- lnet_finalize (kibnal_data.kib_ni, lntmsg, 0);
- } else {
- /* RDMA: lnet_finalize(lntmsg) when it
- * completes */
- tx->tx_lntmsg[0] = lntmsg;
- }
-
- kibnal_queue_tx(tx, rx->rx_conn);
- rx->rx_responded = 1;
- return (rc >= 0) ? 0 : -EIO;
- }
- /* fall through to handle like PUT */
- }
-
+ case LNET_MSG_REPLY:
case LNET_MSG_PUT:
/* Is the payload small enough not to need RDMA? */
nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
return 0;
}
+void
+kibnal_reply(lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
+{
+ lnet_process_id_t target = lntmsg->msg_target;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int nob = lntmsg->msg_len;
+ kib_tx_t *tx;
+ int rc;
+
+ tx = kibnal_get_idle_tx();
+ if (tx == NULL) {
+ CERROR("Can't get tx for REPLY to %s\n",
+ libcfs_nid2str(target.nid));
+ goto failed_0;
+ }
+
+ if (nob == 0)
+ rc = 0;
+ else if (kiov == NULL)
+ rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1,
+ niov, iov, offset, nob);
+ else
+ rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1,
+ niov, kiov, offset, nob);
+
+ if (rc != 0) {
+ CERROR("Can't setup GET src for %s: %d\n",
+ libcfs_nid2str(target.nid), rc);
+ goto failed_1;
+ }
+
+ rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
+ &rx->rx_msg->ibm_u.get.ibgm_rd,
+ rx->rx_msg->ibm_u.get.ibgm_cookie);
+ if (rc < 0) {
+ CERROR("Can't setup rdma for GET from %s: %d\n",
+ libcfs_nid2str(target.nid), rc);
+ goto failed_1;
+ }
+
+ if (rc == 0) {
+ /* No RDMA: local completion may happen now! */
+ lnet_finalize(ni, lntmsg, 0);
+ } else {
+ /* RDMA: lnet_finalize(lntmsg) when it
+ * completes */
+ tx->tx_lntmsg[0] = lntmsg;
+ }
+
+ kibnal_queue_tx(tx, rx->rx_conn);
+ return;
+
+ failed_1:
+ kibnal_tx_done(tx);
+ failed_0:
+ lnet_finalize(ni, lntmsg, -EIO);
+}
+
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
break;
case IBNAL_MSG_GET_REQ:
- LASSERT (lntmsg == NULL); /* no need to finalise */
- if (!rx->rx_responded) {
+ if (lntmsg != NULL) {
+ /* Optimized GET; RDMA lntmsg's payload */
+ kibnal_reply(ni, rx, lntmsg);
+ } else {
/* GET didn't match anything */
kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE,
-ENODATA,
/* Nothing to do; sleep... */
set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
+ add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
flags);
{
struct list_head rx_list; /* queue for attention */
struct kib_conn *rx_conn; /* owning conn */
- int rx_rdma; /* RDMA completion posted? */
int rx_nob; /* # bytes received (-1 while posted) */
__u64 rx_vaddr; /* pre-mapped buffer (hca vaddr) */
kib_msg_t *rx_msg; /* pre-mapped buffer (host vaddr) */
__u64 ibc_incarnation; /* which instance of the peer */
atomic_t ibc_refcount; /* # users */
int ibc_state; /* what's happening */
- atomic_t ibc_nob; /* # bytes buffered */
int ibc_nsends_posted; /* # uncompleted sends */
int ibc_credits; /* # credits I have */
int ibc_outstanding_credits; /* # credits to return */
void
kibnal_tx_done (kib_tx_t *tx)
{
+ lnet_msg_t *lntmsg[2];
unsigned long flags;
int i;
int rc;
#endif
}
- for (i = 0; i < 2; i++) {
- /* tx may have up to 2 ptlmsgs to finalise */
- if (tx->tx_lntmsg[i] == NULL)
- continue;
+ /* tx may have up to 2 lnet msgs to finalise */
+ lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+ lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
+ rc = tx->tx_status;
- lnet_finalize (kibnal_data.kib_ni, tx->tx_lntmsg[i],
- tx->tx_status);
- tx->tx_lntmsg[i] = NULL;
- }
-
if (tx->tx_conn != NULL) {
kibnal_put_conn (tx->tx_conn);
tx->tx_conn = NULL;
list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
+
+ /* delay finalize until my descs have been freed */
+ for (i = 0; i < 2; i++) {
+ if (lntmsg[i] == NULL)
+ continue;
+
+ lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
+ }
}
kib_tx_t *
int rc = 0;
kib_msg_t *msg = rx->rx_msg;
- /* Clear flag so I can detect if I've sent an RDMA completion */
- rx->rx_rdma = 0;
-
switch (msg->ibm_type) {
case IBNAL_MSG_GET_RDMA:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 1);
break;
case IBNAL_MSG_PUT_RDMA:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 1);
break;
case IBNAL_MSG_IMMEDIATE:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 0);
break;
default:
LASSERT (type == IBNAL_MSG_GET_DONE ||
type == IBNAL_MSG_PUT_DONE);
- /* Flag I'm completing the RDMA. Even if I fail to send the
- * completion message, I will have tried my best so further
- * attempts shouldn't be tried. */
- LASSERT (!rx->rx_rdma);
- rx->rx_rdma = 1;
-
if (type == IBNAL_MSG_GET_DONE) {
access = 0;
rdma_op = IB_OP_RDMA_WRITE;
NULL, lntmsg->msg_md->md_iov.kiov,
lntmsg->msg_md->md_length);
- case LNET_MSG_REPLY: {
- /* reply's 'private' is the incoming receive */
- kib_rx_t *rx = private;
-
- LASSERT (routing || rx != NULL);
-
- /* RDMA reply expected? */
- if (!routing && rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
- /* Incoming message consistent with RDMA */
- if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_RDMA) {
- CERROR ("REPLY to %s bad ibm type %d!!!\n",
- libcfs_nid2str(target.nid),
- rx->rx_msg->ibm_type);
- return (-EIO);
- }
-
- kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
- rx, lntmsg, payload_niov,
- payload_iov, payload_kiov,
- payload_offset, payload_nob);
- return (0);
- }
- /* Fall through to handle like PUT */
- }
-
+ case LNET_MSG_REPLY:
case LNET_MSG_PUT:
/* Is the payload small enough not to need RDMA? */
nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
break;
case IBNAL_MSG_GET_RDMA:
- LASSERT (lntmsg == NULL); /* no need to finalise */
- if (!rx->rx_rdma) {
+ if (lntmsg != NULL) {
+ /* GET matched: RDMA lntmsg's payload */
+ kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
+ rx, lntmsg,
+ lntmsg->msg_niov,
+ lntmsg->msg_iov,
+ lntmsg->msg_kiov,
+ lntmsg->msg_offset,
+ lntmsg->msg_len);
+ } else {
/* GET didn't match anything */
kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
rx, NULL, 0, NULL, NULL, 0, 0);
continue;
set_current_state (TASK_INTERRUPTIBLE);
- add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
+ add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);
spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
counter = 0;
if (!did_something) {
- rc = wait_event_interruptible(
+ rc = wait_event_interruptible_exclusive(
kibnal_data.kib_sched_waitq,
!list_empty(&kibnal_data.kib_sched_txq) ||
!list_empty(&kibnal_data.kib_sched_rxq) ||
"ptllnd_rx",
sizeof(kptl_rx_t) + *kptllnd_tunables.kptl_max_msg_size,
0, /* offset */
- 0, /* flags */
- NULL,NULL); /* CTOR/DTOR */
+ 0); /* flags */
if( kptllnd_data->kptl_rx_cache == NULL ){
CERROR("Can't create slab for RX descriptrs\n");
goto failed;
kptl_tx_t *tx = NULL;
kptl_data_t *kptllnd_data = ni->ni_data;
int nob;
- int rc;
PJK_UT_MSG_DATA(">>> SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
PJK_UT_MSG_DATA("nob=%d nov=%d offset=%d to %s\n",
LBUG();
return -EINVAL;
+ case LNET_MSG_REPLY:
case LNET_MSG_PUT:
- PJK_UT_MSG_DATA("LNET_MSG_PUT\n");
+ PJK_UT_MSG_DATA("LNET_MSG_PUT/REPLY\n");
/*
* Get an idle tx descriptor
if (nob <= *kptllnd_tunables.kptl_max_msg_size)
break;
-
- STAT_UPDATE(kps_send_put);
+ if (type == LNET_MSG_REPLY)
+ STAT_UPDATE(kps_send_reply);
+ else
+ STAT_UPDATE(kps_send_put);
kptllnd_do_put(tx,lntmsg,kptllnd_data);
PJK_UT_MSG_DATA("LNET_MSG_ACK\n");
LASSERT (payload_nob == 0);
break;
-
- case LNET_MSG_REPLY:
- PJK_UT_MSG_DATA("LNET_MSG_REPLY\n");
-
- STAT_UPDATE(kps_send_reply);
-
- if(routing!=0 || target_is_router!=0)
- {
- /*
- * Get an idle tx descriptor
- */
- tx = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_LARGE_PUT);
- if(tx == NULL){
- CERROR ("Can't send %d to %s: tx descs exhausted\n",
- type, libcfs_id2str(target));
- return -ENOMEM;
- }
-
- /* Is the payload small enough not to need RDMA? */
- nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
- if (nob <= *kptllnd_tunables.kptl_max_msg_size)
- break;
-
- kptllnd_do_put(tx,lntmsg,kptllnd_data);
-
- PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
- return 0;
- }else{
- /*
- * Reply's private is the incoming rx descriptor
- */
- kptl_rx_t *rx = private;
- LASSERT(rx != NULL);
-
- /*
- * If the request was to NOT do RDMA
- * break out and just send back an IMMEDIATE message
- */
- if (rx->rx_msg->ptlm_type == PTLLND_MSG_TYPE_IMMEDIATE) {
- /* RDMA not expected */
- nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
- if (nob > *kptllnd_tunables.kptl_max_msg_size) {
- CERROR("REPLY for %s too big but RDMA not requested:"
- "%d (max for message is %d)\n",
- libcfs_id2str(target), payload_nob,
- *kptllnd_tunables.kptl_max_msg_size);
- CERROR("Can't REPLY IMMEDIATE %d to %s\n",
- nob, libcfs_id2str(target));
- return -EINVAL;
- }
- break;
- }
-
-
- /* Incoming message consistent with RDMA? */
- if (rx->rx_msg->ptlm_type != PTLLND_MSG_TYPE_GET) {
- CERROR("REPLY to %s bad msg type %x!!!\n",
- libcfs_id2str(target), rx->rx_msg->ptlm_type);
- return -EINVAL;
- }
-
- rc = kptllnd_start_bulk_rdma(
- kptllnd_data,
- rx,
- lntmsg,
- PTL_MD_OP_PUT,
- payload_niov,
- payload_iov,
- payload_kiov,
- payload_offset,
- payload_nob);
- PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS rc=%d\n",rc);
- return rc;
- }
}
case PTLLND_MSG_TYPE_GET:
PJK_UT_MSG_DATA("PTLLND_MSG_TYPE_GET\n");
- /* We get called here just to discard any junk after the
- * GET hdr. */
- LASSERT (lntmsg == NULL); /* What is this all about ???*/
-
- lnet_finalize (ni, lntmsg, 0);
- rc = 0;
+ if (lntmsg == NULL) {
+ /* No match for the GET request */
+ /* XXX should RDMA 0 bytes of payload + hdr data saying GET failed */
+ rc = 0;
+ } else {
+ /* GET matched */
+ rc = kptllnd_start_bulk_rdma(
+ kptllnd_data,
+ rx,
+ lntmsg,
+ PTL_MD_OP_PUT,
+ lntmsg->msg_niov,
+ lntmsg->msg_iov,
+ lntmsg->msg_kiov,
+ lntmsg->msg_offset,
+ lntmsg->msg_len);
+ PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS rc=%d\n",rc);
+ }
break;
case PTLLND_MSG_TYPE_PUT:
set_current_state (TASK_INTERRUPTIBLE);
cfs_waitq_add(&kptllnd_data->kptl_sched_waitq, &waitlink);
- cfs_waitq_timedwait(&waitlink,cfs_time_seconds(PTLLND_TIMEOUT_SEC));
+ cfs_waitq_timedwait(&waitlink,CFS_TASK_INTERRUPTIBLE,
+ cfs_time_seconds(PTLLND_TIMEOUT_SEC));
set_current_state (TASK_RUNNING);
cfs_waitq_del (&kptllnd_data->kptl_sched_waitq, &waitlink);
*/
set_current_state (TASK_INTERRUPTIBLE);
- cfs_waitq_add(&kptllnd_data->kptl_sched_waitq, &waitlink);
- cfs_waitq_timedwait(&waitlink,cfs_time_seconds(PTLLND_TIMEOUT_SEC));
+ cfs_waitq_add_exclusive(&kptllnd_data->kptl_sched_waitq, &waitlink);
+ cfs_waitq_timedwait(&waitlink, CFS_TASK_INTERRUPTIBLE,
+ cfs_time_seconds(PTLLND_TIMEOUT_SEC));
set_current_state (TASK_RUNNING);
cfs_waitq_del (&kptllnd_data->kptl_sched_waitq, &waitlink);
rc = lnet_parse(kptllnd_data->kptl_ni,
&msg->ptlm_u.immediate.kptlim_hdr,
msg->ptlm_srcnid,
- rx);
+ rx, 0);
/* RX Completing asynchronously */
if( rc >= 0)
rx = 0;
rc = lnet_parse(kptllnd_data->kptl_ni,
&msg->ptlm_u.req.kptlrm_hdr,
msg->ptlm_srcnid,
- rx);
+ rx, 1);
/* RX Completing asynchronously */
if( rc >= 0)
return 0;
}
- rx = cfs_mem_cache_alloc ( kptllnd_data->kptl_rx_cache , CFS_SLAB_ATOMIC);
+ rx = cfs_mem_cache_alloc(kptllnd_data->kptl_rx_cache, CFS_ALLOC_ATOMIC);
if(rx == 0 ){
CERROR("Failed to allocate rx\n");
STAT_UPDATE(kps_rx_allocation_failed);
void
kptllnd_tx_done (kptl_tx_t *tx)
{
+ lnet_msg_t *lnetmsg[2];
+ int status = tx->tx_status;
kptl_data_t *kptllnd_data = tx->tx_po.po_kptllnd_data;
+
LASSERT (!in_interrupt());
PJK_UT_MSG(">>> tx=%p\n",tx);
LASSERT(atomic_read(&tx->tx_refcount) == 0);
LASSERT(list_empty(&tx->tx_schedlist)); /* not on any scheduler list */
- if(tx->tx_ptlmsg != NULL){
- PJK_UT_MSG("tx=%p finalize\n",tx);
- lnet_finalize (kptllnd_data->kptl_ni, tx->tx_ptlmsg, tx->tx_status);
- tx->tx_ptlmsg = NULL;
- }
- if(tx->tx_ptlmsg_reply != NULL){
- PJK_UT_MSG("tx=%p finalize reply\n",tx);
- lnet_finalize (kptllnd_data->kptl_ni, tx->tx_ptlmsg_reply, tx->tx_status);
- tx->tx_ptlmsg_reply = NULL;
- }
+ /* stash lnet msgs for finalize AFTER I free this tx desc */
+ lnetmsg[0] = tx->tx_ptlmsg; tx->tx_ptlmsg = NULL;
+ lnetmsg[1] = tx->tx_ptlmsg_reply; tx->tx_ptlmsg_reply = NULL;
/*
* Release the associated RX if there is one
list_add (&tx->tx_list, &kptllnd_data->kptl_idle_txs);
STAT_UPDATE(kps_tx_released);
spin_unlock(&kptllnd_data->kptl_tx_lock);
+
+ if (lnetmsg[0] != NULL)
+ lnet_finalize(kptllnd_data->kptl_ni, lnetmsg[0], status);
+
+ if (lnetmsg[1] != NULL)
+ lnet_finalize(kptllnd_data->kptl_ni, lnetmsg[1], status);
PJK_UT_MSG("<<< tx=%p\n",tx);
}
void
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
+ lnet_msg_t *lnetmsg[2];
+ int status[2];
+ int nlnetmsg = 0;
+
LASSERT (!in_interrupt());
if (ktx->ktx_status == -EHOSTDOWN)
case KTX_RDMAING: /* optimized GET/PUT handled */
case KTX_PUTTING: /* optimized PUT sent */
case KTX_SENDING: /* normal send */
- lnet_finalize (kqswnal_data.kqn_ni,
- (lnet_msg_t *)ktx->ktx_args[1],
- ktx->ktx_status);
+ lnetmsg[0] = (lnet_msg_t *)ktx->ktx_args[1];
+ status[0] = ktx->ktx_status;
+ nlnetmsg = 1;
break;
case KTX_GETTING: /* optimized GET sent & REPLY received */
/* Complete the GET with success since we can't avoid
* delivering a REPLY event; we committed to it when we
* launched the GET */
- lnet_finalize (kqswnal_data.kqn_ni,
- (lnet_msg_t *)ktx->ktx_args[1], 0);
- lnet_finalize (kqswnal_data.kqn_ni,
- (lnet_msg_t *)ktx->ktx_args[2],
- ktx->ktx_status);
+ lnetmsg[0] = (lnet_msg_t *)ktx->ktx_args[1];
+ status[0] = 0;
+ lnetmsg[1] = (lnet_msg_t *)ktx->ktx_args[2];
+ status[1] = ktx->ktx_status;
+ nlnetmsg = 2;
break;
default:
}
kqswnal_put_idle_tx (ktx);
+
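+ /* finalize only after the tx descriptor is back in the idle pool */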
+ while (nlnetmsg-- > 0)
+ lnet_finalize (kqswnal_data.kqn_ni,
+ lnetmsg[nlnetmsg], status[nlnetmsg]);
}
void
/* payload is either all vaddrs or all pages */
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
- if (type == LNET_MSG_REPLY) {
- kqswnal_rx_t *rx = (kqswnal_rx_t *)private;
-
- LASSERT (routing || rx != NULL);
-
- if (!routing && rx->krx_rpc_reply_needed) { /* is it an RPC */
- /* Must be a REPLY for an optimized GET */
- return kqswnal_rdma (
- rx, lntmsg, LNET_MSG_GET,
- payload_niov, payload_iov, payload_kiov,
- payload_offset, payload_nob);
- }
- }
-
if (kqswnal_nid2elanid (target.nid) < 0) {
CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
return -EIO;
target_is_router ? "(router)" : "", rc);
if (rc != 0) {
- if (ktx->ktx_state == KTX_GETTING &&
- ktx->ktx_args[2] != NULL) {
+ lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
+ int state = ktx->ktx_state;
+
+ kqswnal_put_idle_tx (ktx);
+
+ if (state == KTX_GETTING && repmsg != NULL) {
/* We committed to reply, but there was a problem
* launching the GET. We can't avoid delivering a
* REPLY event since we committed above, so we
* failed. */
rc = 0;
lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
- lnet_finalize (kqswnal_data.kqn_ni,
- (lnet_msg_t *)ktx->ktx_args[2], -EIO);
+ lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
}
- kqswnal_put_idle_tx (ktx);
}
atomic_dec(&kqswnal_data.kqn_pending_txs);
fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
- rc = lnet_parse(ni, hdr, kqswnal_rx_nid(krx), krx);
+ rc = lnet_parse(ni, hdr, kqswnal_rx_nid(krx), krx,
+ krx->krx_rpc_reply_needed);
if (rc < 0) {
kqswnal_rx_decref(krx);
return;
/* NB hdr still in network byte order */
- if (krx->krx_rpc_reply_needed &&
- (hdrtype == LNET_MSG_PUT ||
- hdrtype == LNET_MSG_REPLY)) {
- /* This is an optimized PUT/REPLY */
- rc = kqswnal_rdma(krx, lntmsg, hdrtype,
- niov, iov, kiov, offset, mlen);
+ if (krx->krx_rpc_reply_needed) {
+ /* optimized (rdma) request sent as RPC */
+ switch (hdrtype) {
+ case LNET_MSG_PUT:
+ case LNET_MSG_REPLY:
+ /* This is an optimized PUT/REPLY */
+ rc = kqswnal_rdma(krx, lntmsg, hdrtype,
+ niov, iov, kiov, offset, mlen);
+ break;
+
+ case LNET_MSG_GET:
+ if (lntmsg == NULL) {
+ /* No buffer match: my decref will complete the
+ * RPC with failure */
+ rc = 0;
+ } else {
+ /* Matched something! */
+ rc = kqswnal_rdma (krx, lntmsg,
+ LNET_MSG_GET,
+ lntmsg->msg_niov,
+ lntmsg->msg_iov,
+ lntmsg->msg_kiov,
+ lntmsg->msg_offset,
+ lntmsg->msg_len);
+ }
+ break;
+
+ default:
+ CERROR("Bad RPC type %d\n", hdrtype);
+ rc = -EPROTO;
+ break;
+ }
kqswnal_rx_decref(krx);
return rc;
}
* there's nothing left to do */
break;
}
- rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
- kqswnal_data.kqn_shuttingdown == 2 ||
- !list_empty(&kqswnal_data.kqn_readyrxds) ||
- !list_empty(&kqswnal_data.kqn_donetxds) ||
- !list_empty(&kqswnal_data.kqn_delayedtxds));
+ rc = wait_event_interruptible_exclusive (
+ kqswnal_data.kqn_sched_waitq,
+ kqswnal_data.kqn_shuttingdown == 2 ||
+ !list_empty(&kqswnal_data.kqn_readyrxds) ||
+ !list_empty(&kqswnal_data.kqn_donetxds) ||
+ !list_empty(&kqswnal_data.kqn_delayedtxds));
LASSERT (rc == 0);
} else if (need_resched())
schedule ();
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
+ lnet_msg_t *lnetmsg[2];
unsigned long flags;
int i;
kranal_unmap_buffer(tx);
- for (i = 0; i < 2; i++) {
- /* tx may have up to 2 lntmsgs to finalise */
- if (tx->tx_lntmsg[i] == NULL)
- continue;
-
- lnet_finalize(kranal_data.kra_ni, tx->tx_lntmsg[i],
- completion);
- tx->tx_lntmsg[i] = NULL;
- }
+ lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+ lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
tx->tx_buftype = RANAL_BUF_NONE;
tx->tx_msg.ram_type = RANAL_MSG_NONE;
list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+
+ /* delay finalize until my descs have been freed */
+ for (i = 0; i < 2; i++) {
+ if (lnetmsg[i] == NULL)
+ continue;
+
+ lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
+ }
}
kra_conn_t *
lnet_kiov_t *kiov = lntmsg->msg_kiov;
unsigned int offset = lntmsg->msg_offset;
unsigned int nob = lntmsg->msg_len;
- kra_conn_t *conn;
kra_tx_t *tx;
int rc;
return 0;
case LNET_MSG_REPLY:
- /* reply's 'private' is the conn that received the GET_REQ */
- conn = private;
-
- LASSERT (routing || conn != NULL);
-
- LASSERT (conn->rac_rxmsg != NULL);
-
- if (!routing && conn->rac_rxmsg->ram_type != RANAL_MSG_IMMEDIATE) {
- /* Incoming message consistent with RDMA? */
- if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
- CERROR("REPLY to %s bad msg type %x!!!\n",
- libcfs_nid2str(target.nid),
- conn->rac_rxmsg->ram_type);
- return -EIO;
- }
-
- tx = kranal_get_idle_tx();
- if (tx == NULL)
- return -EIO;
-
- rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov,
- offset, nob);
- if (rc != 0) {
- kranal_tx_done(tx, rc);
- return -EIO;
- }
-
- tx->tx_conn = conn;
- tx->tx_lntmsg[0] = lntmsg;
-
- rc = kranal_map_buffer(tx);
- if (rc != 0) {
- kranal_tx_done(tx, rc);
- return -EIO;
- }
-
- kranal_rdma(tx, RANAL_MSG_GET_DONE,
- &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
- conn->rac_rxmsg->ram_u.get.ragm_cookie);
-
- /* flag matched by consuming rx message */
- kranal_consume_rxmsg(conn, NULL, 0);
- return 0;
- }
-
- /* Fall through and handle like PUT */
-
case LNET_MSG_PUT:
if (kiov == NULL && /* not paged */
nob <= RANAL_FMA_MAX_DATA && /* small enough */
return 0;
}
+void
+kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
+{
+ kra_msg_t *rxmsg = conn->rac_rxmsg;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int nob = lntmsg->msg_len;
+ kra_tx_t *tx;
+ int rc;
+
+ tx = kranal_get_idle_tx();
+ if (tx == NULL)
+ goto failed_0;
+
+ rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
+ if (rc != 0)
+ goto failed_1;
+
+ tx->tx_conn = conn;
+ tx->tx_lntmsg[0] = lntmsg;
+
+ rc = kranal_map_buffer(tx);
+ if (rc != 0)
+ goto failed_1;
+
+ kranal_rdma(tx, RANAL_MSG_GET_DONE,
+ &rxmsg->ram_u.get.ragm_desc, nob,
+ rxmsg->ram_u.get.ragm_cookie);
+ return;
+
+ failed_1:
+ kranal_tx_done(tx, -EIO);
+ failed_0:
+ lnet_finalize(ni, lntmsg, -EIO);
+}
+
int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
int delayed, unsigned int niov,
CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);
- if (rxmsg == NULL) {
- /* already consumed: must have been a GET_REQ that matched */
- LASSERT (mlen == 0);
- LASSERT (lntmsg == NULL); /* no need to finalise */
- return 0;
- }
-
switch(rxmsg->ram_type) {
default:
LBUG();
return 0;
case RANAL_MSG_GET_REQ:
- /* This one didn't match anything */
- tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
- if (tx != NULL) {
- tx->tx_msg.ram_u.completion.racm_cookie =
- rxmsg->ram_u.get.ragm_cookie;
- kranal_post_fma(conn, tx);
+ if (lntmsg != NULL) {
+ /* Matched! */
+ kranal_reply(ni, conn, lntmsg);
+ } else {
+ /* No match */
+ tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
+ if (tx != NULL) {
+ tx->tx_msg.ram_u.completion.racm_cookie =
+ rxmsg->ram_u.get.ragm_cookie;
+ kranal_post_fma(conn, tx);
+ }
}
- LASSERT (lntmsg == NULL); /* no need to finalise */
kranal_consume_rxmsg(conn, NULL, 0);
return 0;
}
continue;
set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+ add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);
spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
/* This message signals RDMA completion... */
rrc = RapkFmaSyncWait(conn->rac_rihandle);
- LASSERT (rrc == RAP_SUCCESS);
+ if (rrc != RAP_SUCCESS) {
+ CERROR("RapkFmaSyncWait failed: %d\n", rrc);
+ rc = -ENETDOWN;
+ goto out;
+ }
}
if (conn->rac_close_recvd) {
case RANAL_MSG_IMMEDIATE:
CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
- msg->ram_srcnid, conn);
+ msg->ram_srcnid, conn, 0);
repost = rc < 0;
break;
case RANAL_MSG_PUT_REQ:
CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
- msg->ram_srcnid, conn);
+ msg->ram_srcnid, conn, 1);
repost = rc < 0;
break;
case RANAL_MSG_GET_REQ:
CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
- msg->ram_srcnid, conn);
+ msg->ram_srcnid, conn, 1);
repost = rc < 0;
break;
break;
}
- if (rc < 0) /* lnet_parse() detected an error */
+ out:
+ if (rc < 0) /* protocol/comms error */
kranal_close_conn (conn, rc);
- out:
if (repost && conn->rac_rxmsg != NULL)
kranal_consume_rxmsg(conn, NULL, 0);
continue;
set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&dev->rad_waitq, &wait);
+ add_wait_queue_exclusive(&dev->rad_waitq, &wait);
spin_unlock_irqrestore(&dev->rad_lock, flags);
if (nsoonest == 0) {
void
ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx, int asynch)
{
+ lnet_msg_t *lnetmsg = tx->tx_lnetmsg;
+ int rc = (tx->tx_resid == 0) ? 0 : -EIO;
ENTRY;
if (tx->tx_conn != NULL) {
#endif
}
- lnet_finalize (ni, tx->tx_lnetmsg, (tx->tx_resid == 0) ? 0 : -EIO);
ksocknal_free_tx (tx);
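+ /* finalize only after the tx descriptor has been freed */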
+ lnet_finalize (ni, lnetmsg, rc);
+
EXIT;
}
ksocknal_conn_addref(conn); /* ++ref while parsing */
rc = lnet_parse(conn->ksnc_peer->ksnp_ni, &conn->ksnc_hdr,
- conn->ksnc_peer->ksnp_id.nid, conn);
+ conn->ksnc_peer->ksnp_id.nid, conn, 0);
if (rc < 0) {
/* I just received garbage: give up on this conn */
ksocknal_new_packet(conn, 0);
nloops = 0;
if (!did_something) { /* wait for something to do */
- rc = wait_event_interruptible (sched->kss_waitq,
- !ksocknal_sched_cansleep(sched));
+ rc = wait_event_interruptible_exclusive(
+ sched->kss_waitq,
+ !ksocknal_sched_cansleep(sched));
LASSERT (rc == 0);
} else
our_cond_resched();
spin_unlock_irqrestore(&ksocknal_data.ksnd_connd_lock,
flags);
- rc = wait_event_interruptible(ksocknal_data.ksnd_connd_waitq,
- ksocknal_data.ksnd_shuttingdown ||
- !list_empty(&ksocknal_data.ksnd_connd_connreqs) ||
- !list_empty(&ksocknal_data.ksnd_connd_routes));
+ rc = wait_event_interruptible_exclusive(
+ ksocknal_data.ksnd_connd_waitq,
+ ksocknal_data.ksnd_shuttingdown ||
+ !list_empty(&ksocknal_data.ksnd_connd_connreqs) ||
+ !list_empty(&ksocknal_data.ksnd_connd_routes));
spin_lock_irqsave(&ksocknal_data.ksnd_connd_lock, flags);
}
#define IBNAL_PEER_HASH_SIZE 101 /* # peer lists */
#define IBNAL_RESCHED 100 /* # scheduler loops before reschedule */
#define IBNAL_MSG_QUEUE_SIZE 8 /* # messages/RDMAs in-flight */
-#define IBNAL_CREDIT_HIGHWATER 7 /* when to eagerly return credits */
+#define IBNAL_CREDIT_HIGHWATER 7 /* when to return credits eagerly */
#define IBNAL_MSG_SIZE (4<<10) /* max size of queued messages (inc hdr) */
/* constants derived from sdp-connection.c */
{
struct list_head rx_list; /* queue for attention */
struct kib_conn *rx_conn; /* owning conn */
- int rx_responded; /* responded to peer? */
int rx_nob; /* # bytes received (-1 while posted) */
vv_l_key_t rx_lkey; /* local key */
kib_msg_t *rx_msg; /* pre-mapped buffer (host vaddr) */
__u64 ibc_rxseq; /* rx sequence number */
atomic_t ibc_refcount; /* # users */
int ibc_state; /* what's happening */
- atomic_t ibc_nob; /* # bytes buffered */
int ibc_nsends_posted; /* # uncompleted sends */
int ibc_credits; /* # credits I have */
int ibc_outstanding_credits; /* # credits to return */
static inline void
kibnal_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
{
- /* CAVEAT EMPTOR: tx takes caller's ref on conn */
-
LASSERT (tx->tx_nwrq > 0); /* work items set up */
LASSERT (!tx->tx_queued); /* not queued for sending already */
void
kibnal_tx_done (kib_tx_t *tx)
{
- int rc = tx->tx_status;
- int i;
+ lnet_msg_t *lntmsg[2];
+ int rc = tx->tx_status;
+ int i;
LASSERT (!in_interrupt());
LASSERT (!tx->tx_queued); /* mustn't be queued for sending */
}
tx->tx_md.md_active = 0;
#endif
- for (i = 0; i < 2; i++) {
- /* tx may have up to 2 ptlmsgs to finalise */
- if (tx->tx_lntmsg[i] == NULL)
- continue;
- lnet_finalize (kibnal_data.kib_ni, tx->tx_lntmsg[i], rc);
- tx->tx_lntmsg[i] = NULL;
- }
+ /* tx may have up to 2 lnet msgs to finalise */
+ lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+ lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
if (tx->tx_conn != NULL) {
kibnal_conn_decref(tx->tx_conn);
list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
spin_unlock(&kibnal_data.kib_tx_lock);
+
+ /* delay finalize until my descs have been freed */
+ for (i = 0; i < 2; i++) {
+ if (lntmsg[i] == NULL)
+ continue;
+
+ lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
+ }
}
kib_tx_t *
kibnal_check_sends(conn);
}
- /* clear flag so GET_REQ can see if it caused a REPLY */
- rx->rx_responded = 0;
-
switch (msg->ibm_type) {
default:
CERROR("Bad IBNAL message type %x from %s\n",
case IBNAL_MSG_IMMEDIATE:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 0);
repost = rc < 0; /* repost on error */
break;
case IBNAL_MSG_PUT_REQ:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 1);
repost = rc < 0; /* repost on error */
break;
case IBNAL_MSG_GET_REQ:
rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
- msg->ibm_srcnid, rx);
+ msg->ibm_srcnid, rx, 1);
repost = rc < 0; /* repost on error */
break;
kibnal_launch_tx(tx, target.nid);
return 0;
- case LNET_MSG_REPLY: {
- /* reply's 'private' is the incoming receive */
- kib_rx_t *rx = private;
-
- LASSERT(routing || rx != NULL);
-
- if (!routing && rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
- /* Incoming message consistent with RDMA? */
- if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
- CERROR("REPLY to %s bad msg type %x!!!\n",
- libcfs_nid2str(target.nid),
- rx->rx_msg->ibm_type);
- return -EIO;
- }
-
- /* NB handle_rx() will send GET_NAK when I return to
- * it from here, unless I set rx_responded! */
-
- tx = kibnal_get_idle_tx();
- if (tx == NULL) {
- CERROR("Can't get tx for REPLY to %s\n",
- libcfs_nid2str(target.nid));
- return -ENOMEM;
- }
-
- if (payload_nob == 0)
- rc = 0;
- else if (payload_kiov == NULL)
- rc = kibnal_setup_rd_iov(
- tx, tx->tx_rd, 0,
- payload_niov, payload_iov,
- payload_offset, payload_nob);
- else
- rc = kibnal_setup_rd_kiov(
- tx, tx->tx_rd, 0,
- payload_niov, payload_kiov,
- payload_offset, payload_nob);
- if (rc != 0) {
- CERROR("Can't setup GET src for %s: %d\n",
- libcfs_nid2str(target.nid), rc);
- kibnal_tx_done(tx);
- return -EIO;
- }
-
- rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE,
- payload_nob,
- &rx->rx_msg->ibm_u.get.ibgm_rd,
- rx->rx_msg->ibm_u.get.ibgm_cookie);
- if (rc < 0) {
- CERROR("Can't setup rdma for GET from %s: %d\n",
- libcfs_nid2str(target.nid), rc);
- } else if (rc == 0) {
- /* No RDMA: local completion may happen now! */
- lnet_finalize (kibnal_data.kib_ni, lntmsg, 0);
- } else {
- /* RDMA: lnet_finalize(lntmsg) when it
- * completes */
- tx->tx_lntmsg[0] = lntmsg;
- }
-
- kibnal_queue_tx(tx, rx->rx_conn);
- rx->rx_responded = 1;
- return (rc >= 0) ? 0 : -EIO;
- }
- /* fall through to handle like PUT */
- }
-
+ case LNET_MSG_REPLY:
case LNET_MSG_PUT:
/* Is the payload small enough not to need RDMA? */
nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
return 0;
}
+void
+kibnal_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
+{
+ lnet_process_id_t target = lntmsg->msg_target;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int nob = lntmsg->msg_len;
+ kib_tx_t *tx;
+ int rc;
+
+ tx = kibnal_get_idle_tx();
+ if (tx == NULL) {
+ CERROR("Can't get tx for REPLY to %s\n",
+ libcfs_nid2str(target.nid));
+ goto failed_0;
+ }
+
+ if (nob == 0)
+ rc = 0;
+ else if (kiov == NULL)
+ rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
+ niov, iov, offset, nob);
+ else
+ rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
+ niov, kiov, offset, nob);
+
+ if (rc != 0) {
+ CERROR("Can't setup GET src for %s: %d\n",
+ libcfs_nid2str(target.nid), rc);
+ goto failed_1;
+ }
+
+ rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
+ &rx->rx_msg->ibm_u.get.ibgm_rd,
+ rx->rx_msg->ibm_u.get.ibgm_cookie);
+ if (rc < 0) {
+ CERROR("Can't setup rdma for GET from %s: %d\n",
+ libcfs_nid2str(target.nid), rc);
+ goto failed_1;
+ }
+
+ if (rc == 0) {
+ /* No RDMA: local completion may happen now! */
+ lnet_finalize(ni, lntmsg, 0);
+ } else {
+ /* RDMA: lnet_finalize(lntmsg) when it
+ * completes */
+ tx->tx_lntmsg[0] = lntmsg;
+ }
+
+ kibnal_queue_tx(tx, rx->rx_conn);
+ return;
+
+ failed_1:
+ kibnal_tx_done(tx);
+ failed_0:
+ lnet_finalize(ni, lntmsg, -EIO);
+}
+
int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
break;
case IBNAL_MSG_GET_REQ:
- LASSERT (lntmsg == NULL); /* no need to finalise */
- if (!rx->rx_responded) {
+ if (lntmsg != NULL) {
+ /* Optimized GET; RDMA lntmsg's payload */
+ kibnal_reply(ni, rx, lntmsg);
+ } else {
/* GET didn't match anything */
kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE,
-ENODATA,
/* Nothing to do; sleep... */
set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
+ add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
flags);
int libcfs_debug_init(unsigned long bufsize)
-{
+{
+ /* if (getenv("LIBLUSTRE_DEBUG_BASENAME")) {
+ FILE *newfile;
+ sprintf(filename, "%s-%s-%lu.log",
+ getenv("LIBLUSTRE_DEBUG_BASENAME"), uname(), time(0));
+ newfile = fopen(filename, "w");
+ if (newfile != NULL)
+ debug_file_fd = newfile;
+ } else
+ */
debug_file_fd = stdout;
+
return 0;
}
int libcfs_debug_cleanup(void)
{
- return 0; //close(portals_debug_fd);
+ if (debug_file_fd != stdout)
+ fclose(debug_file_fd);
+ return 0;
}
int libcfs_debug_clear_buffer(void)
return 0;
}
-/* FIXME: I'm not very smart; someone smarter should make this better. */
void
-libcfs_debug_msg (int subsys, int mask, char *file, const char *fn,
+libcfs_debug_msg (int subsys, int mask, char *file, const char *fn,
const int line, unsigned long stack, char *format, ...)
{
va_list ap;
- unsigned long flags;
struct timeval tv;
int nob;
-
- /* NB since we pass a non-zero sized buffer (at least) on the first
- * print, we can be assured that by the end of all the snprinting,
- * we _do_ have a terminated buffer, even if our message got truncated.
- */
+ if (debug_file_fd == NULL)
+ return;
gettimeofday(&tv, NULL);
- nob += fprintf(debug_file_fd,
- "%02x:%06x:%d:%lu.%06lu ",
- subsys >> 24, mask, smp_processor_id,
- tv.tv_sec, tv.tv_usec);
-
- nob += fprintf(debug_file_fd,
- "(%s:%d:%s() %d+%ld): ",
- file, line, fn, 0,
- 8192 - ((unsigned long)&flags & 8191UL));
+ nob += fprintf(debug_file_fd, "%lu.%06lu:(%s:%d:%s()): ",
+ tv.tv_sec, tv.tv_usec, file, line, fn);
va_start (ap, format);
- nob += fprintf(debug_file_fd, format, ap);
+ nob += vfprintf(debug_file_fd, format, ap);
va_end (ap);
-
-
}
void
void libcfs_debug_dumpstack(struct task_struct *tsk)
{
-#if defined(__arch_um__)
- if (tsk != NULL)
- CWARN("stack dump for pid %d (%d) requested; wake up gdb.\n",
- tsk->pid, UML_PID(tsk));
- asm("int $3");
-#elif defined(HAVE_SHOW_TASK)
- /* this is exported by lustre kernel version 42 */
- extern void show_task(struct task_struct *);
-
- if (tsk == NULL)
- tsk = current;
- CWARN("showing stack for process %d\n", tsk->pid);
- show_task(tsk);
-#else
+#if defined(__arch_um__)
+ if (tsk != NULL)
+ CWARN("stack dump for pid %d (%d) requested; wake up gdb.\n",
+ tsk->pid, UML_PID(tsk));
+ //asm("int $3");
+#elif defined(HAVE_SHOW_TASK)
+ /* this is exported by lustre kernel version 42 */
+ extern void show_task(struct task_struct *);
+
+ if (tsk == NULL)
+ tsk = current;
+ CWARN("showing stack for process %d\n", tsk->pid);
+ show_task(tsk);
+#else
CWARN("can't show stack: kernel doesn't export show_task\n");
#endif
}
cfs_task_t *libcfs_current(void)
-{
+{
CWARN("current task struct is %p\n", current);
return current;
}
if (nid == LNET_NID_ANY)
return "LNET_NID_ANY";
- nf = libcfs_lnd2netstrfns(LNET_NETTYP(net));
+ nf = libcfs_lnd2netstrfns(lnd);
str = libcfs_next_nidstring();
if (nf == NULL)
the_lnet.ln_loni = ni;
continue;
}
-
+
#ifndef __KERNEL__
if (lnd->lnd_wait != NULL) {
if (the_lnet.ln_eqwaitni == NULL) {
}
return 0;
-
+
failed:
lnet_shutdown_lndnis();
list_del(&ni->ni_list);
LIBCFS_FREE(ni, sizeof(*ni));
}
-
+
return -ENETDOWN;
}
rc = lnet_get_portals_compatibility();
if (rc < 0)
return rc;
-
+
lnet_init_locks();
CFS_INIT_LIST_HEAD(&the_lnet.ln_lnds);
the_lnet.ln_ptlcompat = rc;
rc = -ENETDOWN;
goto failed0;
}
-
+
rc = lnet_prepare(requested_pid);
if (rc != 0)
goto failed0;
rc = lnet_startup_lndnis();
if (rc != 0)
goto failed1;
-
+
rc = lnet_parse_routes(lnet_get_routes(), &im_a_router);
if (rc != 0)
goto failed2;
char *str;
lnet_ni_t *ni;
__u32 net;
- int count = 0;
+ int nnets = 0;
if (strlen(networks) > LNET_SINGLE_TEXTBUF_NOB) {
/* _WAY_ conservative */
goto failed;
}
- if (count++ > 0) {
+ if (nnets > 0 &&
+ the_lnet.ln_ptlcompat > 0) {
LCONSOLE_ERROR("Only 1 network supported when "
"'portals_compatible' is set\n");
goto failed;
}
+ nnets++;
ni = lnet_new_ni(net, nilist);
if (ni == NULL)
goto failed;
LASSERT (mlen == 0 || msg != NULL);
if (msg != NULL) {
- LASSERT(!msg->msg_recvaftersend);
LASSERT(msg->msg_receiving);
+ LASSERT(!msg->msg_sending);
msg->msg_receiving = 0;
if (mlen != 0) {
lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
{
void *priv = msg->msg_private;
- int recv = msg->msg_recvaftersend;
- int delayed = msg->msg_delayed;
int rc;
LASSERT (!in_interrupt ());
-
- /* On GET, call lnet_ni_recv() right after the send. The recv gets
- * delayed until after the send to ensure the LND still has any RDMA
- * descriptors associated with the incoming GET when lnd_send() calls
- * in with the REPLY. Note that if we actually had to pass 'msg' in to
- * lnet_ni_recv() here, we'd be forking it (i.e. it would have 2 separate
- * existances and we'd have to refcount it) */
-
- LASSERT (!recv == !msg->msg_receiving);
- msg->msg_recvaftersend = 0;
- msg->msg_receiving = 0;
-
LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
(msg->msg_txcredit && msg->msg_peertxcredit));
LASSERT(!in_interrupt());
if (rc < 0)
lnet_finalize(ni, msg, rc);
-
- if (recv)
- lnet_ni_recv(ni, priv, NULL, delayed, 0, 0, 0);
}
int
LASSERT (!msg->msg_delayed);
msg->msg_delayed = 1;
- /* I might have to do an eager receive since I'm blocking */
- if (!msg->msg_receiving)
- return 0;
+ LASSERT (msg->msg_receiving);
+ LASSERT (msg->msg_routing);
+ LASSERT (!msg->msg_sending);
- if (msg->msg_routing) {
- peer = msg->msg_rxpeer;
- LASSERT (!msg->msg_sending);
- } else {
- peer = msg->msg_txpeer;
- LASSERT (msg->msg_recvaftersend);
- LASSERT (msg->msg_type == LNET_MSG_REPLY);
- }
-
+ peer = msg->msg_rxpeer;
ni = peer->lp_ni;
+
if (ni->ni_lnd->lnd_eager_recv != NULL) {
LNET_UNLOCK();
lnet_peer_t *lp = msg->msg_txpeer;
lnet_ni_t *ni = lp->lp_ni;
+ /* non-lnet_send() callers only post delayed messages */
+ LASSERT (!do_send || msg->msg_delayed);
+ LASSERT (!msg->msg_receiving);
+
if (!msg->msg_peertxcredit) {
LASSERT ((lp->lp_txcredits < 0) == !list_empty(&lp->lp_txq));
msg->msg_peertxcredit = 1;
lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
lp->lp_txcredits--;
+
if (lp->lp_txcredits < lp->lp_mintxcredits)
lp->lp_mintxcredits = lp->lp_txcredits;
if (lp->lp_txcredits < 0) {
- /* must have checked eager_recv before here */
- LASSERT (msg->msg_delayed);
+ msg->msg_delayed = 1;
list_add_tail (&msg->msg_list, &lp->lp_txq);
return EAGAIN;
}
ni->ni_mintxcredits = ni->ni_txcredits;
if (ni->ni_txcredits < 0) {
- /* must have checkd eager_recv before here */
- LASSERT (msg->msg_delayed);
+ msg->msg_delayed = 1;
list_add_tail (&msg->msg_list, &ni->ni_txq);
return EAGAIN;
}
if (do_send) {
LNET_UNLOCK();
- /* non-lnet_send() callers always send delayed */
- LASSERT (msg->msg_delayed);
lnet_ni_send(ni, msg);
LNET_LOCK();
}
int
lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
{
- /* lnet_route is going to LNET_UNLOCK immediately after this, so it
+ /* lnet_parse is going to LNET_UNLOCK immediately after this, so it
* sets do_recv FALSE and I don't do the unlock/send/lock bit. I
* return EAGAIN if msg blocked and 0 if sent or OK to send */
lnet_peer_t *lp = msg->msg_rxpeer;
LASSERT (msg->msg_niov == 0);
LASSERT (msg->msg_routing);
LASSERT (msg->msg_receiving);
+ LASSERT (!msg->msg_sending);
+
+ /* non-lnet_parse callers only send delayed messages */
+ LASSERT (!do_recv || msg->msg_delayed);
if (!msg->msg_peerrtrcredit) {
LASSERT ((lp->lp_rtrcredits < 0) == !list_empty(&lp->lp_rtrq));
if (do_recv) {
LNET_UNLOCK();
- /* non-lnet_route() callers always send delayed */
- LASSERT (msg->msg_delayed);
lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
0, msg->msg_len, msg->msg_len);
LNET_LOCK();
lnet_msg_t, msg_list);
list_del(&msg2->msg_list);
- LASSERT (msg2->msg_delayed);
-
(void) lnet_post_routed_recv_locked(msg2, 1);
}
}
lnet_msg_t, msg_list);
list_del(&msg2->msg_list);
- LASSERT (msg2->msg_delayed);
-
(void) lnet_post_routed_recv_locked(msg2, 1);
}
}
LASSERT (msg->msg_txpeer == NULL);
LASSERT (!msg->msg_sending);
LASSERT (!msg->msg_target_is_router);
+ LASSERT (!msg->msg_receiving);
msg->msg_sending = 1;
msg->msg_txpeer = lp; /* msg takes my ref on lp */
- if (!msg->msg_delayed &&
- (lp->lp_txcredits <= 0 || src_ni->ni_txcredits <= 0)) {
- rc = lnet_eager_recv_locked(msg);
- if (rc != 0) {
- msg->msg_txpeer = NULL;
- lnet_peer_decref_locked(lp);
- LNET_UNLOCK();
- return rc;
- }
- }
-
rc = lnet_post_send_locked(msg, 0);
LNET_UNLOCK();
}
static int
-lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg)
+lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
{
lnet_hdr_t *hdr = &msg->msg_hdr;
unsigned int mlength = 0;
msg->msg_ev.target.nid = hdr->dest_nid;
msg->msg_ev.hdr_data = 0;
- /* set msg_recvaftersend so the incoming message is consumed (by
- * calling lnet_ni_recv()) in lnet_ni_send() AFTER lnd_send() has been
- * called. This ensures that the LND can rely on the recv happening
- * after the send so any RDMA descriptors it has stashed are still
- * valid. */
- msg->msg_recvaftersend = 1;
-
+ if (rdma_get) {
+ /* The LND completes the REPLY from its recv procedure */
+ lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, 0);
+ return 0;
+ }
+
+ lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
+ msg->msg_receiving = 0;
+
rc = lnet_send(ni->ni_nid, msg);
if (rc < 0) {
/* didn't get as far as lnet_ni_send() */
CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
- /* consume to release LND resources */
- lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
-
- msg->msg_recvaftersend = 0;
- msg->msg_receiving = 0;
lnet_finalize(ni, msg, rc);
}
int
-lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, void *private)
+lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
+ void *private, int rdma_req)
{
int rc = 0;
int for_me;
if (!for_me) {
if (the_lnet.ln_ptlcompat > 0) {
/* portals compatibility is single-network */
- CERROR ("%s, %s: Bad dest nid %s "
+ CERROR ("%s, src %s: Bad dest nid %s "
"(routing not supported)\n",
libcfs_nid2str(from_nid),
libcfs_nid2str(src_nid),
if (the_lnet.ln_ptlcompat == 0 &&
LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
/* should have gone direct */
- CERROR ("%s, %s: Bad dest nid %s "
+ CERROR ("%s, src %s: Bad dest nid %s "
"(should have been sent direct)\n",
libcfs_nid2str(from_nid),
libcfs_nid2str(src_nid),
lnet_islocalnid(dest_nid)) {
/* dest is another local NI; sender should have used
* this node's NID on its own network */
- CERROR ("%s, %s: Bad dest nid %s "
+ CERROR ("%s, src %s: Bad dest nid %s "
"(it's my nid but on a different network)\n",
libcfs_nid2str(from_nid),
libcfs_nid2str(src_nid),
return -EPROTO;
}
+ if (rdma_req && type == LNET_MSG_GET) {
+ CERROR ("%s, src %s: Bad optimized GET for %s "
+ "(final destination must be me)\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid),
+ libcfs_nid2str(dest_nid));
+ return -EPROTO;
+ }
+
if (!the_lnet.ln_routing) {
- CERROR ("%s, %s: Dropping message for %s "
+ CERROR ("%s, src %s: Dropping message for %s "
"(routing not enabled)\n",
libcfs_nid2str(from_nid),
libcfs_nid2str(src_nid),
rc = lnet_parse_put(ni, msg);
break;
case LNET_MSG_GET:
- rc = lnet_parse_get(ni, msg);
+ rc = lnet_parse_get(ni, msg, rdma_req);
break;
case LNET_MSG_REPLY:
rc = lnet_parse_reply(ni, msg);
if (msg == NULL)
return;
#if 0
- CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
+ CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
msg->msg_target_is_router ? "t" : "",
msg->msg_routing ? "X" : "",
msg->msg_ack ? "A" : "",
msg->msg_sending ? "S" : "",
msg->msg_receiving ? "R" : "",
- msg->msg_recvaftersend ? "g" : "",
msg->msg_delayed ? "d" : "",
msg->msg_txcredit ? "C" : "",
msg->msg_peertxcredit ? "c" : "",
LASSERT (!lntmsg->msg_routing);
LASSERT (!lntmsg->msg_target_is_router);
- rc = lnet_parse(ni, &lntmsg->msg_hdr, ni->ni_nid, lntmsg);
+ rc = lnet_parse(ni, &lntmsg->msg_hdr, ni->ni_nid, lntmsg, 0);
if (rc >= 0)
lnet_finalize(ni, lntmsg, 0);
ptllnd_peer_t *rx_peer;
kptl_msg_t *rx_msg;
int rx_nob;
- int rx_replied;
char rx_space[0];
} ptllnd_rx_t;
PJK_UT_MSG("<<< rc=%d\n",rc);
return rc;
- case LNET_MSG_REPLY: {
- ptllnd_rx_t *rx = private; /* incoming GET */
- LASSERT (rx != NULL);
-
- PJK_UT_MSG("LNET_MSG_REPLY rx=%p\n",rx);
-
- if (rx->rx_msg->ptlm_type == PTLLND_MSG_TYPE_GET) {
- __u64 matchbits;
-
- matchbits = rx->rx_msg->ptlm_u.req.kptlrm_matchbits;
- PJK_UT_MSG("matchbits="LPX64"\n",matchbits);
-
- LASSERT (!rx->rx_replied);
- rc = ptllnd_active_rdma(plp, PTLLND_RDMA_WRITE, msg,
- matchbits,
- msg->msg_niov, msg->msg_iov,
- msg->msg_offset, msg->msg_len);
- rx->rx_replied = (rc == 0);
- ptllnd_peer_decref(plp);
- PJK_UT_MSG("<<< rc=%d\n",rc);
- return rc;
- }
-
- if (rx->rx_msg->ptlm_type != PTLLND_MSG_TYPE_IMMEDIATE) {
- CERROR("Reply to %s bad msg type %x!!!\n",
- libcfs_id2str(msg->msg_target),
- rx->rx_msg->ptlm_type);
- ptllnd_peer_decref(plp);
- return -EPROTO;
- }
-
- /* fall through to handle like PUT */
- }
-
+ case LNET_MSG_REPLY:
case LNET_MSG_PUT:
PJK_UT_MSG("LNET_MSG_PUT nob=%d\n",msg->msg_len);
nob = msg->msg_len;
ptllnd_rx_t *stackrx = private;
ptllnd_rx_t *heaprx;
+ /* Shouldn't get here; recvs only block for router buffers */
+ LBUG();
+
PJK_UT_MSG("rx=%p (stack)\n", stackrx);
/* Don't ++plni_nrxs: heaprx replaces stackrx */
case PTLLND_MSG_TYPE_GET:
PJK_UT_MSG("PTLLND_MSG_TYPE_GET\n");
- LASSERT (msg == NULL); /* no need to finalize */
- if (!rx->rx_replied) /* peer will time out */
+ if (msg != NULL) {
+ /* matched! */
+ PJK_UT_MSG("matchbits="LPX64"\n",
+ rx->rx_msg->ptlm_u.req.kptlrm_matchbits);
+
+ rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
+ rx->rx_msg->ptlm_u.req.kptlrm_matchbits,
+ msg->msg_niov, msg->msg_iov,
+ msg->msg_offset, msg->msg_len);
+ PJK_UT_MSG("<<< rc=%d\n",rc);
+ break;
+ } else {
ptllnd_close_peer(rx->rx_peer);
+ }
break;
}
rx.rx_peer = plp;
rx.rx_msg = msg;
rx.rx_nob = nob;
- rx.rx_replied = 0;
plni->plni_nrxs++;
PJK_UT_MSG("rx=%p type=%d\n",&rx,msg->ptlm_type);
PJK_UT_MSG("PTLLND_MSG_TYPE_%s\n",
msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
rc = lnet_parse(ni, &msg->ptlm_u.req.kptlrm_hdr,
- msg->ptlm_srcnid, &rx);
+ msg->ptlm_srcnid, &rx, 1);
PJK_UT_MSG("lnet_parse rc=%d\n",rc);
if (rc < 0)
ptllnd_rx_done(&rx);
case PTLLND_MSG_TYPE_IMMEDIATE:
PJK_UT_MSG("PTLLND_MSG_TYPE_IMMEDIATE\n");
rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
- msg->ptlm_srcnid, &rx);
- PJK_UT_MSG("lnet_parse rc=%d\n",rc);
+ msg->ptlm_srcnid, &rx, 0);
+ PJK_UT_MSG("lnet_parse rc=%d\n",rc, 0);
if (rc < 0)
ptllnd_rx_done(&rx);
break;
hdr.dest_nid = cpu_to_le64(b->b_ni->ni_nid);
hdr.dest_pid = cpu_to_le32(the_lnet.ln_pid);
- rc = lnet_parse(b->b_ni, &hdr, c->peer_nid, c);
+ rc = lnet_parse(b->b_ni, &hdr, c->peer_nid, c, 0);
if (rc < 0) {
CERROR("Error %d from lnet_parse\n", rc);
return 0;
lib_LIBRARIES = libptlctl.a
libptlctl_a_SOURCES = portals.c nidstrings.c debug.c l_ioctl.c parser.c parser.h
+libptlctl_a_CPPFLAGS = $(LLCPPFLAGS)
+libptlctl_a_CFLAGS = $(LLCFLAGS) -DLUSTRE_UTILS=1
if UTILS
sbin_PROGRAMS += ptlctl routerstat wirecheck
counter.route_length = subull(new_counter.route_length, old_counter.route_length);
counter.drop_length = subull(new_counter.drop_length, old_counter.drop_length);
- printf ("M %lu(%lu) E %0.0f S %7.2f/%6.0f R %7.2f/%6.0f F %7.2f/%6.0f D %4.2f/%0.0f\n",
+ printf ("M %3lu(%3lu) E %0.0f S %7.2f/%6.0f R %7.2f/%6.0f F %7.2f/%6.0f D %4.2f/%0.0f\n",
counter.msgs_alloc, counter.msgs_max,
rul(counter.errors,t),
rull(counter.send_length,t*1024.0*1024.0), rul(counter.send_count, t),