peer->peer_credits = 1; /* enough for HELLO */
peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
- peer->peer_active_rxs = 0;
+ peer->peer_sent_credits = 1; /* HELLO credit is implicit */
+ peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */
LASSERT (!in_interrupt());
LASSERT (atomic_read(&peer->peer_refcount) == 0);
- LASSERT (peer->peer_active_rxs == 0);
LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
peer->peer_state == PEER_STATE_ZOMBIE);
LASSERT (list_empty(&peer->peer_sendq));
}
void
-kptllnd_peer_cancel_txs(kptl_peer_t *peer)
+kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
{
- struct list_head sendq;
- struct list_head activeq;
struct list_head *tmp;
struct list_head *nxt;
kptl_tx_t *tx;
- unsigned long flags;
-
- /* atomically grab all the peer's tx-es... */
-
- spin_lock_irqsave(&peer->peer_lock, flags);
- list_add(&sendq, &peer->peer_sendq);
- list_del_init(&peer->peer_sendq);
- list_for_each (tmp, &sendq) {
+ list_for_each_safe (tmp, nxt, peerq) {
tx = list_entry(tmp, kptl_tx_t, tx_list);
- tx->tx_active = 0;
- }
- list_add(&activeq, &peer->peer_activeq);
- list_del_init(&peer->peer_activeq);
- list_for_each (tmp, &activeq) {
- tx = list_entry(tmp, kptl_tx_t, tx_list);
+ list_del(&tx->tx_list);
+ list_add_tail(&tx->tx_list, txs);
+
+ tx->tx_status = -EIO;
tx->tx_active = 0;
}
+}
- spin_unlock_irqrestore(&peer->peer_lock, flags);
-
- /* ...then drop the peer's ref on them at leasure. This will get
- * kptllnd_tx_fini() to abort outstanding comms if necessary. */
+void
+kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
+{
+ unsigned long flags;
- list_for_each_safe (tmp, nxt, &sendq) {
- tx = list_entry(tmp, kptl_tx_t, tx_list);
- list_del(&tx->tx_list);
- tx->tx_status = -EIO;
- kptllnd_tx_decref(tx);
- }
+ spin_lock_irqsave(&peer->peer_lock, flags);
- list_for_each_safe (tmp, nxt, &activeq) {
- tx = list_entry(tmp, kptl_tx_t, tx_list);
- list_del(&tx->tx_list);
- tx->tx_status = -EIO;
- kptllnd_tx_decref(tx);
- }
+ kptllnd_cancel_txlist(&peer->peer_sendq, txs);
+ kptllnd_cancel_txlist(&peer->peer_activeq, txs);
+
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
}
void
kptllnd_handle_closing_peers ()
{
unsigned long flags;
+ struct list_head txs;
kptl_peer_t *peer;
struct list_head *tmp;
struct list_head *nxt;
+ kptl_tx_t *tx;
int idle;
/* Check with a read lock first to avoid blocking anyone */
read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
- idle = list_empty(&kptllnd_data.kptl_closing_peers);
+ idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
+ list_empty(&kptllnd_data.kptl_zombie_peers);
read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
if (idle)
return;
- /* Scan the closing peers and cancel their txs.
- * NB only safe while there is only a single watchdog */
+ INIT_LIST_HEAD(&txs);
write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+ /* Cancel txs on all zombie peers. NB anyone dropping the last peer
+ * ref removes it from this list, so I mustn't drop the lock while
+ * scanning it. */
+ list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
+ peer = list_entry (tmp, kptl_peer_t, peer_list);
+
+ LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
+
+ kptllnd_peer_cancel_txs(peer, &txs);
+ }
+
+ /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB
+ * I'm the only one removing from this list, but peers can be added on
+ * the end any time I drop the lock. */
+
list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
peer = list_entry (tmp, kptl_peer_t, peer_list);
write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
kptllnd_peer_notify(peer);
- kptllnd_peer_cancel_txs(peer);
+ kptllnd_peer_cancel_txs(peer, &txs);
kptllnd_peer_decref(peer);
write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
}
write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+ /* Drop peer's ref on all cancelled txs. This will get
+ * kptllnd_tx_fini() to abort outstanding comms if necessary. */
+
+ list_for_each_safe (tmp, nxt, &txs) {
+ tx = list_entry(tmp, kptl_tx_t, tx_list);
+ list_del(&tx->tx_list);
+ kptllnd_tx_decref(tx);
+ }
}
void
kptllnd_peer_unreserve_buffers();
peer->peer_error = why; /* stash 'why' only on first close */
+ peer->peer_state = PEER_STATE_CLOSING;
/* Schedule for immediate attention, taking peer table's ref */
list_add_tail(&peer->peer_list,
break;
case PEER_STATE_ZOMBIE:
- /* Schedule for attention at next timeout */
- kptllnd_peer_addref(peer);
- list_del(&peer->peer_list);
- list_add_tail(&peer->peer_list,
- &kptllnd_data.kptl_closing_peers);
- break;
-
case PEER_STATE_CLOSING:
break;
}
-
- peer->peer_state = PEER_STATE_CLOSING;
}
void
}
void
-kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
+kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
/* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
- ptl_handle_md_t rdma_mdh = PTL_INVALID_HANDLE;
- ptl_handle_md_t msg_mdh = PTL_INVALID_HANDLE;
- ptl_handle_me_t meh;
+ ptl_handle_md_t msg_mdh;
ptl_md_t md;
ptl_err_t prc;
unsigned long flags;
kptllnd_set_tx_peer(tx, peer);
- if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
- tx->tx_type == TX_TYPE_GET_REQUEST) {
-
- spin_lock_irqsave(&peer->peer_lock, flags);
-
- /* Assume 64-bit matchbits can't wrap */
- LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
- tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
- peer->peer_next_matchbits++;
-
- spin_unlock_irqrestore(&peer->peer_lock, flags);
-
- prc = PtlMEAttach(kptllnd_data.kptl_nih,
- *kptllnd_tunables.kptl_portal,
- peer->peer_ptlid,
- tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
- 0, /* ignore bits */
- PTL_UNLINK,
- PTL_INS_BEFORE,
- &meh);
- if (prc != PTL_OK) {
- CERROR("PtlMEAttach(%s) failed: %d\n",
- libcfs_id2str(peer->peer_id), prc);
- goto failed;
- }
-
- prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
- if (prc != PTL_OK) {
- CERROR("PtlMDAttach(%s) failed: %d\n",
- libcfs_id2str(tx->tx_peer->peer_id), prc);
- prc = PtlMEUnlink(meh);
- LASSERT(prc == PTL_OK);
- rdma_mdh = PTL_INVALID_HANDLE;
- goto failed;
- }
-
- /* I'm not racing with the event callback here. It's a bug if
- * there's an event on the MD I just attached before I actually
- * send the RDMA request message which the event callback
- * catches by asserting 'rdma_mdh' is valid. */
- }
-
memset(&md, 0, sizeof(md));
-
- md.start = tx->tx_msg;
- md.length = tx->tx_msg->ptlm_nob;
- md.threshold = 1;
+
+ md.threshold = tx->tx_acked ? 2 : 1; /* SEND END + ACK? */
md.options = PTL_MD_OP_PUT |
PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
PTL_MD_EVENT_START_DISABLE;
md.user_ptr = &tx->tx_msg_eventarg;
md.eq_handle = kptllnd_data.kptl_eqh;
+ if (nfrag == 0) {
+ md.start = tx->tx_msg;
+ md.length = tx->tx_msg->ptlm_nob;
+ } else {
+ LASSERT (nfrag > 1);
+ LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
+
+ md.start = tx->tx_frags;
+ md.length = nfrag;
+ md.options |= PTL_MD_IOVEC;
+ }
+
prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
if (prc != PTL_OK) {
- msg_mdh = PTL_INVALID_HANDLE;
- goto failed;
+ CERROR("PtlMDBind(%s) failed: %d\n",
+ libcfs_id2str(peer->peer_id), prc);
+ tx->tx_status = -EIO;
+ kptllnd_tx_decref(tx);
+ return;
}
spin_lock_irqsave(&peer->peer_lock, flags);
tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
tx->tx_active = 1;
- tx->tx_rdma_mdh = rdma_mdh;
tx->tx_msg_mdh = msg_mdh;
/* Ensure HELLO is sent first */
list_add_tail(&tx->tx_list, &peer->peer_sendq);
spin_unlock_irqrestore(&peer->peer_lock, flags);
- return;
-
- failed:
- spin_lock_irqsave(&peer->peer_lock, flags);
-
- tx->tx_status = -EIO;
- tx->tx_rdma_mdh = rdma_mdh;
- tx->tx_msg_mdh = msg_mdh;
-
- spin_unlock_irqrestore(&peer->peer_lock, flags);
-
- kptllnd_tx_decref(tx);
}
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
-
+ ptl_handle_me_t meh;
kptl_tx_t *tx;
int rc;
unsigned long flags;
spin_lock_irqsave(&peer->peer_lock, flags);
+ peer->peer_retry_noop = 0;
+
if (list_empty(&peer->peer_sendq) &&
peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
peer->peer_credits != 0) {
libcfs_id2str(peer->peer_id));
} else {
kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
- kptllnd_post_tx(peer, tx);
+ kptllnd_post_tx(peer, tx, 0);
}
spin_lock_irqsave(&peer->peer_lock, flags);
+ peer->peer_retry_noop = (tx == NULL);
}
while (!list_empty(&peer->peer_sendq)) {
LASSERT (tx->tx_active);
LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
- LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
- !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
+ LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
LASSERT (peer->peer_outstanding_credits >= 0);
- LASSERT (peer->peer_outstanding_credits <=
+ LASSERT (peer->peer_sent_credits >= 0);
+ LASSERT (peer->peer_sent_credits +
+ peer->peer_outstanding_credits <=
*kptllnd_tunables.kptl_peercredits);
LASSERT (peer->peer_credits >= 0);
- LASSERT (peer->peer_credits <=
- *kptllnd_tunables.kptl_peercredits);
/* Ensure HELLO is sent first */
if (!peer->peer_sent_hello) {
}
if (peer->peer_credits == 0) {
- CDEBUG(D_NETTRACE, "%s[%d/%d]: no credits for %p\n",
- libcfs_id2str(peer->peer_id),
- peer->peer_credits, peer->peer_outstanding_credits, tx);
+ CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
+ libcfs_id2str(peer->peer_id),
+ peer->peer_credits,
+ peer->peer_outstanding_credits,
+ peer->peer_sent_credits, tx);
break;
}
* return */
if (peer->peer_credits == 1 &&
peer->peer_outstanding_credits == 0) {
- CDEBUG(D_NETTRACE, "%s[%d/%d]: not using last credit for %p\n",
- libcfs_id2str(peer->peer_id),
- peer->peer_credits, peer->peer_outstanding_credits, tx);
+ CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
+ "not using last credit for %p\n",
+ libcfs_id2str(peer->peer_id),
+ peer->peer_credits,
+ peer->peer_outstanding_credits,
+ peer->peer_sent_credits, tx);
break;
}
continue;
}
- /* fill last-minute msg header fields */
+ /* fill last-minute msg fields */
kptllnd_msg_pack(tx->tx_msg, peer);
+ if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
+ tx->tx_type == TX_TYPE_GET_REQUEST) {
+ /* peer_next_matchbits must be known good */
+ LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
+ /* Assume 64-bit matchbits can't wrap */
+ LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
+ tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
+ peer->peer_next_matchbits++;
+ }
+
+ peer->peer_sent_credits += peer->peer_outstanding_credits;
peer->peer_outstanding_credits = 0;
peer->peer_credits--;
- CDEBUG(D_NETTRACE, "%s[%d/%d]: %s tx=%p nob=%d cred=%d\n",
- libcfs_id2str(peer->peer_id),
- peer->peer_credits, peer->peer_outstanding_credits,
+ CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
+ libcfs_id2str(peer->peer_id), peer->peer_credits,
+ peer->peer_outstanding_credits, peer->peer_sent_credits,
kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
tx, tx->tx_msg->ptlm_nob,
tx->tx_msg->ptlm_credits);
spin_unlock_irqrestore(&peer->peer_lock, flags);
+ if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
+ tx->tx_type == TX_TYPE_GET_REQUEST) {
+ /* Post bulk now we have safe matchbits */
+ rc = PtlMEAttach(kptllnd_data.kptl_nih,
+ *kptllnd_tunables.kptl_portal,
+ peer->peer_ptlid,
+ tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
+ 0, /* ignore bits */
+ PTL_UNLINK,
+ PTL_INS_BEFORE,
+ &meh);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach(%s) failed: %d\n",
+ libcfs_id2str(peer->peer_id), rc);
+ goto failed;
+ }
+
+ rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
+ &tx->tx_rdma_mdh);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach(%s) failed: %d\n",
+ libcfs_id2str(tx->tx_peer->peer_id), rc);
+ rc = PtlMEUnlink(meh);
+ LASSERT(rc == PTL_OK);
+ tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
+ goto failed;
+ }
+ /* I'm not racing with the event callback here. It's a
+ * bug if there's an event on the MD I just attached
+ * before I actually send the RDMA request message -
+ * probably matchbits re-used in error. */
+ }
+
+ tx->tx_tposted = jiffies; /* going on the wire */
+
rc = PtlPut (tx->tx_msg_mdh,
- PTL_NOACK_REQ,
+ tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
peer->peer_ptlid,
*kptllnd_tunables.kptl_portal,
0, /* acl cookie */
if (rc != PTL_OK) {
CERROR("PtlPut %s error %d\n",
libcfs_id2str(peer->peer_id), rc);
-
- /* Nuke everything (including this tx) */
- kptllnd_peer_close(peer, -EIO);
- return;
+ goto failed;
}
kptllnd_tx_decref(tx); /* drop my ref */
}
spin_unlock_irqrestore(&peer->peer_lock, flags);
+ return;
+
+ failed:
+ /* Nuke everything (including tx we were trying) */
+ kptllnd_peer_close(peer, -EIO);
+ kptllnd_tx_decref(tx);
}
kptl_tx_t *
{
kptl_tx_t *tx;
struct list_head *tmp;
- unsigned long flags;
-
- spin_lock_irqsave(&peer->peer_lock, flags);
list_for_each(tmp, &peer->peer_sendq) {
tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
if (time_after_eq(jiffies, tx->tx_deadline)) {
kptllnd_tx_addref(tx);
- spin_unlock_irqrestore(&peer->peer_lock, flags);
return tx;
}
}
if (time_after_eq(jiffies, tx->tx_deadline)) {
kptllnd_tx_addref(tx);
- spin_unlock_irqrestore(&peer->peer_lock, flags);
return tx;
}
}
- spin_unlock_irqrestore(&peer->peer_lock, flags);
return NULL;
}
void
-kptllnd_peer_check_bucket (int idx)
+kptllnd_peer_check_bucket (int idx, int stamp)
{
struct list_head *peers = &kptllnd_data.kptl_peers[idx];
struct list_head *ptmp;
unsigned long flags;
int nsend;
int nactive;
+ int check_sends;
- CDEBUG(D_NET, "Bucket=%d\n", idx);
+ CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
again:
/* NB. Shared lock while I just look */
list_for_each (ptmp, peers) {
peer = list_entry (ptmp, kptl_peer_t, peer_list);
- CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n",
- libcfs_id2str(peer->peer_id),
- peer->peer_credits, peer->peer_outstanding_credits);
+ CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
+ libcfs_id2str(peer->peer_id), peer->peer_credits,
+ peer->peer_outstanding_credits, peer->peer_sent_credits);
+
+ spin_lock(&peer->peer_lock);
- /* In case we have enough credits to return via a
- * NOOP, but there were no non-blocking tx descs
- * free to do it last time... */
- kptllnd_peer_check_sends(peer);
+ if (peer->peer_check_stamp == stamp) {
+ /* checked already this pass */
+ spin_unlock(&peer->peer_lock);
+ continue;
+ }
+ peer->peer_check_stamp = stamp;
tx = kptllnd_find_timed_out_tx(peer);
- if (tx == NULL)
+ check_sends = peer->peer_retry_noop;
+
+ spin_unlock(&peer->peer_lock);
+
+ if (tx == NULL && !check_sends)
continue;
kptllnd_peer_addref(peer); /* 1 ref for me... */
- read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
- flags);
+ read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+ if (tx == NULL) { /* nothing timed out */
+ kptllnd_peer_check_sends(peer);
+ kptllnd_peer_decref(peer); /* ...until here or... */
+
+ /* rescan after dropping the lock */
+ goto again;
+ }
spin_lock_irqsave(&peer->peer_lock, flags);
nsend = kptllnd_count_queue(&peer->peer_sendq);
nactive = kptllnd_count_queue(&peer->peer_activeq);
spin_unlock_irqrestore(&peer->peer_lock, flags);
- LCONSOLE_ERROR("Timing out %s: please check Portals\n",
- libcfs_id2str(peer->peer_id));
-
- CERROR("%s timed out: cred %d outstanding %d sendq %d "
- "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n",
- libcfs_id2str(peer->peer_id),
- peer->peer_credits, peer->peer_outstanding_credits,
- nsend, nactive, kptllnd_tx_typestr(tx->tx_type),
+ LCONSOLE_ERROR("Timing out %s: %s\n",
+ libcfs_id2str(peer->peer_id),
+ (tx->tx_tposted == 0) ?
+ "no free peer buffers" : "please check Portals");
+
+ CERROR("%s timed out: cred %d outstanding %d, sent %d, "
+ "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
+ "%sposted %lu T/O %ds\n",
+ libcfs_id2str(peer->peer_id), peer->peer_credits,
+ peer->peer_outstanding_credits, peer->peer_sent_credits,
+ nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
tx->tx_active ? "A" : "",
PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
"" : "M",
PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
"" : "D",
- tx->tx_status, *kptllnd_tunables.kptl_timeout);
+ tx->tx_status,
+ (tx->tx_tposted == 0) ? "not " : "",
+ (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
+ *kptllnd_tunables.kptl_timeout);
kptllnd_dump_ptltrace();
return NULL;
}
- if (msg->ptlm_u.hello.kptlhm_max_msg_size !=
- *kptllnd_tunables.kptl_max_msg_size) {
- CERROR("max message size MUST be equal for all peers: "
- "got %d expected %d from %s\n",
+ if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
+ CERROR("%s: max message size %d < MIN %d",
+ libcfs_id2str(lpid),
msg->ptlm_u.hello.kptlhm_max_msg_size,
- *kptllnd_tunables.kptl_max_msg_size,
- libcfs_id2str(lpid));
+ *kptllnd_tunables.kptl_max_msg_size);
return NULL;
}
- if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) {
- CERROR("peercredits MUST be equal on all peers: "
- "got %d expected %d from %s\n",
- msg->ptlm_credits + 1,
- *kptllnd_tunables.kptl_peercredits,
- libcfs_id2str(lpid));
+ if (msg->ptlm_credits <= 1) {
+ CERROR("Need more than 1+%d credits from %s\n",
+ msg->ptlm_credits, libcfs_id2str(lpid));
return NULL;
}
peer->peer_state = PEER_STATE_ACTIVE;
peer->peer_incarnation = msg->ptlm_srcstamp;
peer->peer_next_matchbits = safe_matchbits;
+ peer->peer_max_msg_size =
+ msg->ptlm_u.hello.kptlhm_max_msg_size;
write_unlock_irqrestore(g_lock, flags);
return peer;
}
write_lock_irqsave(g_lock, flags);
-
+ again:
peer = kptllnd_id2peer_locked(lpid);
if (peer != NULL) {
if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
- /* An outgoing message instantiated 'peer' for me and
- * presumably provoked this reply */
- CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
+ /* An outgoing message instantiated 'peer' for me */
LASSERT(peer->peer_incarnation == 0);
peer->peer_state = PEER_STATE_ACTIVE;
peer->peer_incarnation = msg->ptlm_srcstamp;
peer->peer_next_matchbits = safe_matchbits;
+ peer->peer_max_msg_size =
+ msg->ptlm_u.hello.kptlhm_max_msg_size;
+
+ write_unlock_irqrestore(g_lock, flags);
+
+ CWARN("Outgoing instantiated peer %s\n",
+ libcfs_id2str(lpid));
} else {
LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
+
+ write_unlock_irqrestore(g_lock, flags);
+
/* WOW! Somehow this peer completed the HELLO
* handshake while I slept. I guess I could have slept
* while it rebooted and sent a new HELLO, so I'll fail
kptllnd_peer_decref(peer);
peer = NULL;
}
-
- write_unlock_irqrestore(g_lock, flags);
kptllnd_peer_unreserve_buffers();
kptllnd_peer_decref(new_peer);
write_lock_irqsave(g_lock, flags);
kptllnd_data.kptl_expected_peers++;
+ goto again;
}
last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
new_peer->peer_incarnation = msg->ptlm_srcstamp;
new_peer->peer_next_matchbits = safe_matchbits;
new_peer->peer_last_matchbits_seen = last_matchbits_seen;
+ new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
kptllnd_peer_add_peertable_locked(new_peer);
CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
libcfs_id2str(new_peer->peer_id), hello_tx);
- kptllnd_post_tx(new_peer, hello_tx);
+ kptllnd_post_tx(new_peer, hello_tx, 0);
kptllnd_peer_check_sends(new_peer);
return new_peer;
}
void
-kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
+kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
+{
+ kptllnd_post_tx(peer, tx, nfrag);
+ kptllnd_peer_check_sends(peer);
+}
+
+int
+kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
ptl_process_id_t ptl_id;
- kptl_peer_t *peer;
- kptl_peer_t *new_peer = NULL;
- kptl_tx_t *hello_tx = NULL;
+ kptl_peer_t *new_peer;
+ kptl_tx_t *hello_tx;
unsigned long flags;
int rc;
__u64 last_matchbits_seen;
- LASSERT (tx->tx_lnet_msg != NULL);
- LASSERT (tx->tx_peer == NULL);
-
/* I expect to find the peer, so I only take a read lock... */
read_lock_irqsave(g_lock, flags);
- peer = kptllnd_id2peer_locked(target);
+ *peerp = kptllnd_id2peer_locked(target);
read_unlock_irqrestore(g_lock, flags);
- if (peer != NULL) {
- goto post;
- }
+ if (*peerp != NULL)
+ return 0;
if ((target.pid & LNET_PID_USERFLAG) != 0) {
CWARN("Refusing to create a new connection to %s "
"(non-kernel peer)\n", libcfs_id2str(target));
- tx->tx_status = -EHOSTUNREACH;
- goto failed;
+ return -EHOSTUNREACH;
}
/* The new peer is a kernel ptllnd, and kernel ptllnds all have
ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
- write_lock_irqsave(g_lock, flags);
-
- peer = kptllnd_id2peer_locked(target);
- if (peer != NULL) {
- write_unlock_irqrestore(g_lock, flags);
- goto post;
- }
-
- kptllnd_cull_peertable_locked(target);
-
- write_unlock_irqrestore(g_lock, flags);
-
hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
if (hello_tx == NULL) {
CERROR("Unable to allocate connect message for %s\n",
libcfs_id2str(target));
- tx->tx_status = -ENOMEM;
- goto failed;
+ return -ENOMEM;
}
kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
new_peer = kptllnd_peer_allocate(target, ptl_id);
if (new_peer == NULL) {
- tx->tx_status = -ENOMEM;
- goto failed;
+ rc = -ENOMEM;
+ goto unwind_0;
}
rc = kptllnd_peer_reserve_buffers();
- if (rc != 0) {
- tx->tx_status = rc;
- goto failed;
- }
+ if (rc != 0)
+ goto unwind_1;
write_lock_irqsave(g_lock, flags);
-
- peer = kptllnd_id2peer_locked(target);
- if (peer != NULL) { /* someone else beat me to it */
+ again:
+ *peerp = kptllnd_id2peer_locked(target);
+ if (*peerp != NULL) {
write_unlock_irqrestore(g_lock, flags);
-
- kptllnd_peer_unreserve_buffers();
- kptllnd_peer_decref(new_peer);
- kptllnd_tx_decref(hello_tx);
- goto post;
+ goto unwind_2;
}
-
+
+ kptllnd_cull_peertable_locked(target);
+
if (kptllnd_data.kptl_n_active_peers ==
kptllnd_data.kptl_expected_peers) {
/* peer table full */
if (rc != 0) {
CERROR("Can't create connection to %s\n",
libcfs_id2str(target));
- kptllnd_peer_unreserve_buffers();
- tx->tx_status = -ENOMEM;
- goto failed;
+ rc = -ENOMEM;
+ goto unwind_2;
}
write_lock_irqsave(g_lock, flags);
kptllnd_data.kptl_expected_peers++;
+ goto again;
}
last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
libcfs_id2str(new_peer->peer_id), hello_tx);
- peer = new_peer;
- kptllnd_post_tx(peer, hello_tx);
-
- post:
- kptllnd_post_tx(peer, tx);
- kptllnd_peer_check_sends(peer);
- kptllnd_peer_decref(peer);
- return;
+ kptllnd_post_tx(new_peer, hello_tx, 0);
+ kptllnd_peer_check_sends(new_peer);
+
+ *peerp = new_peer;
+ return 0;
- failed:
- if (hello_tx != NULL)
- kptllnd_tx_decref(hello_tx);
+ unwind_2:
+ kptllnd_peer_unreserve_buffers();
+ unwind_1:
+ kptllnd_peer_decref(new_peer);
+ unwind_0:
+ kptllnd_tx_decref(hello_tx);
- if (new_peer != NULL)
- kptllnd_peer_decref(new_peer);
-
- LASSERT (tx->tx_status != 0);
- kptllnd_tx_decref(tx);
-
+ return rc;
}