peer->ksnp_id = id;
atomic_set (&peer->ksnp_refcount, 1); /* 1 ref for caller */
peer->ksnp_closing = 0;
+ peer->ksnp_accepting = 0;
CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
libcfs_id2str(peer->ksnp_id), peer);
LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
+ LASSERT (peer->ksnp_accepting == 0);
LASSERT (list_empty (&peer->ksnp_conns));
LASSERT (list_empty (&peer->ksnp_routes));
LASSERT (list_empty (&peer->ksnp_tx_queue));
struct socket *sock, int type)
{
rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
+ CFS_LIST_HEAD (zombies);
__u32 ipaddrs[LNET_MAX_INTERFACES];
int nipaddrs;
lnet_process_id_t peerid;
/* +1 ref for me */
ksocknal_peer_addref(peer);
-
+ peer->ksnp_accepting++;
+
/* Am I already connecting/connected to this guy? Resolve in
* favour of higher NID... */
rc = 0;
}
write_lock_irqsave(global_lock, flags);
+ peer->ksnp_accepting--;
+
if (rc != 0)
goto failed_2;
}
failed_2:
if (!peer->ksnp_closing &&
list_empty (&peer->ksnp_conns) &&
- list_empty (&peer->ksnp_routes))
+ list_empty (&peer->ksnp_routes)) {
+ list_add(&zombies, &peer->ksnp_tx_queue);
+ list_del_init(&peer->ksnp_tx_queue);
ksocknal_unlink_peer_locked(peer);
+ }
+
write_unlock_irqrestore(global_lock, flags);
if (warn != NULL) {
CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
libcfs_id2str(peerid), conn->ksnc_type, warn);
}
-
+
+ ksocknal_txlist_done(ni, &zombies);
ksocknal_peer_decref(peer);
failed_1:
typedef struct ksock_peer
{
struct list_head ksnp_list; /* stash on global peer list */
- lnet_process_id_t ksnp_id; /* who's on the other end(s) */
+ lnet_process_id_t ksnp_id; /* who's on the other end(s) */
atomic_t ksnp_refcount; /* # users */
int ksnp_sharecount; /* lconf usage counter */
int ksnp_closing; /* being closed */
+ int ksnp_accepting; /* # passive connections pending */
int ksnp_error; /* errno on closing last conn */
struct list_head ksnp_conns; /* all active connections */
struct list_head ksnp_routes; /* routes */
extern int ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr);
extern void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn);
-extern void ksocknal_tx_done (ksock_peer_t *peer, ksock_tx_t *tx, int asynch);
+extern void ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx, int asynch);
+extern void ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist);
extern void ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive);
extern int ksocknal_thread_start (int (*fn)(void *arg), void *arg);
extern void ksocknal_thread_fini (void);
#endif
void
-ksocknal_tx_done (ksock_peer_t *peer, ksock_tx_t *tx, int asynch)
+ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx, int asynch)
{
ENTRY;
#endif
}
- lnet_finalize (peer->ksnp_ni, tx->tx_lnetmsg,
- (tx->tx_resid == 0) ? 0 : -EIO);
-
+ lnet_finalize (ni, tx->tx_lnetmsg, (tx->tx_resid == 0) ? 0 : -EIO);
ksocknal_free_tx (tx);
EXIT;
}
void
+ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist)
+{
+ ksock_tx_t *tx;
+
+ while (!list_empty (txlist)) {
+ tx = list_entry (txlist->next, ksock_tx_t, tx_list);
+
+ CERROR ("Deleting packet type %d len %d %s->%s\n",
+ le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type),
+ le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length),
+ libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
+ libcfs_nid2str(le64_to_cpu (tx->tx_lnetmsg->msg_hdr.dest_nid)));
+
+ list_del (&tx->tx_list);
+ ksocknal_tx_done (ni, tx, 0);
+ }
+}
+
+void
ksocknal_tx_launched (ksock_tx_t *tx)
{
#if SOCKNAL_ZC
#endif
/* Any zero-copy-ness (if any) has completed; I can complete the
* transmit now, avoiding an extra schedule */
- ksocknal_tx_done (tx->tx_conn->ksnc_peer, tx, 0);
+ ksocknal_tx_done (tx->tx_conn->ksnc_peer->ksnp_ni, tx, 0);
}
int
return (0);
}
- route = ksocknal_find_connecting_route_locked (peer);
- if (route != NULL) {
- /* At least 1 connection is being established; queue the
- * message... */
- list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
- write_unlock_irqrestore (g_lock, flags);
- return (0);
- }
-
- write_unlock_irqrestore (g_lock, flags);
+ LASSERT (peer->ksnp_accepting > 0 ||
+ ksocknal_find_connecting_route_locked (peer) != NULL);
- CERROR("Peer entry with no routes: %s\n", libcfs_id2str(id));
- return (-EHOSTUNREACH);
+ /* Queue the message until a connection is established */
+ list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
+ write_unlock_irqrestore (g_lock, flags);
+ return 0;
}
int
list_del (&tx->tx_list);
spin_unlock_irqrestore (&sched->kss_lock, flags);
- ksocknal_tx_done (tx->tx_conn->ksnc_peer, tx, 1);
+ ksocknal_tx_done (tx->tx_conn->ksnc_peer->ksnp_ni,
+ tx, 1);
spin_lock_irqsave (&sched->kss_lock, flags);
}
ksocknal_connect (ksock_route_t *route)
{
CFS_LIST_HEAD (zombies);
- ksock_tx_t *tx;
ksock_peer_t *peer = route->ksnr_peer;
unsigned long flags;
int type;
route->ksnr_timeout = cfs_time_add(cfs_time_current(),
route->ksnr_retry_interval);
- if (!list_empty (&peer->ksnp_tx_queue) &&
- ksocknal_find_connecting_route_locked (peer) == NULL) {
+ if (!list_empty(&peer->ksnp_tx_queue) &&
+ peer->ksnp_accepting != 0 &&
+ ksocknal_find_connecting_route_locked(peer) == NULL) {
+ /* ksnp_tx_queue is queued on a conn on successful
+ * connection */
LASSERT (list_empty (&peer->ksnp_conns));
- /* None of the connections that the blocked packets are
- * waiting for have been successful. Complete them now... */
- do {
- tx = list_entry (peer->ksnp_tx_queue.next,
- ksock_tx_t, tx_list);
- list_del (&tx->tx_list);
- list_add_tail (&tx->tx_list, &zombies);
- } while (!list_empty (&peer->ksnp_tx_queue));
+ /* take all the blocked packets while I've got the lock and
+ * complete below... */
+ list_add(&zombies, &peer->ksnp_tx_queue);
+ list_del_init(&peer->ksnp_tx_queue);
}
#if 0 /* irrelevent with only eager routes */
#endif
write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
- while (!list_empty (&zombies)) {
- tx = list_entry (zombies.next, ksock_tx_t, tx_list);
-
- CERROR ("Deleting packet type %d len %d %s->%s\n",
- le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type),
- le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length),
- libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
- libcfs_nid2str(le64_to_cpu (tx->tx_lnetmsg->msg_hdr.dest_nid)));
-
- list_del (&tx->tx_list);
- /* complete now */
- ksocknal_tx_done (peer, tx, 0);
- }
+ ksocknal_txlist_done(peer->ksnp_ni, &zombies);
}
int