From c6383b2bc3846d79482144d0958c1ee84b1cfc0c Mon Sep 17 00:00:00 2001 From: pjkirner Date: Fri, 14 Oct 2005 19:31:48 +0000 Subject: [PATCH] Catamount PTLLND now runs sanity * Resolved a number of unitilized variables * Remove unused plb_refcount * ptllnd_close_peer() released ref (could have been last) and then touched peer pointer. * .lnd_wait was not defined in ptllnd.c causing LNET to reject it as a valid LND in single thread environment. * Incorrect check on buf->plb_posted caused assert to be triggered in destroy_buffer * buf->plb_posted was never set to 0 on unlink causing infinite loop. * plni_nposted_buffers was never decremented would hit ASSERT * Portals NID was not retrivied, and LNET NID was not created, caused init failure. * type == INVALID value cause active_rdma to always issue PtlPut() * (CLEANUP) variable ni, masks parameter ni * Credits not given to peer correctly in HELLO message. * Credits never decremented * Matchbits did not handle reserved properly * Bulk MD/ME should be inserted BEFORE (PTL_INS_BEFORE) all the small message MD for performance reasons. * Missing PTL_ACK_DISABLE * HELLO Message payload was being swab'ed even when it wasn't supposed to be. * Some complex syntax of x?yz comibend with to few parthensis was causing md.options to be set incorrectly and messages to be dropped * MD options not set properly in ptllnd_active_rdma * MD threshold calculation compares against invalid type * ptllnd_recv() used rlen rather than mlen * Fixes for max_immd_size being max message size, not max immidate payload size. --- lnet/ulnds/ptllnd/ptllnd.c | 186 +++++++++++++---- lnet/ulnds/ptllnd/ptllnd.h | 78 ++++++- lnet/ulnds/ptllnd/ptllnd_cb.c | 467 ++++++++++++++++++++++++++++++------------ 3 files changed, 548 insertions(+), 183 deletions(-) diff --git a/lnet/ulnds/ptllnd/ptllnd.c b/lnet/ulnds/ptllnd/ptllnd.c index 8733c2b..a414461 100644 --- a/lnet/ulnds/ptllnd/ptllnd.c +++ b/lnet/ulnds/ptllnd/ptllnd.c @@ -25,6 +25,7 @@ lnd_t the_ptllnd = { .lnd_send = ptllnd_send, .lnd_recv = ptllnd_recv, .lnd_eager_recv = ptllnd_eager_recv, + .lnd_wait = ptllnd_wait, }; static int ptllnd_ni_count = 0; @@ -39,11 +40,11 @@ ptllnd_parse_int_tunable(int *value, char *name, int dflt) *value = dflt; return 0; } - + *value = strtoull(env, &end, 0); if (*end == 0) return 0; - + CERROR("Can't parse tunable %s=%s\n", name, env); return -EINVAL; } @@ -72,49 +73,60 @@ ptllnd_get_tunables(lnet_ni_t *ni) return rc; rc = ptllnd_parse_int_tunable(&max_immediate, - "PTLLND_MAX_MSG_SIZE", + "PTLLND_MAX_MSG_SIZE", PTLLND_MAX_MSG_SIZE); if (rc != 0) return rc; - + rc = ptllnd_parse_int_tunable(&msgs_per_buffer, - "PTLLND_MSGS_PER_BUFFER", + "PTLLND_MSGS_PER_BUFFER", PTLLND_MSGS_PER_BUFFER); if (rc != 0) return rc; rc = ptllnd_parse_int_tunable(&plni->plni_msgs_spare, - "PTLLND_MSGS_SPARE", + "PTLLND_MSGS_SPARE", PTLLND_MSGS_SPARE); if (rc != 0) return rc; rc = ptllnd_parse_int_tunable(&plni->plni_peer_hash_size, - "PTLLND_PEER_HASH_SIZE", + "PTLLND_PEER_HASH_SIZE", PTLLND_PEER_HASH_SIZE); if (rc != 0) return rc; - + + rc = ptllnd_parse_int_tunable(&plni->plni_eq_size, "PTLLND_EQ_SIZE", PTLLND_EQ_SIZE); if (rc != 0) return rc; - plni->plni_max_msg_size = - offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[max_immediate]); + plni->plni_max_msg_size = max_immediate; if (plni->plni_max_msg_size < sizeof(kptl_msg_t)) plni->plni_max_msg_size = sizeof(kptl_msg_t); - + plni->plni_buffer_size = plni->plni_max_msg_size * msgs_per_buffer; + + PJK_UT_MSG("portal = %d\n",plni->plni_portal); + PJK_UT_MSG("pid = %d\n",plni->plni_pid); + PJK_UT_MSG("max_immediate = %d\n",max_immediate); + PJK_UT_MSG("msgs_per_buffer = %d\n",msgs_per_buffer); + PJK_UT_MSG("msgs_spare = %d\n",plni->plni_msgs_spare); + PJK_UT_MSG("peer_hash_size = %d\n",plni->plni_peer_hash_size); + PJK_UT_MSG("eq_size = %d\n",plni->plni_eq_size); + PJK_UT_MSG("max_msg_size = %d\n",plni->plni_max_msg_size); + PJK_UT_MSG("buffer_size = %d\n",plni->plni_buffer_size); + return 0; } ptllnd_buffer_t * -ptllnd_create_buffer (lnet_ni_t *ni) +ptllnd_create_buffer (lnet_ni_t *ni) { ptllnd_ni_t *plni = ni->ni_data; ptllnd_buffer_t *buf; - + LIBCFS_ALLOC(buf, sizeof(*buf)); if (buf == NULL) { CERROR("Can't allocate buffer descriptor\n"); @@ -123,7 +135,8 @@ ptllnd_create_buffer (lnet_ni_t *ni) buf->plb_ni = ni; buf->plb_posted = 0; - + CFS_INIT_LIST_HEAD(&buf->plb_list); + LIBCFS_ALLOC(buf->plb_buffer, plni->plni_buffer_size); if (buf->plb_buffer == NULL) { CERROR("Can't allocate buffer size %d\n", @@ -131,7 +144,7 @@ ptllnd_create_buffer (lnet_ni_t *ni) LIBCFS_FREE(buf, sizeof(*buf)); return NULL; } - + list_add(&buf->plb_list, &plni->plni_buffers); plni->plni_nbuffers++; @@ -159,24 +172,37 @@ ptllnd_grow_buffers (lnet_ni_t *ni) int nmsgs; int nbufs; int rc; - + + PJK_UT_MSG("nposted_buffers = %d (before)\n",plni->plni_nposted_buffers); + PJK_UT_MSG("nbuffers = %d (before)\n",plni->plni_nbuffers); + + nmsgs = plni->plni_npeers * plni->plni_peer_credits + plni->plni_msgs_spare; nbufs = (nmsgs * plni->plni_max_msg_size + plni->plni_buffer_size - 1) / plni->plni_buffer_size; - + while (nbufs > plni->plni_nbuffers) { buf = ptllnd_create_buffer(ni); - + if (buf == NULL) return -ENOMEM; rc = ptllnd_post_buffer(buf); - if (rc != 0) + if (rc != 0){ + /* TODO - this path seems to orpahn the buffer + * in a state where its not posted and will never be + * However it does not leak the buffer as it's + * already been put onto the global buffer list + * and will be cleaned up + */ return rc; + } } - + + PJK_UT_MSG("nposted_buffers = %d (after)\n",plni->plni_nposted_buffers); + PJK_UT_MSG("nbuffers = %d (after)\n",plni->plni_nbuffers); return 0; } @@ -187,17 +213,22 @@ ptllnd_destroy_buffers (lnet_ni_t *ni) ptllnd_buffer_t *buf; struct list_head *tmp; struct list_head *nxt; - + + PJK_UT_MSG("nposted_buffers = %d (before)\n",plni->plni_nposted_buffers); + PJK_UT_MSG("nbuffers = %d (before)\n",plni->plni_nbuffers); + list_for_each_safe(tmp, nxt, &plni->plni_buffers) { buf = list_entry(tmp, ptllnd_buffer_t, plb_list); - + + //PJK_UT_MSG("buf=%p posted=%d\n",buf,buf->plb_posted); + LASSERT (plni->plni_nbuffers > 0); if (buf->plb_posted) { LASSERT (plni->plni_nposted_buffers > 0); -#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS +#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS (void) PtlMDUnlink(buf->plb_md); - while (!buf->plb_posted) + while (buf->plb_posted) ptllnd_wait(ni, -1); #else while (buf->plb_posted) { @@ -215,6 +246,9 @@ ptllnd_destroy_buffers (lnet_ni_t *ni) ptllnd_destroy_buffer(buf); } + PJK_UT_MSG("nposted_buffers = %d (after)\n",plni->plni_nposted_buffers); + PJK_UT_MSG("nbuffers = %d (after)\n",plni->plni_nbuffers); + LASSERT (plni->plni_nposted_buffers == 0); LASSERT (plni->plni_nbuffers == 0); } @@ -225,7 +259,9 @@ ptllnd_create_peer_hash (lnet_ni_t *ni) ptllnd_ni_t *plni = ni->ni_data; int i; - LIBCFS_ALLOC(plni->plni_peer_hash, + plni->plni_npeers = 0; + + LIBCFS_ALLOC(plni->plni_peer_hash, plni->plni_peer_hash_size * sizeof(*plni->plni_peer_hash)); if (plni->plni_peer_hash == NULL) { CERROR("Can't allocate ptllnd peer hash (size %d)\n", @@ -235,19 +271,21 @@ ptllnd_create_peer_hash (lnet_ni_t *ni) for (i = 0; i < plni->plni_peer_hash_size; i++) CFS_INIT_LIST_HEAD(&plni->plni_peer_hash[i]); - + return 0; } -void +void ptllnd_destroy_peer_hash (lnet_ni_t *ni) { ptllnd_ni_t *plni = ni->ni_data; int i; + LASSERT( plni->plni_npeers == 0); + for (i = 0; i < plni->plni_peer_hash_size; i++) LASSERT (list_empty(&plni->plni_peer_hash[i])); - + LIBCFS_FREE(plni->plni_peer_hash, plni->plni_peer_hash_size * sizeof(*plni->plni_peer_hash)); } @@ -259,6 +297,8 @@ ptllnd_close_peers (lnet_ni_t *ni) ptllnd_peer_t *plp; int i; + PJK_UT_MSG(">>> npeers=%d\n",plni->plni_npeers); + for (i = 0; i < plni->plni_peer_hash_size; i++) while (!list_empty(&plni->plni_peer_hash[i])) { plp = list_entry(plni->plni_peer_hash[i].next, @@ -266,6 +306,8 @@ ptllnd_close_peers (lnet_ni_t *ni) ptllnd_close_peer(plp); } + + PJK_UT_MSG("<<< npeers=%d\n",plni->plni_npeers); } __u64 @@ -278,12 +320,14 @@ ptllnd_get_timestamp(void) return ((__u64)tv.tv_sec) * 1000000 + tv.tv_usec; } -void +void ptllnd_shutdown (lnet_ni_t *ni) { ptllnd_ni_t *plni = ni->ni_data; int rc; + PJK_UT_MSG(">>>\n"); + LASSERT (ptllnd_ni_count == 1); ptllnd_destroy_buffers(ni); @@ -295,16 +339,18 @@ ptllnd_shutdown (lnet_ni_t *ni) LASSERT (plni->plni_ntxs == 0); LASSERT (plni->plni_nrxs == 0); - + rc = PtlEQFree(plni->plni_eqh); LASSERT (rc == PTL_OK); - + rc = PtlNIFini(plni->plni_nih); LASSERT (rc == PTL_OK); - + ptllnd_destroy_peer_hash(ni); LIBCFS_FREE(plni, sizeof(*plni)); ptllnd_ni_count--; + + PJK_UT_MSG("<<<\n"); } int @@ -312,9 +358,11 @@ ptllnd_startup (lnet_ni_t *ni) { ptllnd_ni_t *plni; int rc; - + + PJK_UT_MSG(">>> ni=%p\n",ni); + /* could get limits from portals I guess... */ - ni->ni_maxtxcredits = + ni->ni_maxtxcredits = ni->ni_peertxcredits = 1000; if (ptllnd_ni_count != 0) { @@ -323,14 +371,14 @@ ptllnd_startup (lnet_ni_t *ni) } ptllnd_ni_count++; - + LIBCFS_ALLOC(plni, sizeof(*plni)); if (plni == NULL) { CERROR("Can't allocate ptllnd state\n"); rc = -ENOMEM; goto failed0; } - + ni->ni_data = plni; plni->plni_stamp = ptllnd_get_timestamp(); @@ -338,7 +386,7 @@ ptllnd_startup (lnet_ni_t *ni) plni->plni_ntxs = 0; CFS_INIT_LIST_HEAD(&plni->plni_active_txs); CFS_INIT_LIST_HEAD(&plni->plni_zombie_txs); - + /* * Initilize buffer related data structures */ @@ -361,6 +409,7 @@ ptllnd_startup (lnet_ni_t *ni) rc = -ENODEV; goto failed2; } + PJK_UT_MSG("plni->plni_nih=%p\n",plni->plni_nih); rc = PtlEQAlloc(plni->plni_nih, plni->plni_eq_size, PTL_EQ_HANDLER_NONE, &plni->plni_eqh); @@ -369,11 +418,38 @@ ptllnd_startup (lnet_ni_t *ni) rc = -ENODEV; goto failed3; } + PJK_UT_MSG("plni->plni_eqh=%p\n",plni->plni_eqh); + + /* + * Fetch the Portals NID + */ + if(rc != PtlGetId(plni->plni_nih,&plni->plni_portals_id)){ + CERROR ("PtlGetID failed : %d\n", rc); + rc = -EINVAL; + goto failed4; + } + + PJK_UT_MSG("lnet nid=" LPX64 " (passed in)\n",ni->ni_nid); + + /* + * Create the new NID. Based on the LND network type + * and the lower ni's address data. + */ + ni->ni_nid = ptl2lnetnid(ni,plni->plni_portals_id.nid); + + PJK_UT_MSG("ptl pid=" FMT_NID "\n",plni->plni_portals_id.pid); + PJK_UT_MSG("ptl nid=" FMT_NID "\n",plni->plni_portals_id.nid); + PJK_UT_MSG("lnet nid=" LPX64 " (passed back)\n",ni->ni_nid); + + CDEBUG(D_INFO,"ptl pid=" FMT_NID "\n",plni->plni_portals_id.pid); + CDEBUG(D_INFO,"ptl nid=" FMT_NID "\n",plni->plni_portals_id.nid); + CDEBUG(D_INFO,"lnet nid=" LPX64 "\n",ni->ni_nid); rc = ptllnd_grow_buffers(ni); if (rc != 0) goto failed4; + PJK_UT_MSG("<<<\n"); return 0; failed4: @@ -387,6 +463,42 @@ ptllnd_startup (lnet_ni_t *ni) LIBCFS_FREE(plni, sizeof(*plni)); failed0: ptllnd_ni_count--; + PJK_UT_MSG("<<< rc=%d\n",rc); return rc; } +#define DO_TYPE(x) case x: return #x; + +const char *get_ev_type_string(int type) +{ + switch(type) + { + DO_TYPE(PTL_EVENT_GET_START); + DO_TYPE(PTL_EVENT_GET_END); + DO_TYPE(PTL_EVENT_PUT_START); + DO_TYPE(PTL_EVENT_PUT_END); + DO_TYPE(PTL_EVENT_REPLY_START); + DO_TYPE(PTL_EVENT_REPLY_END); + DO_TYPE(PTL_EVENT_ACK); + DO_TYPE(PTL_EVENT_SEND_START); + DO_TYPE(PTL_EVENT_SEND_END); + DO_TYPE(PTL_EVENT_UNLINK); + default: + return ""; + } +} + +const char *get_msg_type_string(int type) +{ + switch(type) + { + DO_TYPE(PTLLND_MSG_TYPE_INVALID); + DO_TYPE(PTLLND_MSG_TYPE_PUT); + DO_TYPE(PTLLND_MSG_TYPE_GET); + DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE); + DO_TYPE(PTLLND_MSG_TYPE_HELLO); + DO_TYPE(PTLLND_MSG_TYPE_NOOP); + default: + return ""; + } +} diff --git a/lnet/ulnds/ptllnd/ptllnd.h b/lnet/ulnds/ptllnd/ptllnd.h index b1e2126..717f2c6 100644 --- a/lnet/ulnds/ptllnd/ptllnd.h +++ b/lnet/ulnds/ptllnd/ptllnd.h @@ -53,6 +53,7 @@ typedef struct ptl_handle_ni_t plni_nih; ptl_handle_eq_t plni_eqh; + ptl_process_id_t plni_portals_id; /* Portals ID of interface */ struct list_head *plni_peer_hash; int plni_npeers; @@ -88,7 +89,6 @@ typedef struct struct list_head plb_list; lnet_ni_t *plb_ni; int plb_posted; - int plb_refcount; ptl_handle_md_t plb_md; char *plb_buffer; } ptllnd_buffer_t; @@ -129,29 +129,29 @@ typedef struct #define PTLLND_EVENTARG_TYPE_MASK 0x3 static inline void * -ptllnd_obj2eventarg (void *obj, int type) +ptllnd_obj2eventarg (void *obj, int type) { unsigned long ptr = (unsigned long)obj; - + LASSERT ((ptr & PTLLND_EVENTARG_TYPE_MASK) == 0); LASSERT ((type & ~PTLLND_EVENTARG_TYPE_MASK) == 0); - + return (void *)(ptr | type); } static inline int -ptllnd_eventarg2type (void *arg) +ptllnd_eventarg2type (void *arg) { unsigned long ptr = (unsigned long)arg; - + return (ptr & PTLLND_EVENTARG_TYPE_MASK); } static inline void * -ptllnd_eventarg2obj (void *arg) +ptllnd_eventarg2obj (void *arg) { unsigned long ptr = (unsigned long)arg; - + return (void *)(ptr & ~PTLLND_EVENTARG_TYPE_MASK); } @@ -159,7 +159,7 @@ int ptllnd_startup(lnet_ni_t *ni); void ptllnd_shutdown(lnet_ni_t *ni); int ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg); int ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, - int delayed, unsigned int niov, + int delayed, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen); int ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, @@ -172,9 +172,11 @@ void ptllnd_abort_txs(lnet_ni_t *ni); void ptllnd_close_peer(ptllnd_peer_t *peer); int ptllnd_post_buffer(ptllnd_buffer_t *buf); int ptllnd_grow_buffers (lnet_ni_t *ni); +const char *get_ev_type_string(int type); +const char *get_msg_type_string(int type); static inline void -ptllnd_peer_addref (ptllnd_peer_t *peer) +ptllnd_peer_addref (ptllnd_peer_t *peer) { LASSERT (peer->plp_refcount > 0); peer->plp_refcount++; @@ -193,8 +195,62 @@ static inline void ptllnd_post_tx(ptllnd_tx_t *tx) { ptllnd_peer_t *peer = tx->tx_peer; - + LASSERT(tx->tx_peer != NULL); list_add_tail(&tx->tx_list, &peer->plp_txq); ptllnd_check_sends(peer); } +static inline lnet_nid_t +ptl2lnetnid(lnet_ni_t *ni,ptl_nid_t portals_nid) +{ + return LNET_MKNID(LNET_NIDNET(ni->ni_nid), portals_nid); +} + +static inline ptl_nid_t +lnet2ptlnid(lnet_nid_t lnet_nid) +{ + return LNET_NIDADDR(lnet_nid); +} + +/* + * Define this to enable console debug logging + * and simulation + */ +//#define PJK_DEBUGGING + +#ifdef PJK_DEBUGGING + +#define PJK_UT_MSG_ALWAYS(fmt, a...) \ +do{ \ + lprintf("ptllnd:%-30s:",__FUNCTION__); \ + lprintf(fmt,## a); \ +}while(0) + +// CDEBUG(D_TRACE,fmt,## a); + + +#define PJK_UT_MSG_SIMULATION(fmt, a...) PJK_UT_MSG_ALWAYS(fmt, ## a ) + + +#if 1 +#define PJK_UT_MSG_DATA(fmt, a...) PJK_UT_MSG_ALWAYS(fmt, ## a ) +#else +#define PJK_UT_MSG_DATA(fmt, a...) do{}while(0) +#endif + +#if 1 +#define PJK_UT_MSG(fmt, a...) PJK_UT_MSG_ALWAYS(fmt, ## a ) +#else +#define PJK_UT_MSG(fmt, a...) do{}while(0) +#endif + +#else + + +#define PJK_UT_MSG_ALWAYS(fmt, a...) do{}while(0) +#define PJK_UT_MSG_SIMULATION(fmt, a...) do{}while(0) +#define PJK_UT_MSG_DATA(fmt, a...) do{}while(0) +#define PJK_UT_MSG(fmt, a...) do{}while(0) + +#endif + diff --git a/lnet/ulnds/ptllnd/ptllnd_cb.c b/lnet/ulnds/ptllnd/ptllnd_cb.c index 93b8e0f..d0763da 100644 --- a/lnet/ulnds/ptllnd/ptllnd_cb.c +++ b/lnet/ulnds/ptllnd/ptllnd_cb.c @@ -23,10 +23,10 @@ ptllnd_ptlid2str(ptl_process_id_t id) { static char strs[32][16]; static int idx = 0; - - snprintf(strs[idx], sizeof(strs[0]), + + snprintf(strs[idx], sizeof(strs[0]), "%d-"LPD64, id.pid, (__u64)id.nid); - + return strs[idx++]; } @@ -38,6 +38,7 @@ ptllnd_destroy_peer(ptllnd_peer_t *peer) LASSERT (peer->plp_closing); LASSERT (plni->plni_npeers > 0); + LASSERT (list_empty(&peer->plp_txq)); plni->plni_npeers--; LIBCFS_FREE(peer, sizeof(*peer)); } @@ -53,9 +54,6 @@ ptllnd_close_peer(ptllnd_peer_t *peer) peer->plp_closing = 1; - list_del(&peer->plp_list); - ptllnd_peer_decref(peer); - while (!list_empty(&peer->plp_txq)) { ptllnd_tx_t *tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list); @@ -63,6 +61,9 @@ ptllnd_close_peer(ptllnd_peer_t *peer) list_del(&tx->tx_list); list_add_tail(&tx->tx_list, &plni->plni_zombie_txs); } + + list_del(&peer->plp_list); + ptllnd_peer_decref(peer); } ptllnd_peer_t * @@ -75,20 +76,23 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_nid_t nid, int create) ptllnd_tx_t *tx; int rc; + PJK_UT_MSG(">>> nid=" LPX64 "\n",nid); + LASSERT (LNET_NIDNET(nid) == LNET_NIDNET(ni->ni_nid)); list_for_each(tmp, &plni->plni_peer_hash[hash]) { plp = list_entry(tmp, ptllnd_peer_t, plp_list); - + if (plp->plp_nid == nid) { ptllnd_peer_addref(plp); + PJK_UT_MSG("<<< peer=%p FOUND\n",plp); return plp; } } if (!create) return NULL; - + /* New peer: check first for enough posted buffers */ plni->plni_npeers++; rc = ptllnd_grow_buffers(ni); @@ -96,7 +100,7 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_nid_t nid, int create) plni->plni_npeers--; return NULL; } - + LIBCFS_ALLOC(plp, sizeof(*plp)); if (plp == NULL) { CERROR("Can't allocate new peer %s\n", @@ -105,22 +109,27 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_nid_t nid, int create) return NULL; } + PJK_UT_MSG("new peer=%p\n",plp); + plp->plp_ni = ni; plp->plp_nid = nid; plp->plp_ptlid.nid = LNET_NIDADDR(nid); plp->plp_ptlid.pid = plni->plni_pid; plp->plp_max_credits = plp->plp_credits = 1; /* add more later when she gives me credits */ - plp->plp_max_msg_size = sizeof(kptl_msg_t); /* until I hear from her */ + plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */ plp->plp_outstanding_credits = plni->plni_peer_credits - 1; plp->plp_match = 0; plp->plp_recvd_hello = 0; plp->plp_closing = 0; plp->plp_refcount = 1; + plp->plp_seq = 0; + CFS_INIT_LIST_HEAD(&plp->plp_list); + CFS_INIT_LIST_HEAD(&plp->plp_txq); ptllnd_peer_addref(plp); list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]); - + tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0); if (tx == NULL) { CERROR("Can't send HELLO to %s\n", libcfs_nid2str(nid)); @@ -130,11 +139,11 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_nid_t nid, int create) } tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS; - tx->tx_msg.ptlm_u.hello.kptlhm_max_immd_size = - plni->plni_max_msg_size - - offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload); + tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size; ptllnd_post_tx(tx); + + PJK_UT_MSG("<<< peer=%p NEW\n",plp); return plp; } @@ -145,7 +154,9 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob) ptllnd_ni_t *plni = ni->ni_data; ptllnd_tx_t *tx; int msgsize; - + + PJK_UT_MSG("peer=%p type=%d payload=%d\n",peer,type,payload_nob); + switch (type) { default: LBUG(); @@ -155,24 +166,24 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob) LASSERT (payload_nob == 0); msgsize = 0; break; - + case PTLLND_MSG_TYPE_PUT: case PTLLND_MSG_TYPE_GET: LASSERT (payload_nob == 0); msgsize = offsetof(kptl_msg_t, ptlm_u) + sizeof(kptl_request_msg_t); break; - + case PTLLND_MSG_TYPE_IMMEDIATE: - msgsize = offsetof(kptl_msg_t, + msgsize = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]); break; - + case PTLLND_MSG_TYPE_NOOP: LASSERT (payload_nob == 0); msgsize = offsetof(kptl_msg_t, ptlm_u); break; - + case PTLLND_MSG_TYPE_HELLO: LASSERT (payload_nob == 0); msgsize = offsetof(kptl_msg_t, ptlm_u) + @@ -182,8 +193,10 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob) LASSERT (msgsize <= peer->plp_max_msg_size); + PJK_UT_MSG("msgsize=%d\n",msgsize); + LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize); - + if (tx == NULL) { CERROR("Can't allocate msg type %d for %s\n", type, libcfs_nid2str(peer->plp_nid)); @@ -199,6 +212,7 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob) tx->tx_reqmdh = PTL_INVALID_HANDLE; tx->tx_bulkmdh = PTL_INVALID_HANDLE; tx->tx_msgsize = msgsize; + tx->tx_completing = 0; tx->tx_status = 0; if (msgsize != 0) { @@ -214,10 +228,12 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob) tx->tx_msg.ptlm_dststamp = peer->plp_stamp; tx->tx_msg.ptlm_seq = peer->plp_seq++; } - + ptllnd_peer_addref(peer); plni->plni_ntxs++; + PJK_UT_MSG("tx=%p\n",tx); + return tx; } @@ -251,9 +267,16 @@ ptllnd_tx_done(ptllnd_tx_t *tx) * events for this tx until it's unlinked. So I set tx_completing to * flag the tx is getting handled */ + PJK_UT_MSG(">>> tx=%p peer=%p\n",tx,peer); + PJK_UT_MSG("completing=%d\n",tx->tx_completing); + PJK_UT_MSG("status=%d\n",tx->tx_status); + PJK_UT_MSG("niov=%d\n",tx->tx_niov); + PJK_UT_MSG("lnetreplymsg=%p\n",tx->tx_lnetreplymsg); + PJK_UT_MSG("lnetmsg=%p\n",tx->tx_lnetmsg); + if (tx->tx_completing) return; - + tx->tx_completing = 1; if (!list_empty(&tx->tx_list)) @@ -261,7 +284,7 @@ ptllnd_tx_done(ptllnd_tx_t *tx) if (tx->tx_status != 0) ptllnd_close_peer(peer); - + ptllnd_abort_tx(tx, &tx->tx_reqmdh); ptllnd_abort_tx(tx, &tx->tx_bulkmdh); @@ -275,6 +298,7 @@ ptllnd_tx_done(ptllnd_tx_t *tx) LASSERT (tx->tx_lnetmsg != NULL); /* Simulate GET success always */ lnet_finalize(ni, tx->tx_lnetmsg, 0); + PJK_UT_MSG("lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg); lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status); } else if (tx->tx_lnetmsg != NULL) { lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status); @@ -285,13 +309,15 @@ ptllnd_tx_done(ptllnd_tx_t *tx) LASSERT (plni->plni_ntxs > 0); plni->plni_ntxs--; LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize); + + PJK_UT_MSG("<<< tx=%p\n",tx); } void ptllnd_abort_txs(lnet_ni_t *ni) { ptllnd_ni_t *plni = ni->ni_data; - + while (!list_empty(&plni->plni_active_txs)) { ptllnd_tx_t *tx = list_entry(plni->plni_active_txs.next, ptllnd_tx_t, tx_list); @@ -312,7 +338,7 @@ ptllnd_set_txiov(ptllnd_tx_t *tx, tx->tx_niov = 0; return 0; } - + for (;;) { LASSERT (niov > 0); if (offset < iov->iov_len) @@ -330,10 +356,10 @@ ptllnd_set_txiov(ptllnd_tx_t *tx, for (npiov = 0;; npiov++) { LASSERT (npiov < niov); LASSERT (iov->iov_len >= offset); - + piov[npiov].iov_base = iov[npiov].iov_base + offset; piov[npiov].iov_len = iov[npiov].iov_len - offset; - + if (piov[npiov].iov_len >= len) { piov[npiov].iov_len = len; npiov++; @@ -348,7 +374,7 @@ ptllnd_set_txiov(ptllnd_tx_t *tx, tx->tx_iov = piov; return 0; } - + /* Dang! The piov I allocated was too big and it's a drag to * have to maintain separate 'allocated' and 'used' sizes, so * I'll just do it again; NB this doesn't happen normally... */ @@ -362,7 +388,7 @@ ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx) { unsigned int niov = tx->tx_niov; ptl_md_iovec_t *iov = tx->tx_iov; - + LASSERT ((md->options & PTL_MD_IOVEC) == 0); if (niov == 0) { @@ -391,7 +417,7 @@ ptllnd_post_buffer(ptllnd_buffer_t *buf) .length = plni->plni_buffer_size, .threshold = PTL_MD_THRESH_INF, .max_size = plni->plni_max_msg_size, - .options = (PTLLND_MD_OPTIONS | + .options = (PTLLND_MD_OPTIONS | PTL_MD_OP_PUT | PTL_MD_MAX_SIZE), .user_ptr = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF), .eq_handle = plni->plni_eqh}; @@ -399,8 +425,8 @@ ptllnd_post_buffer(ptllnd_buffer_t *buf) int rc; LASSERT (!buf->plb_posted); - - rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, + + rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, anyid, LNET_MSG_MATCHBITS, 0, PTL_UNLINK, PTL_INS_AFTER, &meh); if (rc != PTL_OK) { @@ -410,19 +436,19 @@ ptllnd_post_buffer(ptllnd_buffer_t *buf) buf->plb_posted = 1; plni->plni_nposted_buffers++; - + rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md); if (rc == PTL_OK) return 0; CERROR("PtlMDAttach failed: %d\n", rc); - + buf->plb_posted = 0; plni->plni_nposted_buffers--; rc = PtlMEUnlink(meh); LASSERT (rc == PTL_OK); - + return -ENOMEM; } @@ -435,12 +461,16 @@ ptllnd_check_sends(ptllnd_peer_t *peer) ptl_md_t md; ptl_handle_md_t mdh; int rc; - + + PJK_UT_MSG(">>> peer=%p\n",peer); + PJK_UT_MSG("plp_outstanding_credits=%d\n",peer->plp_outstanding_credits); + if (list_empty(&peer->plp_txq) && - peer->plp_outstanding_credits >= + peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni)) { tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0); + PJK_UT_MSG("NOOP tx=%p\n",tx); if (tx == NULL) { CERROR("Can't return credits to %s\n", libcfs_nid2str(peer->plp_nid)); @@ -451,9 +481,13 @@ ptllnd_check_sends(ptllnd_peer_t *peer) while (!list_empty(&peer->plp_txq)) { tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list); - + + PJK_UT_MSG("Looking at TX=%p\n",tx); + PJK_UT_MSG("plp_credits=%d\n",peer->plp_credits); + PJK_UT_MSG("plp_outstanding_credits=%d\n",peer->plp_outstanding_credits); + LASSERT (tx->tx_msgsize > 0); - + LASSERT (peer->plp_outstanding_credits >= 0); LASSERT (peer->plp_outstanding_credits <= plni->plni_peer_credits); @@ -466,9 +500,11 @@ ptllnd_check_sends(ptllnd_peer_t *peer) if (peer->plp_credits == 1 && /* last credit reserved for */ peer->plp_outstanding_credits == 0) /* returning credits */ break; - + list_del_init(&tx->tx_list); - + + PJK_UT_MSG("Sending at TX=%p type=%d\n",tx,tx->tx_type); + if (tx->tx_type == PTLLND_MSG_TYPE_NOOP && (!list_empty(&peer->plp_txq) || peer->plp_outstanding_credits < @@ -478,13 +514,34 @@ ptllnd_check_sends(ptllnd_peer_t *peer) continue; } + /* + * We need to set the stamp here because a hello + * could have modified peer->plp_stamp, after it + * was set to into the TX + */ + tx->tx_msg.ptlm_dststamp = peer->plp_stamp; + + PJK_UT_MSG("Returning %d to peer\n",peer->plp_outstanding_credits); + + /* + * Return all the credits we have + */ + tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits; + peer->plp_outstanding_credits = 0; + + /* + * One less credit + */ + peer->plp_credits--; + + md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX); md.eq_handle = plni->plni_eqh; md.threshold = 1; md.options = PTLLND_MD_OPTIONS; md.start = &tx->tx_msg; md.length = tx->tx_msgsize; - + rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh); if (rc != PTL_OK) { CERROR("PtlMDBind for %s failed: %d\n", @@ -493,7 +550,7 @@ ptllnd_check_sends(ptllnd_peer_t *peer) ptllnd_tx_done(tx); break; } - + tx->tx_reqmdh = mdh; rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid, plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0); @@ -507,6 +564,8 @@ ptllnd_check_sends(ptllnd_peer_t *peer) list_add_tail(&tx->tx_list, &plni->plni_active_txs); } + + PJK_UT_MSG("<<< peer=%p\n",peer); } int @@ -524,6 +583,10 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg, int rc; int rc2; + PJK_UT_MSG(">>> peer=%p type=%s(%d) tx=%p\n",peer, + type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",type,tx); + PJK_UT_MSG("niov=%d offset=%d len=%d\n",niov,offset,len); + LASSERT (type == PTLLND_MSG_TYPE_GET || type == PTLLND_MSG_TYPE_PUT); @@ -536,7 +599,7 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg, rc = ptllnd_set_txiov(tx, niov, iov, offset, len); if (rc != 0) { - CERROR ("Can't allocate iov %d for %s\n", + CERROR ("Can't allocate iov %d for %s\n", niov, libcfs_nid2str(peer->plp_nid)); rc = -ENOMEM; goto failed; @@ -545,30 +608,47 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg, md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX); md.eq_handle = plni->plni_eqh; md.threshold = 1; - md.options = PTLLND_MD_OPTIONS | - (type == PTLLND_MSG_TYPE_GET) ? PTL_MD_OP_PUT : PTL_MD_OP_GET; + md.max_size = 0; + md.options = PTLLND_MD_OPTIONS; + if(type == PTLLND_MSG_TYPE_GET) + md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE; + else + md.options |= PTL_MD_OP_GET; ptllnd_set_md_buffer(&md, tx); while (!peer->plp_recvd_hello) { /* wait to validate plp_match */ + PJK_UT_MSG("Wait For Hello\n"); if (peer->plp_closing) { rc = -EIO; goto failed; } ptllnd_wait(ni, -1); } - + + if(peer->plp_match < PTL_RESERVED_MATCHBITS) + peer->plp_match = PTL_RESERVED_MATCHBITS; matchbits = peer->plp_match++; - LASSERT (matchbits >= PTL_RESERVED_MATCHBITS); - - rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid, - matchbits, 0, PTL_UNLINK, PTL_INS_AFTER, &meh); + PJK_UT_MSG("matchbits " LPX64 "\n",matchbits); + PJK_UT_MSG("nid " FMT_NID " pid=%d\n",peer->plp_ptlid.nid,peer->plp_ptlid.pid); + + rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid, + matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh); if (rc != PTL_OK) { - CERROR("PtlMEAttach for %s failed: %d\n", + CERROR("PtlMEAttach for %s failed: %d\n", libcfs_nid2str(peer->plp_nid), rc); rc = -EIO; goto failed; } +/* + PJK_UT_MSG("md.start=%p\n",md.start); + PJK_UT_MSG("md.length=%d\n",md.length); + PJK_UT_MSG("md.threshold=%d\n",md.threshold); + PJK_UT_MSG("md.max_size=%d\n",md.max_size); + PJK_UT_MSG("md.options=0x%x\n",md.options); + PJK_UT_MSG("md.user_ptr=%p\n",md.user_ptr); + PJK_UT_MSG("md.eq_handle=%p\n",md.eq_handle); +*/ rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh); if (rc != PTL_OK) { CERROR("PtlMDAttach for %s failed: %d\n", @@ -580,6 +660,13 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg, } tx->tx_bulkmdh = mdh; + /* + * We need to set the stamp here because it + * we could have received a HELLO above that set + * peer->plp_stamp + */ + tx->tx_msg.ptlm_dststamp = peer->plp_stamp; + tx->tx_msg.ptlm_u.req.kptlrm_hdr = msg->msg_hdr; tx->tx_msg.ptlm_u.req.kptlrm_matchbits = matchbits; @@ -592,13 +679,15 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg, goto failed; } } - + tx->tx_lnetmsg = msg; ptllnd_post_tx(tx); + PJK_UT_MSG("<<<\n"); return 0; - + failed: ptllnd_tx_done(tx); + PJK_UT_MSG("<<< rc=%d\n",rc); return rc; } @@ -615,9 +704,13 @@ ptllnd_active_rdma(ptllnd_peer_t *peer, int type, ptl_handle_md_t mdh; int rc; + PJK_UT_MSG(">>> peer=%p type=%d tx=%p\n",peer,type,tx); + PJK_UT_MSG("niov=%d offset=%d len=%d\n",niov,offset,len); + PJK_UT_MSG("matchbits " LPX64 "\n",matchbits); + LASSERT (type == PTLLND_RDMA_READ || type == PTLLND_RDMA_WRITE); - + if (tx == NULL) { CERROR("Can't allocate tx for RDMA %s with %s\n", (type == PTLLND_RDMA_WRITE) ? "write" : "read", @@ -628,7 +721,7 @@ ptllnd_active_rdma(ptllnd_peer_t *peer, int type, rc = ptllnd_set_txiov(tx, niov, iov, offset, len); if (rc != 0) { - CERROR ("Can't allocate iov %d for %s\n", + CERROR ("Can't allocate iov %d for %s\n", niov, libcfs_nid2str(peer->plp_nid)); rc = -ENOMEM; goto failed; @@ -636,11 +729,18 @@ ptllnd_active_rdma(ptllnd_peer_t *peer, int type, md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX); md.eq_handle = plni->plni_eqh; - /* If I've received a PUT, I fetch the data */ - md.threshold = (type == PTLLND_MSG_TYPE_PUT) ? 2 : 1; + md.threshold = 1; + md.max_size = 0; md.options = PTLLND_MD_OPTIONS; + if(type == PTLLND_RDMA_READ){ + md.options |= PTL_MD_OP_GET; + md.threshold ++; + } + else{ + md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE; + } ptllnd_set_md_buffer(&md, tx); - + rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh); if (rc != PTL_OK) { CERROR("PtlMDBind for %s failed: %d\n", @@ -651,24 +751,27 @@ ptllnd_active_rdma(ptllnd_peer_t *peer, int type, tx->tx_bulkmdh = mdh; tx->tx_lnetmsg = msg; - - if (type == PTLLND_MSG_TYPE_PUT) + + if (type == PTLLND_RDMA_READ) rc = PtlGet(mdh, peer->plp_ptlid, plni->plni_portal, 0, matchbits, 0); else rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid, plni->plni_portal, 0, matchbits, 0, 0); - if (rc == 0) + if (rc == 0){ + PJK_UT_MSG("<<<\n"); return 0; - + } + tx->tx_lnetmsg = NULL; failed: tx->tx_status = rc; ptllnd_tx_done(tx); /* this will close peer */ + PJK_UT_MSG("<<< rc=%d\n",rc); return rc; } -int +int ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg) { ptllnd_ni_t *plni = ni->ni_data; @@ -681,20 +784,30 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg) LASSERT (msg->msg_kiov == NULL); LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */ - + + PJK_UT_MSG("msg=%p nid=" LPX64 "\n",msg,msg->msg_target.nid); + PJK_UT_MSG("is_target_router=%d\n",msg->msg_target_is_router); + PJK_UT_MSG("msg_niov=%d\n",msg->msg_niov); + PJK_UT_MSG("msg_offset=%d\n",msg->msg_offset); + PJK_UT_MSG("msg_len=%d\n",msg->msg_len); + plp = ptllnd_find_peer(ni, msg->msg_target.nid, 1); if (plp == NULL) return -ENOMEM; - + switch (msg->msg_type) { default: LBUG(); - + case LNET_MSG_ACK: + PJK_UT_MSG("LNET_MSG_ACK\n"); + LASSERT (msg->msg_len == 0); break; /* send IMMEDIATE */ - + case LNET_MSG_GET: + PJK_UT_MSG("LNET_MSG_GET nob=%d\n",msg->msg_md->md_length); + if (msg->msg_target_is_router) break; /* send IMMEDIATE */ @@ -709,26 +822,32 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg) msg->msg_md->md_iov.iov, 0, msg->msg_md->md_length); ptllnd_peer_decref(plp); + PJK_UT_MSG("<<< rc=%d\n",rc); return rc; case LNET_MSG_REPLY: { ptllnd_rx_t *rx = private; /* incoming GET */ - __u64 match; - LASSERT (rx != NULL); - match = rx->rx_msg->ptlm_u.req.kptlrm_matchbits; - + + PJK_UT_MSG("LNET_MSG_REPLY rx=%p\n",rx); + if (rx->rx_msg->ptlm_type == PTLLND_MSG_TYPE_GET) { + __u64 matchbits; + + matchbits = rx->rx_msg->ptlm_u.req.kptlrm_matchbits; + PJK_UT_MSG("matchbits="LPX64"\n",matchbits); + LASSERT (!rx->rx_replied); rc = ptllnd_active_rdma(plp, PTLLND_RDMA_WRITE, msg, - match, - msg->msg_niov, msg->msg_iov, + matchbits, + msg->msg_niov, msg->msg_iov, msg->msg_offset, msg->msg_len); rx->rx_replied = (rc == 0); ptllnd_peer_decref(plp); + PJK_UT_MSG("<<< rc=%d\n",rc); return rc; } - + if (rx->rx_msg->ptlm_type != PTLLND_MSG_TYPE_IMMEDIATE) { CERROR("Reply to %s bad msg type %x!!!\n", libcfs_id2str(msg->msg_target), @@ -739,10 +858,12 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg) /* fall through to handle like PUT */ } - + case LNET_MSG_PUT: + PJK_UT_MSG("LNET_MSG_PUT nob=%d\n",msg->msg_len); nob = msg->msg_len; nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]); + PJK_UT_MSG("msg_size=%d max=%d\n",msg->msg_len,plp->plp_max_msg_size); if (nob <= plp->plp_max_msg_size) break; /* send IMMEDIATE */ @@ -750,12 +871,14 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg) msg->msg_niov, msg->msg_iov, msg->msg_offset, msg->msg_len); ptllnd_peer_decref(plp); + PJK_UT_MSG("<<< rc=%d\n",rc); return rc; } - /* send IMMEDIATE + /* send IMMEDIATE * NB copy the payload so we don't have to do a fragmented send */ - + + PJK_UT_MSG("IMMEDIATE len=%d\n", msg->msg_len); tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len); if (tx == NULL) { CERROR("Can't allocate tx for lnet type %d to %s\n", @@ -763,16 +886,17 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg) ptllnd_peer_decref(plp); return -ENOMEM; } - + lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg, offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload), - msg->msg_niov, msg->msg_iov, msg->msg_offset, + msg->msg_niov, msg->msg_iov, msg->msg_offset, msg->msg_len); tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr; tx->tx_lnetmsg = msg; ptllnd_post_tx(tx); ptllnd_peer_decref(plp); + PJK_UT_MSG("<<<\n"); return 0; } @@ -782,10 +906,12 @@ ptllnd_rx_done(ptllnd_rx_t *rx) ptllnd_peer_t *plp = rx->rx_peer; lnet_ni_t *ni = plp->plp_ni; ptllnd_ni_t *plni = ni->ni_data; - + + PJK_UT_MSG("rx=%p\n", rx); + plp->plp_outstanding_credits++; ptllnd_check_sends(rx->rx_peer); - + if (rx->rx_msg != (kptl_msg_t *)rx->rx_space) LIBCFS_FREE(rx, offsetof(ptllnd_rx_t, rx_space[rx->rx_nob])); @@ -793,30 +919,34 @@ ptllnd_rx_done(ptllnd_rx_t *rx) plni->plni_nrxs--; } -int +int ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, void **new_privatep) { ptllnd_rx_t *stackrx = private; ptllnd_rx_t *heaprx; + PJK_UT_MSG("rx=%p (stack)\n", stackrx); + /* Don't ++plni_nrxs: heaprx replaces stackrx */ LASSERT (stackrx->rx_msg != (kptl_msg_t *)stackrx->rx_space); - + LIBCFS_ALLOC(heaprx, offsetof(ptllnd_rx_t, rx_space[stackrx->rx_nob])); if (heaprx == NULL) return -ENOMEM; + PJK_UT_MSG("rx=%p (new heap)\n", stackrx); + heaprx->rx_msg = (kptl_msg_t *)heaprx->rx_space; memcpy(&heaprx->rx_msg, stackrx->rx_msg, stackrx->rx_nob); return 0; } -int +int ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, - int delayed, unsigned int niov, + int delayed, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen) { @@ -827,12 +957,19 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, LASSERT (kiov == NULL); LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */ + PJK_UT_MSG(">>> msg=%p\n",msg); + PJK_UT_MSG("rx=%p rx_nob=%d\n",rx,rx->rx_nob); + PJK_UT_MSG("niov=%d\n",niov); + PJK_UT_MSG("offset=%d\n",offset); + PJK_UT_MSG("mlen=%d rlen=%d\n",mlen,rlen); + switch (rx->rx_msg->ptlm_type) { default: LBUG(); case PTLLND_MSG_TYPE_IMMEDIATE: - nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]); + nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]); + PJK_UT_MSG("PTLLND_MSG_TYPE_IMMEDIATE nob=%d\n",nob); if (nob > rx->rx_nob) { CERROR("Immediate message from %s too big: %d(%d)\n", libcfs_nid2str(rx->rx_peer->plp_nid), @@ -846,21 +983,24 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, mlen); lnet_finalize(ni, msg, 0); break; - + case PTLLND_MSG_TYPE_PUT: - rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg, + PJK_UT_MSG("PTLLND_MSG_TYPE_PUT offset=%d mlen=%d\n",offset,mlen); + rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg, rx->rx_msg->ptlm_u.req.kptlrm_matchbits, niov, iov, offset, mlen); break; - + case PTLLND_MSG_TYPE_GET: + PJK_UT_MSG("PTLLND_MSG_TYPE_GET\n"); LASSERT (msg == NULL); /* no need to finalize */ if (!rx->rx_replied) /* peer will time out */ ptllnd_close_peer(rx->rx_peer); break; } - + ptllnd_rx_done(rx); + PJK_UT_MSG("<<< rc=%d\n",rc); return rc; } @@ -874,7 +1014,10 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, int flip; ptllnd_peer_t *plp; int rc; - + + + PJK_UT_MSG(">>> initiator =" LPX64 " nob=%d\n",initiator.nid,nob); + if (nob < basenob) { CERROR("Short receive from %s\n", ptllnd_ptlid2str(initiator)); @@ -883,11 +1026,11 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC); if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) { - CERROR("Bad magic %08x from %s\n", msg->ptlm_magic, + CERROR("Bad magic %08x from %s\n", msg->ptlm_magic, ptllnd_ptlid2str(initiator)); return; } - + if (flip) { /* NB stamps are opaque cookies */ __swab16s(&msg->ptlm_version); @@ -897,9 +1040,9 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, __swab64s(&msg->ptlm_dstnid); __swab64s(&msg->ptlm_seq); } - + if (msg->ptlm_version != PTLLND_MSG_VERSION) { - CERROR("Bad version %d from %s\n", (__u32)msg->ptlm_version, + CERROR("Bad version %d from %s\n", (__u32)msg->ptlm_version, ptllnd_ptlid2str(initiator)); return; } @@ -911,7 +1054,7 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, libcfs_nid2str(msg->ptlm_srcnid)); return; } - + if (msg->ptlm_dststamp != plni->plni_stamp) { CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n", msg->ptlm_dststamp, plni->plni_stamp, @@ -922,6 +1065,8 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, switch (msg->ptlm_type) { case PTLLND_MSG_TYPE_PUT: case PTLLND_MSG_TYPE_GET: + PJK_UT_MSG("PTLLND_MSG_TYPE_%s\n", + msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET"); if (nob < basenob + sizeof(kptl_request_msg_t)) { CERROR("Short rdma request from %s(%s)\n", libcfs_nid2str(msg->ptlm_srcnid), @@ -933,7 +1078,8 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, break; case PTLLND_MSG_TYPE_IMMEDIATE: - if (nob < offsetof(kptl_msg_t, + PJK_UT_MSG("PTLLND_MSG_TYPE_IMMEDIATE\n"); + if (nob < offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload)) { CERROR("Short immediate from %s(%s)\n", libcfs_nid2str(msg->ptlm_srcnid), @@ -941,18 +1087,21 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, return; } break; - + case PTLLND_MSG_TYPE_HELLO: + PJK_UT_MSG("PTLLND_MSG_TYPE_HELLO\n"); if (nob < basenob + sizeof(kptl_hello_msg_t)) { CERROR("Short hello from %s(%s)\n", libcfs_nid2str(msg->ptlm_srcnid), ptllnd_ptlid2str(initiator)); return; } - __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits); - __swab32s(&msg->ptlm_u.hello.kptlhm_max_immd_size); + if(flip){ + __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits); + __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size); + } break; - + default: CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type, libcfs_nid2str(msg->ptlm_srcnid), @@ -963,14 +1112,12 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, plp = ptllnd_find_peer(ni, msg->ptlm_srcnid, msg->ptlm_type == PTLLND_MSG_TYPE_HELLO); if (plp == NULL) { - CERROR("Can't find peer %s\n", + CERROR("Can't find peer %s\n", libcfs_nid2str(msg->ptlm_srcnid)); return; } - + if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) { - int n; - if (plp->plp_recvd_hello) { CERROR("Unexpected HELLO from %s\n", libcfs_nid2str(msg->ptlm_srcnid)); @@ -978,14 +1125,19 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, return; } - n = msg->ptlm_u.hello.kptlhm_max_immd_size; - n = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[n]); - plp->plp_max_msg_size = MAX(sizeof(kptl_msg_t), n); + PJK_UT_MSG("kptlhm_max_msg_size=%d\n",msg->ptlm_u.hello.kptlhm_max_msg_size); + PJK_UT_MSG("kptlhm_matchbits="LPX64"\n",msg->ptlm_u.hello.kptlhm_matchbits); + PJK_UT_MSG("ptlm_srcstamp="LPX64"\n",msg->ptlm_srcstamp); + + plp->plp_max_msg_size = MAX(plni->plni_max_msg_size, + msg->ptlm_u.hello.kptlhm_max_msg_size); plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits; plp->plp_stamp = msg->ptlm_srcstamp; plp->plp_max_credits += msg->ptlm_credits; plp->plp_recvd_hello = 1; + PJK_UT_MSG("plp_max_msg_size=%d\n",plp->plp_max_msg_size); + } else if (!plp->plp_recvd_hello) { CERROR("Bad message type %d (HELLO expected) from %s\n", @@ -995,14 +1147,15 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, } else if (msg->ptlm_srcstamp != plp->plp_stamp) { - CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n", + CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n", msg->ptlm_srcstamp, plp->plp_stamp, libcfs_nid2str(msg->ptlm_srcnid)); ptllnd_peer_decref(plp); return; } - + if (msg->ptlm_credits > 0) { + PJK_UT_MSG("Getting back %d credits from peer\n",msg->ptlm_credits); if (plp->plp_credits + msg->ptlm_credits > plp->plp_max_credits) { CWARN("Too many credits from %s: %d + %d > %d\n", @@ -1015,60 +1168,85 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator, } ptllnd_check_sends(plp); } - + /* All OK so far; assume the message is good... */ - rx.rx_peer = plp; - rx.rx_msg = msg; - rx.rx_nob = nob; + rx.rx_peer = plp; + rx.rx_msg = msg; + rx.rx_nob = nob; + rx.rx_replied = 0; plni->plni_nrxs++; - + + PJK_UT_MSG("rx=%p type=%d\n",&rx,msg->ptlm_type); + switch (msg->ptlm_type) { default: /* message types have been checked already */ ptllnd_rx_done(&rx); break; - + case PTLLND_MSG_TYPE_PUT: case PTLLND_MSG_TYPE_GET: + PJK_UT_MSG("PTLLND_MSG_TYPE_%s\n", + msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET"); rc = lnet_parse(ni, &msg->ptlm_u.req.kptlrm_hdr, msg->ptlm_srcnid, &rx); + PJK_UT_MSG("lnet_parse rc=%d\n",rc); if (rc < 0) ptllnd_rx_done(&rx); break; case PTLLND_MSG_TYPE_IMMEDIATE: + PJK_UT_MSG("PTLLND_MSG_TYPE_IMMEDIATE\n"); rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr, msg->ptlm_srcnid, &rx); + PJK_UT_MSG("lnet_parse rc=%d\n",rc); if (rc < 0) ptllnd_rx_done(&rx); break; } ptllnd_peer_decref(plp); + + PJK_UT_MSG("<<<\n"); } void ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event) { ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr); + ptllnd_ni_t *plni = ni->ni_data; char *msg = &buf->plb_buffer[event->offset]; int repost; + int unlinked = event->type == PTL_EVENT_UNLINK; LASSERT (buf->plb_ni == ni); LASSERT (event->type == PTL_EVENT_PUT_END || event->type == PTL_EVENT_UNLINK); + PJK_UT_MSG("buf=%p event=%d\n",buf,event->type); + if (event->type == PTL_EVENT_PUT_END) - ptllnd_parse_request(ni, event->initiator, + ptllnd_parse_request(ni, event->initiator, (kptl_msg_t *)msg, event->mlength); #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS /* UNLINK event only on explicit unlink */ repost = (event->unlinked && event->type != PTL_EVENT_UNLINK); + if(event->unlinked) + unlinked = 1; #else /* UNLINK event only on implicit unlink */ repost = (event->type == PTL_EVENT_UNLINK); #endif + + PJK_UT_MSG("repost=%d unlinked=%d\n",repost,unlinked); + + if(unlinked){ + LASSERT(buf->plb_posted); + buf->plb_posted = 0; + plni->plni_nposted_buffers--; + } + if (repost) (void) ptllnd_post_buffer(buf); } @@ -1082,21 +1260,29 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event) int isreq; int isbulk; #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS - int exhausted = event->unlinked; + int unlinked = event->unlinked; #else - int exhausted = (event->type == PTL_EVENT_UNLINK); + int unlinked = (event->type == PTL_EVENT_UNLINK); #endif LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE)); + PJK_UT_MSG("tx=%p type=%s (%d)\n",tx, + get_msg_type_string(tx->tx_type),tx->tx_type); + PJK_UT_MSG("unlinked=%d\n",unlinked); + PJK_UT_MSG("error=%d\n",error); + isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh); + PJK_UT_MSG("isreq=%d\n",isreq); if (isreq) { LASSERT (event->md.start == (void *)&tx->tx_msg); - if (exhausted) + if (unlinked) tx->tx_reqmdh = PTL_INVALID_HANDLE; } - + + isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh); + PJK_UT_MSG("isbulk=%d\n",isbulk); if (isbulk) { void *ptr; @@ -1104,18 +1290,18 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event) ptr = tx->tx_iov[0].iov_base; else ptr = tx->tx_iov; - + LASSERT (event->md.start == ptr); - if (exhausted) + if (unlinked) tx->tx_bulkmdh = PTL_INVALID_HANDLE; } LASSERT (!isreq != !isbulk); /* always one and only 1 match */ - + switch (tx->tx_type) { default: LBUG(); - + case PTLLND_MSG_TYPE_NOOP: case PTLLND_MSG_TYPE_HELLO: case PTLLND_MSG_TYPE_IMMEDIATE: @@ -1135,20 +1321,20 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event) event->mlength; } break; - + case PTLLND_MSG_TYPE_PUT: LASSERT (event->type == PTL_EVENT_UNLINK || (isreq && event->type == PTL_EVENT_SEND_END) || (isbulk && event->type == PTL_EVENT_GET_END)); break; - + case PTLLND_RDMA_READ: LASSERT (event->type == PTL_EVENT_UNLINK || event->type == PTL_EVENT_SEND_END || event->type == PTL_EVENT_REPLY_END); LASSERT (isbulk); break; - + case PTLLND_RDMA_WRITE: LASSERT (event->type == PTL_EVENT_UNLINK || event->type == PTL_EVENT_SEND_END); @@ -1163,6 +1349,7 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event) tx->tx_status = -EIO; list_del(&tx->tx_list); list_add_tail(&tx->tx_list, &plni->plni_zombie_txs); + PJK_UT_MSG("tx=%p ONTO ZOMBIE LIST\n",tx); } } @@ -1178,6 +1365,8 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds) int found = 0; int timeout = 0; + PJK_UT_MSG(">>> ms=%d\n",milliseconds); + /* Handle any currently queued events, returning immediately if any. * Otherwise block for the timeout and handle all events queued * then. */ @@ -1185,12 +1374,15 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds) for (;;) { rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which); timeout = 0; + PJK_UT_MSG("PtlEQPoll rc=%d\n",rc); if (rc == PTL_EQ_EMPTY) { if (found || /* handled some events */ milliseconds == 0 || /* just checking */ - blocked) /* blocked already */ + blocked){ /* blocked already */ + PJK_UT_MSG("found=%d blocked=%d\n",found,blocked); break; + } blocked = 1; timeout = milliseconds; @@ -1198,20 +1390,23 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds) } LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED); - + if (rc == PTL_EQ_DROPPED) CERROR("Event queue: size %d is too small\n", plni->plni_eq_size); + PJK_UT_MSG("event.type=%s(%d)\n", + get_ev_type_string(event.type),event.type); + found = 1; switch (ptllnd_eventarg2type(event.md.user_ptr)) { default: LBUG(); - + case PTLLND_EVENTARG_TYPE_TX: ptllnd_tx_event(ni, &event); break; - + case PTLLND_EVENTARG_TYPE_BUF: ptllnd_buf_event(ni, &event); break; @@ -1221,7 +1416,9 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds) while (!list_empty(&plni->plni_zombie_txs)) { tx = list_entry(plni->plni_zombie_txs.next, ptllnd_tx_t, tx_list); - + PJK_UT_MSG("Process ZOMBIE tx=%p\n",tx); ptllnd_tx_done(tx); } + + PJK_UT_MSG("<<<\n"); } -- 1.8.3.1