From 995460b085670b344f0a9c255c77179e9459a970 Mon Sep 17 00:00:00 2001 From: pjkirner Date: Thu, 27 Oct 2005 21:34:51 +0000 Subject: [PATCH] * Key for peer db is ptl_process_id_t rather than just NID (+ Assocated log msg chanages) Necesary to support catamount processes that come up one right after another using the same NID but diffrent PIDs * Fix to bug in peer_time() code with incorrect return code --- lnet/klnds/ptllnd/ptllnd.h | 11 +++--- lnet/klnds/ptllnd/ptllnd_cb.c | 36 ++++++++--------- lnet/klnds/ptllnd/ptllnd_peer.c | 82 +++++++++++++++++++++------------------ lnet/klnds/ptllnd/ptllnd_rx_buf.c | 27 +++++++------ 4 files changed, 83 insertions(+), 73 deletions(-) diff --git a/lnet/klnds/ptllnd/ptllnd.h b/lnet/klnds/ptllnd/ptllnd.h index 91dbe98..6203c2e 100755 --- a/lnet/klnds/ptllnd/ptllnd.h +++ b/lnet/klnds/ptllnd/ptllnd.h @@ -529,19 +529,18 @@ kptllnd_peer_check_bucket ( void kptllnd_tx_launch ( kptl_tx_t *tx, - lnet_nid_t target_nid, + lnet_process_id_t target, lnet_msg_t *ptlmsg ); kptl_peer_t * kptllnd_peer_find ( kptl_data_t *kptllnd_data, - lnet_nid_t nid); + lnet_process_id_t target); kptl_peer_t * kptllnd_peer_handle_hello ( kptl_data_t *kptllnd_data, - lnet_nid_t nid, - int pid, + lnet_process_id_t initiator, kptl_msg_t *msg); static inline struct list_head * @@ -665,13 +664,13 @@ do{ \ #define PJK_UT_MSG_SIMULATION(fmt, a...) PJK_UT_MSG_ALWAYS(fmt, ## a ) -#if 0 +#if 1 #define PJK_UT_MSG_DATA(fmt, a...) PJK_UT_MSG_ALWAYS(fmt, ## a ) #else #define PJK_UT_MSG_DATA(fmt, a...) do{}while(0) #endif -#if 0 +#if 1 #define PJK_UT_MSG(fmt, a...) PJK_UT_MSG_ALWAYS(fmt, ## a ) #else #define PJK_UT_MSG(fmt, a...) do{}while(0) diff --git a/lnet/klnds/ptllnd/ptllnd_cb.c b/lnet/klnds/ptllnd/ptllnd_cb.c index 7286526..325568a 100644 --- a/lnet/klnds/ptllnd/ptllnd_cb.c +++ b/lnet/klnds/ptllnd/ptllnd_cb.c @@ -231,8 +231,8 @@ kptllnd_start_bulk_rdma( op == PTL_MD_OP_GET ? TX_TYPE_LARGE_PUT_RESPONSE : TX_TYPE_LARGE_GET_RESPONSE); if(tx == NULL){ - CERROR ("Can't start bulk rdma %d to "FMT_NID": tx descs exhausted\n", - op, rx->rx_initiator.nid); + CERROR ("Can't start bulk rdma %d to " FMT_NID ": tx descs exhausted\n", + op,rx->rx_initiator.nid); return -ENOMEM; } @@ -389,7 +389,7 @@ kptllnd_do_put( kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT, sizeof(kptl_request_msg_t)); - kptllnd_tx_launch(tx, lntmsg->msg_target.nid,lntmsg); + kptllnd_tx_launch(tx, lntmsg->msg_target, lntmsg); } int @@ -456,8 +456,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) */ tx = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_LARGE_PUT); if(tx == NULL){ - CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", - type, target.nid); + CERROR ("Can't send %d to %s: tx descs exhausted\n", + type, libcfs_id2str(target)); return -ENOMEM; } @@ -483,8 +483,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) */ tx = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_LARGE_GET); if(tx == NULL){ - CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", - type, target.nid); + CERROR ("Can't send %d to %s: tx descs exhausted\n", + type, libcfs_id2str(target)); return -ENOMEM; } @@ -543,8 +543,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) */ tx = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_LARGE_PUT); if(tx == NULL){ - CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", - type, target.nid); + CERROR ("Can't send %d to %s: tx descs exhausted\n", + type, libcfs_id2str(target)); return -ENOMEM; } @@ -572,12 +572,12 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) /* RDMA not expected */ nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]); if (nob > *kptllnd_tunables.kptl_max_msg_size) { - CERROR("REPLY for "LPX64" too big but RDMA not requested:" + CERROR("REPLY for %s too big but RDMA not requested:" "%d (max for message is %d)\n", - target.nid, payload_nob, + libcfs_id2str(target), payload_nob, *kptllnd_tunables.kptl_max_msg_size); - CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n", - nob, target.nid); + CERROR("Can't REPLY IMMEDIATE %d to %s\n", + nob, libcfs_id2str(target)); return -EINVAL; } break; @@ -586,8 +586,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) /* Incoming message consistent with RDMA? */ if (rx->rx_msg->ptlm_type != PTLLND_MSG_TYPE_GET) { - CERROR("REPLY to "LPX64" bad msg type %x!!!\n", - target.nid, rx->rx_msg->ptlm_type); + CERROR("REPLY to %s bad msg type %x!!!\n", + libcfs_id2str(target), rx->rx_msg->ptlm_type); return -EINVAL; } @@ -615,8 +615,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) */ tx = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE); if(tx == NULL){ - CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", - type, target.nid); + CERROR ("Can't send %d to %s: tx descs exhausted\n", + type, libcfs_id2str(target)); return -ENOMEM; } }else{ @@ -660,7 +660,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) launch: - kptllnd_tx_launch(tx, target.nid,lntmsg); + kptllnd_tx_launch(tx, target, lntmsg); PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n"); return 0; } diff --git a/lnet/klnds/ptllnd/ptllnd_peer.c b/lnet/klnds/ptllnd/ptllnd_peer.c index 48f599a..fe62903 100644 --- a/lnet/klnds/ptllnd/ptllnd_peer.c +++ b/lnet/klnds/ptllnd/ptllnd_peer.c @@ -26,7 +26,7 @@ kptllnd_peer_destroy ( kptl_peer_t * kptllnd_peer_find_holding_list_lock ( kptl_data_t *kptllnd_data, - lnet_nid_t nid); + lnet_process_id_t target); int @@ -73,15 +73,14 @@ int kptllnd_peer_allocate ( kptl_data_t *kptllnd_data, kptl_peer_t **peerp, - lnet_nid_t nid, - int pid) + lnet_process_id_t target) { kptl_peer_t *peer; int rc; - PJK_UT_MSG(">>> nid="LPX64" pid=%d\n",nid,pid); + PJK_UT_MSG(">>> id=%s\n",libcfs_id2str(target)); - LASSERT (nid != PTL_NID_ANY); + LASSERT (target.nid != PTL_NID_ANY); LIBCFS_ALLOC(peer, sizeof (*peer)); if (peer == NULL) { @@ -99,8 +98,8 @@ kptllnd_peer_allocate ( peer->peer_state = PEER_STATE_ALLOCATED; peer->peer_kptllnd_data = kptllnd_data; - peer->peer_nid = nid; - peer->peer_pid = pid; + peer->peer_nid = target.nid; + peer->peer_pid = target.pid; //peer->peer_incarnation = 0; //peer->peer_tx_seqnum = 0; @@ -137,7 +136,7 @@ kptllnd_peer_allocate ( */ atomic_set (&peer->peer_refcount, 1); - PJK_UT_MSG("<<< Peer=%p nid="LPX64"\n",peer,nid); + PJK_UT_MSG("<<< Peer=%p id=%s\n",peer,libcfs_id2str(target)); *peerp = peer; return 0; } @@ -711,7 +710,7 @@ kptllnd_peer_check_sends ( PJK_UT_MSG_DATA("Seq # = "LPX64"\n",tx->tx_msg->ptlm_seq); PJK_UT_MSG("lnet TX nid=" LPX64 " pid=%d\n",peer->peer_nid,peer->peer_pid); - PJK_UT_MSG("ptl TX nid=" LPX64 " pid=%d\n",target.nid,target.pid); + PJK_UT_MSG("ptl TX nid=" FMT_NID " pid=%d\n",target.nid,target.pid); if(tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_GET || tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_PUT){ @@ -939,7 +938,7 @@ kptllnd_peer_timedout(kptl_peer_t *peer) if(time_after_eq(jiffies,tx->tx_deadline)){ PJK_UT_MSG("Peer=%p PENDING tx=%p time=%lu sec\n", peer,tx,(jiffies - tx->tx_deadline)/HZ); - rc = 0; + rc = 1; } } @@ -951,7 +950,7 @@ kptllnd_peer_timedout(kptl_peer_t *peer) if(time_after_eq(jiffies,tx->tx_deadline)){ PJK_UT_MSG("Peer=%p ACTIVE tx=%p time=%lu sec\n", peer,tx,(jiffies - tx->tx_deadline)/HZ); - rc = 0; + rc = 1; } } @@ -1011,12 +1010,12 @@ kptllnd_peer_check_bucket (int idx, kptl_data_t *kptllnd_data) kptl_peer_t * kptllnd_peer_find ( kptl_data_t *kptllnd_data, - lnet_nid_t nid) + lnet_process_id_t target) { kptl_peer_t *peer; unsigned long flags; read_lock_irqsave(&kptllnd_data->kptl_peer_rw_lock, flags); - peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,nid); + peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,target); read_unlock_irqrestore(&kptllnd_data->kptl_peer_rw_lock, flags); return peer; } @@ -1024,25 +1023,31 @@ kptllnd_peer_find ( kptl_peer_t * kptllnd_peer_find_holding_list_lock ( kptl_data_t *kptllnd_data, - lnet_nid_t nid) + lnet_process_id_t target) { - struct list_head *peer_list = kptllnd_nid2peerlist (kptllnd_data,nid); + struct list_head *peer_list = kptllnd_nid2peerlist (kptllnd_data,target.nid); struct list_head *tmp; kptl_peer_t *peer; - PJK_UT_MSG(">>> nid="LPX64"\n",nid); + PJK_UT_MSG(">>> id=%s\n",libcfs_id2str(target)); list_for_each (tmp, peer_list) { peer = list_entry (tmp, kptl_peer_t, peer_list); LASSERT(peer->peer_state != PEER_STATE_CANCELED); - - if (peer->peer_nid != nid) + + PJK_UT_MSG("NID: peer="LPX64" target="LPX64"\n", + peer->peer_nid,target.nid); + PJK_UT_MSG("PID: peer=%d target=%d\n", + peer->peer_pid,target.pid); + + if (! (peer->peer_nid == target.nid && + peer->peer_pid == target.pid)) continue; - CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n", - peer, nid, atomic_read (&peer->peer_refcount)); + CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n", + peer, libcfs_id2str(target), atomic_read (&peer->peer_refcount)); kptllnd_peer_addref(peer,"find"); PJK_UT_MSG("<<< Peer=%p\n",peer); @@ -1056,8 +1061,7 @@ kptllnd_peer_find_holding_list_lock ( kptl_peer_t * kptllnd_peer_handle_hello ( kptl_data_t *kptllnd_data, - lnet_nid_t nid, - int pid, + lnet_process_id_t initiator, kptl_msg_t *msg) { kptl_peer_t *peer = NULL; @@ -1093,7 +1097,7 @@ kptllnd_peer_handle_hello ( */ tx_hello = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE); if( tx_hello == NULL) { - CERROR("Unable to allocate connect message for "LPX64"\n",nid); + CERROR("Unable to allocate connect message for %s\n",libcfs_id2str(initiator)); return 0; } @@ -1107,10 +1111,10 @@ kptllnd_peer_handle_hello ( * however we want to avoid doing this while holidng * the peer_rw_lock and be forced into atomic context */ - rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, nid, pid); + rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, initiator); if(rc != 0){ kptllnd_tx_decref(tx_hello); - CERROR("Failed to create peer (nid="LPX64")\n",nid); + CERROR("Failed to create peer (id=%s)\n",libcfs_id2str(initiator)); return 0; } @@ -1119,7 +1123,7 @@ kptllnd_peer_handle_hello ( /* * Look for peer because it could have been previously here */ - peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,nid); + peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,initiator); /* * If peer is already here @@ -1169,7 +1173,8 @@ kptllnd_peer_handle_hello ( peer = NULL; }else{ - CERROR("Receiving HELLO message on already connected peer " LPX64"\n",nid); + CERROR("Receiving HELLO message on already connected peer %s\n", + libcfs_id2str(initiator)); } } @@ -1188,7 +1193,8 @@ kptllnd_peer_handle_hello ( */ rc = kptllnd_peer_add_to_list_locked ( kptllnd_data, peer_allocated); if(rc != 0){ - CERROR("Failed to create peer (nid="LPX64")\n",nid); + CERROR("Failed to create peer (id=%s)\n", + libcfs_id2str(initiator)); goto failed; } @@ -1267,7 +1273,7 @@ failed: void kptllnd_tx_launch ( kptl_tx_t *tx, - lnet_nid_t target_nid, + lnet_process_id_t target, lnet_msg_t *ptlmsg ) { kptl_data_t *kptllnd_data = tx->tx_po.po_kptllnd_data; @@ -1282,7 +1288,7 @@ kptllnd_tx_launch ( /* If I get here, I've committed to send, so I complete the tx with * failure on any problems */ - PJK_UT_MSG(">>> TX=%p nid="LPX64"\n",tx,target_nid); + PJK_UT_MSG(">>> TX=%p target=%s\n",tx,libcfs_id2str(target)); LASSERT (tx->tx_ptlmsg == NULL); tx->tx_ptlmsg = ptlmsg; /* finalize ptlmsg on completion */ @@ -1294,7 +1300,7 @@ kptllnd_tx_launch ( * First try to find the peer (this will grab the * read lock */ - peer = kptllnd_peer_find (kptllnd_data,target_nid); + peer = kptllnd_peer_find (kptllnd_data,target); /* * If we find the peer @@ -1320,7 +1326,7 @@ kptllnd_tx_launch ( */ tx_hello = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE); if( tx_hello == NULL) { - CERROR("Unable to allocate connect message for "LPX64"\n",target_nid); + CERROR("Unable to allocate connect message for %s\n",libcfs_id2str(target)); kptllnd_tx_decref (tx); return; } @@ -1342,10 +1348,10 @@ kptllnd_tx_launch ( * Allocate a new peer * (it's not active until its on the list) */ - PJK_UT_MSG("TX %p creating NEW PEER nid="LPX64"\n",tx,target_nid); - rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, target_nid, PTLLND_PID); + PJK_UT_MSG("TX %p creating NEW PEER %s\n",tx,libcfs_id2str(target)); + rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, target); if(rc != 0){ - CERROR("Failed to create peer (nid="LPX64")\n",target_nid); + CERROR("Failed to create peer %s\n",libcfs_id2str(target)); kptllnd_tx_decref (tx); kptllnd_tx_decref (tx_hello); return; @@ -1358,7 +1364,7 @@ kptllnd_tx_launch ( */ write_lock_irqsave(g_lock, flags); - peer = kptllnd_peer_find_holding_list_lock (kptllnd_data,target_nid); + peer = kptllnd_peer_find_holding_list_lock (kptllnd_data,target); /* * If we find the peer @@ -1368,7 +1374,7 @@ kptllnd_tx_launch ( if (peer != NULL) { write_unlock_irqrestore(g_lock, flags); - CDEBUG(D_TRACE,"HELLO message race occurred (nid="LPX64")\n",target_nid); + CDEBUG(D_TRACE,"HELLO message race occurred for %s\n",libcfs_id2str(target)); spin_lock_irqsave(&peer->peer_lock, flags); kptllnd_peer_queue_tx_locked ( peer, tx ); @@ -1390,7 +1396,7 @@ kptllnd_tx_launch ( if(rc != 0){ write_unlock_irqrestore(g_lock, flags); - CERROR("Failed to add peer to list (nid="LPX64")\n",target_nid); + CERROR("Failed to add peer to list for %s\n",libcfs_id2str(target)); /* Drop these TXs tx*/ kptllnd_tx_decref (tx); diff --git a/lnet/klnds/ptllnd/ptllnd_rx_buf.c b/lnet/klnds/ptllnd/ptllnd_rx_buf.c index 7ee5383..cca8459 100644 --- a/lnet/klnds/ptllnd/ptllnd_rx_buf.c +++ b/lnet/klnds/ptllnd/ptllnd_rx_buf.c @@ -517,7 +517,7 @@ kptllnd_rx_buffer_callback(ptl_event_t *ev) STAT_UPDATE(kps_rx_unlink_event); if(!rxbp->rxbp_shutdown){ - PJK_UT_MSG("RXB Callback %s(%d) rxb=%p nid="FMT_NID" unlink=%d\n", + PJK_UT_MSG("RXB Callback %s(%d) rxb=%p id="FMT_NID" unlink=%d\n", get_ev_type_string(ev->type),ev->type, rxb,ev->initiator.nid,unlinked); } @@ -625,7 +625,7 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx) kptl_peer_t *peer = NULL; int returned_credits = 0; int type = msg->ptlm_type; - lnet_nid_t lnet_initiator_nid = ptl2lnetnid(kptllnd_data,rx->rx_initiator.nid); + lnet_process_id_t lnet_initiator; unsigned long flags; @@ -633,6 +633,12 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx) PJK_UT_MSG_DATA("rx=%p nob=%d\n",rx,rx->rx_nob); /* + * Setup the intiator for LNET + */ + lnet_initiator.nid = ptl2lnetnid(kptllnd_data,rx->rx_initiator.nid); + lnet_initiator.pid = rx->rx_initiator.pid; + + /* * If the nob==0 then silently discard this message */ if(rx->rx_nob == 0) @@ -650,19 +656,18 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx) PJK_UT_MSG_DATA("Msg NOB = %d\n",msg->ptlm_nob); PJK_UT_MSG_DATA("Credits back from peer=%d\n",msg->ptlm_credits); PJK_UT_MSG_DATA("Seq # ="LPX64"\n",msg->ptlm_seq); - PJK_UT_MSG_DATA("lnet RX nid=" LPX64 "\n",lnet_initiator_nid); + PJK_UT_MSG_DATA("lnet RX nid=" LPX64 "\n",lnet_initiator.nid); PJK_UT_MSG("ptl RX nid=" FMT_NID " pid=%d\n",rx->rx_initiator.nid,rx->rx_initiator.pid); if(type == PTLLND_MSG_TYPE_HELLO) { peer = kptllnd_peer_handle_hello( kptllnd_data, - lnet_initiator_nid, - rx->rx_initiator.pid, + lnet_initiator, msg); if( peer == NULL){ - CERROR ("Failed to create peer for "LPX64"\n", - lnet_initiator_nid); + CERROR ("Failed to create peer for %s\n", + libcfs_id2str(lnet_initiator)); goto exit; } @@ -677,10 +682,10 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx) } else { - peer = kptllnd_peer_find(kptllnd_data,lnet_initiator_nid); + peer = kptllnd_peer_find(kptllnd_data,lnet_initiator); if( peer == NULL){ - CERROR ("No connection with "LPX64"\n", - lnet_initiator_nid); + CERROR ("No connection with %s\n", + libcfs_id2str(lnet_initiator)); goto exit; } @@ -921,7 +926,7 @@ kptllnd_rx_destroy(kptl_rx_t *rx,kptl_data_t *kptllnd_data) *kptllnd_tunables.kptl_peercredits); spin_unlock_irqrestore(&peer->peer_lock, flags); - PJK_UT_MSG_ALWAYS("Peer=%p Credits=%d Outstanding=%d\n", + PJK_UT_MSG("Peer=%p Credits=%d Outstanding=%d\n", peer,peer->peer_credits,peer->peer_outstanding_credits); /* Have I received credits that will let me send? */ -- 1.8.3.1