Whamcloud - gitweb
git://git.whamcloud.com
/
fs
/
lustre-release.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
| inline |
side by side
* Added socknal multi-frag I/O
[fs/lustre-release.git]
/
lustre
/
portals
/
knals
/
socknal
/
socknal_cb.c
diff --git
a/lustre/portals/knals/socknal/socknal_cb.c
b/lustre/portals/knals/socknal/socknal_cb.c
index
762133e
..
ed91f94
100644
(file)
--- a/
lustre/portals/knals/socknal/socknal_cb.c
+++ b/
lustre/portals/knals/socknal/socknal_cb.c
@@
-84,21
+84,17
@@
ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
{
struct socket *sock = conn->ksnc_sock;
struct iovec *iov = tx->tx_iov;
- int fragsize = iov->iov_len;
- unsigned long vaddr = (unsigned long)iov->iov_base;
- int more = (tx->tx_niov > 1) ||
- (tx->tx_nkiov > 0) ||
- (!list_empty (&conn->ksnc_tx_queue));
#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
+ unsigned long vaddr = (unsigned long)iov->iov_base
int offset = vaddr & (PAGE_SIZE - 1);
- int zcsize = MIN (
fragsize
, PAGE_SIZE - offset);
+ int zcsize = MIN (
iov->iov_len
, PAGE_SIZE - offset);
struct page *page;
#endif
+ int nob;
int rc;
/* NB we can't trust socket ops to either consume our iovs
- * or leave them alone, so we only send 1 frag at a time. */
- LASSERT (fragsize <= tx->tx_resid);
+ * or leave them alone. */
LASSERT (tx->tx_niov > 0);
#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
@@
-106,52
+102,74
@@
ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
(sock->sk->route_caps & NETIF_F_SG) &&
(sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
(page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
+ int msgflg = MSG_DONTWAIT;
CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
(void *)vaddr, page, page_address(page), offset, zcsize);
- if (fragsize > zcsize) {
- more = 1;
- fragsize = zcsize;
- }
-
- rc = tcp_sendpage_zccd(sock, page, offset, zcsize,
- more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
- &tx->tx_zccd);
+ if (!list_empty (&conn->ksnc_tx_queue) ||
+ zcsize < tx->tx_resid)
+ msgflg |= MSG_MORE;
+
+ rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
} else
#endif
{
- /* NB don't pass tx's iov; sendmsg may or may not update it */
- struct iovec fragiov = { .iov_base = (void *)vaddr,
- .iov_len = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX
+ struct iovec scratch;
+ struct iovec *scratchiov = &scratch;
+ int niov = 1;
+#else
+ struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+ int niov = tx->tx_niov;
+#endif
struct msghdr msg = {
.msg_name = NULL,
.msg_namelen = 0,
- .msg_iov =
&frag
iov,
- .msg_iovlen =
1
,
+ .msg_iov =
scratch
iov,
+ .msg_iovlen =
niov
,
.msg_control = NULL,
.msg_controllen = 0,
- .msg_flags =
more ? (MSG_DONTWAIT | MSG_MORE) :
MSG_DONTWAIT
+ .msg_flags = MSG_DONTWAIT
};
mm_segment_t oldmm = get_fs();
+ int i;
+ for (nob = i = 0; i < niov; i++) {
+ scratchiov[i] = tx->tx_iov[i];
+ nob += scratchiov[i].iov_len;
+ }
+
+ if (!list_empty(&conn->ksnc_tx_queue) ||
+ nob < tx->tx_resid)
+ msg.msg_flags |= MSG_MORE;
+
set_fs (KERNEL_DS);
- rc = sock_sendmsg(sock, &msg,
fragsize
);
+ rc = sock_sendmsg(sock, &msg,
nob
);
set_fs (oldmm);
}
- if (rc
> 0) {
-
tx->tx_resid -= rc
;
+ if (rc
<= 0) /* sent nothing? */
+
return (rc)
;
- if (rc < iov->iov_len) {
- /* didn't send whole iov entry... */
- iov->iov_base = (void *)(vaddr + rc);
- iov->iov_len -= rc;
- } else {
- tx->tx_iov++;
- tx->tx_niov--;
+ nob = rc;
+ LASSERT (nob <= tx->tx_resid);
+ tx->tx_resid -= nob;
+
+ /* "consume" iov */
+ do {
+ LASSERT (tx->tx_niov > 0);
+
+ if (nob < iov->iov_len) {
+ iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+ iov->iov_len -= nob;
+ return (rc);
}
- }
+
+ nob -= iov->iov_len;
+ tx->tx_iov = ++iov;
+ tx->tx_niov--;
+ } while (nob != 0);
return (rc);
}
@@
-161,66
+179,94
@@
ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
{
struct socket *sock = conn->ksnc_sock;
ptl_kiov_t *kiov = tx->tx_kiov;
- int fragsize = kiov->kiov_len;
- struct page *page = kiov->kiov_page;
- int offset = kiov->kiov_offset;
- int more = (tx->tx_nkiov > 1) ||
- (!list_empty (&conn->ksnc_tx_queue));
int rc;
-
+ int nob;
+
/* NB we can't trust socket ops to either consume our iovs
- * or leave them alone, so we only send 1 frag at a time. */
- LASSERT (fragsize <= tx->tx_resid);
- LASSERT (offset + fragsize <= PAGE_SIZE);
+ * or leave them alone. */
LASSERT (tx->tx_niov == 0);
LASSERT (tx->tx_nkiov > 0);
#if SOCKNAL_ZC
- if (
fragsize
>= ksocknal_tunables.ksnd_zc_min_frag &&
+ if (
kiov->kiov_len
>= ksocknal_tunables.ksnd_zc_min_frag &&
(sock->sk->route_caps & NETIF_F_SG) &&
(sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
+ struct page *page = kiov->kiov_page;
+ int offset = kiov->kiov_offset;
+ int fragsize = kiov->kiov_len;
+ int msgflg = MSG_DONTWAIT;
CDEBUG(D_NET, "page %p + offset %x for %d\n",
- page, offset,
fragsize
);
+ page, offset,
kiov->kiov_len
);
- rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
- more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
+ if (!list_empty(&conn->ksnc_tx_queue) ||
+ fragsize < tx->tx_resid)
+ msgflg |= MSG_MORE;
+
+ rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
&tx->tx_zccd);
} else
#endif
{
- char *addr = ((char *)kmap (page)) + offset;
- struct iovec fragiov = {.iov_base = addr,
- .iov_len = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
+ struct iovec scratch;
+ struct iovec *scratchiov = &scratch;
+ int niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+ struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+ int niov = tx->tx_nkiov;
+#endif
struct msghdr msg = {
.msg_name = NULL,
.msg_namelen = 0,
- .msg_iov =
&frag
iov,
- .msg_iovlen =
1
,
+ .msg_iov =
scratch
iov,
+ .msg_iovlen =
niov
,
.msg_control = NULL,
.msg_controllen = 0,
- .msg_flags =
more ? (MSG_DONTWAIT | MSG_MORE) :
MSG_DONTWAIT
+ .msg_flags = MSG_DONTWAIT
};
mm_segment_t oldmm = get_fs();
+ int i;
+ for (nob = i = 0; i < niov; i++) {
+ scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
+ kiov[i].kiov_offset;
+ nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+ }
+
+ if (!list_empty(&conn->ksnc_tx_queue) ||
+ nob < tx->tx_resid)
+ msg.msg_flags |= MSG_DONTWAIT;
+
set_fs (KERNEL_DS);
- rc = sock_sendmsg(sock, &msg,
fragsize
);
+ rc = sock_sendmsg(sock, &msg,
nob
);
set_fs (oldmm);
- kunmap (page);
+ for (i = 0; i < niov; i++)
+ kunmap(kiov[i].kiov_page);
}
- if (rc > 0) {
- tx->tx_resid -= rc;
-
- if (rc < fragsize) {
- kiov->kiov_offset = offset + rc;
- kiov->kiov_len = fragsize - rc;
- } else {
- tx->tx_kiov++;
- tx->tx_nkiov--;
+ if (rc <= 0) /* sent nothing? */
+ return (rc);
+
+ nob = rc;
+ LASSERT (nob <= tx->tx_resid);
+ tx->tx_resid -= nob;
+
+ do {
+ LASSERT(tx->tx_nkiov > 0);
+
+ if (nob < kiov->kiov_len) {
+ kiov->kiov_offset += nob;
+ kiov->kiov_len -= nob;
+ return rc;
}
- }
+
+ nob -= kiov->kiov_len;
+ tx->tx_kiov = ++kiov;
+ tx->tx_nkiov--;
+ } while (nob != 0);
return (rc);
}
@@
-269,35
+315,35
@@
ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
mb();
}
- if (rc <= 0) {
- /* Didn't write anything.
- *
- * NB: rc == 0 and rc == -EAGAIN both mean try
- * again later (linux stack returns -EAGAIN for
- * this, but Adaptech TOE returns 0).
- *
- * Also, sends never fail with -ENOMEM, just
- * -EAGAIN, but with the added bonus that we can't
- * expect write_space() to call us back to tell us
- * when to try sending again. We use the
- * SOCK_NOSPACE flag to diagnose... */
-
- LASSERT(rc != -ENOMEM);
-
- if (rc == 0 || rc == -EAGAIN) {
- if (test_bit(SOCK_NOSPACE,
- &conn->ksnc_sock->flags)) {
- rc = -EAGAIN;
- } else {
- static int counter;
-
- counter++;
- if ((counter & (-counter)) == counter)
- CWARN("%d ENOMEM tx %p\n",
- counter, conn);
- rc = -ENOMEM;
- }
+ if (rc <= 0) { /* Didn't write anything? */
+ unsigned long flags;
+ ksock_sched_t *sched;
+
+ if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
+ rc = -EAGAIN;
+
+ if (rc != -EAGAIN)
+ break;
+
+ /* Check if EAGAIN is due to memory pressure */
+
+ sched = conn->ksnc_scheduler;
+ spin_lock_irqsave(&sched->kss_lock, flags);
+
+ if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
+ !conn->ksnc_tx_ready) {
+ /* SOCK_NOSPACE is set when the socket fills
+ * and cleared in the write_space callback
+ * (which also sets ksnc_tx_ready). If
+ * SOCK_NOSPACE and ksnc_tx_ready are BOTH
+ * zero, I didn't fill the socket and
+ * write_space won't reschedule me, so I
+ * return -ENOMEM to get my caller to retry
+ * after a timeout */
+ rc = -ENOMEM;
}
+
+ spin_unlock_irqrestore(&sched->kss_lock, flags);
break;
}
@@
-332,114
+378,151
@@
ksocknal_eager_ack (ksock_conn_t *conn)
int
ksocknal_recv_iov (ksock_conn_t *conn)
{
+#if SOCKNAL_SINGLE_FRAG_RX
+ struct iovec scratch;
+ struct iovec *scratchiov = &scratch;
+ int niov = 1;
+#else
+ struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+ int niov = conn->ksnc_rx_niov;
+#endif
struct iovec *iov = conn->ksnc_rx_iov;
- int fragsize = iov->iov_len;
- unsigned long vaddr = (unsigned long)iov->iov_base;
- struct iovec fragiov = { .iov_base = (void *)vaddr,
- .iov_len = fragsize};
struct msghdr msg = {
.msg_name = NULL,
.msg_namelen = 0,
- .msg_iov =
&frag
iov,
- .msg_iovlen =
1
,
+ .msg_iov =
scratch
iov,
+ .msg_iovlen =
niov
,
.msg_control = NULL,
.msg_controllen = 0,
.msg_flags = 0
};
mm_segment_t oldmm = get_fs();
+ int nob;
+ int i;
int rc;
/* NB we can't trust socket ops to either consume our iovs
- * or leave them alone, so we only receive 1 frag at a time. */
- LASSERT (conn->ksnc_rx_niov > 0);
- LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
+ * or leave them alone. */
+ LASSERT (niov > 0);
+
+ for (nob = i = 0; i < niov; i++) {
+ scratchiov[i] = iov[i];
+ nob += scratchiov[i].iov_len;
+ }
+ LASSERT (nob <= conn->ksnc_rx_nob_wanted);
set_fs (KERNEL_DS);
- rc = sock_recvmsg (conn->ksnc_sock, &msg,
fragsize
, MSG_DONTWAIT);
- /* NB this is just a boolean..........................
..
^ */
+ rc = sock_recvmsg (conn->ksnc_sock, &msg,
nob
, MSG_DONTWAIT);
+ /* NB this is just a boolean..........................^ */
set_fs (oldmm);
if (rc <= 0)
return (rc);
/* received something... */
+ nob = rc;
+
conn->ksnc_peer->ksnp_last_alive = jiffies;
conn->ksnc_rx_deadline = jiffies +
ksocknal_tunables.ksnd_io_timeout * HZ;
mb(); /* order with setting rx_started */
conn->ksnc_rx_started = 1;
- conn->ksnc_rx_nob_wanted -= rc;
- conn->ksnc_rx_nob_left -= rc;
+ conn->ksnc_rx_nob_wanted -= nob;
+ conn->ksnc_rx_nob_left -= nob;
+
+ do {
+ LASSERT (conn->ksnc_rx_niov > 0);
- if (rc < fragsize) {
- iov->iov_base = (void *)(vaddr + rc);
- iov->iov_len = fragsize - rc;
- return (-EAGAIN);
- }
+ if (nob < iov->iov_len) {
+ iov->iov_len -= nob;
+ iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
+ return (-EAGAIN);
+ }
+
+ nob -= iov->iov_len;
+ conn->ksnc_rx_iov = ++iov;
+ conn->ksnc_rx_niov--;
+ } while (nob != 0);
- conn->ksnc_rx_iov++;
- conn->ksnc_rx_niov--;
- return (1);
+ return (rc);
}
int
ksocknal_recv_kiov (ksock_conn_t *conn)
{
+#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
+ struct iovec scratch;
+ struct iovec *scratchiov = &scratch;
+ int niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+ struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+ int niov = conn->ksnc_rx_nkiov;
+#endif
ptl_kiov_t *kiov = conn->ksnc_rx_kiov;
- struct page *page = kiov->kiov_page;
- int offset = kiov->kiov_offset;
- int fragsize = kiov->kiov_len;
- unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
- struct iovec fragiov = { .iov_base = (void *)vaddr,
- .iov_len = fragsize};
struct msghdr msg = {
.msg_name = NULL,
.msg_namelen = 0,
- .msg_iov =
&frag
iov,
- .msg_iovlen =
1
,
+ .msg_iov =
scratch
iov,
+ .msg_iovlen =
niov
,
.msg_control = NULL,
.msg_controllen = 0,
.msg_flags = 0
};
mm_segment_t oldmm = get_fs();
+ int nob;
+ int i;
int rc;
- /* NB we can't trust socket ops to either consume our iovs
- * or leave them alone, so we only receive 1 frag at a time. */
- LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
LASSERT (conn->ksnc_rx_nkiov > 0);
- LASSERT (offset + fragsize <= PAGE_SIZE);
+
+ /* NB we can't trust socket ops to either consume our iovs
+ * or leave them alone. */
+ for (nob = i = 0; i < niov; i++) {
+ scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
+ nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+ }
+ LASSERT (nob <= conn->ksnc_rx_nob_wanted);
set_fs (KERNEL_DS);
- rc = sock_recvmsg (conn->ksnc_sock, &msg,
fragsize
, MSG_DONTWAIT);
- /* NB this is just a boolean.......................
.....
^ */
+ rc = sock_recvmsg (conn->ksnc_sock, &msg,
nob
, MSG_DONTWAIT);
+ /* NB this is just a boolean.......................^ */
set_fs (oldmm);
- kunmap (page);
-
+ for (i = 0; i < niov; i++)
+ kunmap(kiov[i].kiov_page);
+
if (rc <= 0)
return (rc);
/* received something... */
+ nob = rc;
+
conn->ksnc_peer->ksnp_last_alive = jiffies;
conn->ksnc_rx_deadline = jiffies +
ksocknal_tunables.ksnd_io_timeout * HZ;
mb(); /* order with setting rx_started */
conn->ksnc_rx_started = 1;
- conn->ksnc_rx_nob_wanted -= rc;
- conn->ksnc_rx_nob_left -= rc;
+ conn->ksnc_rx_nob_wanted -= nob;
+ conn->ksnc_rx_nob_left -= nob;
+
+ do {
+ LASSERT (conn->ksnc_rx_nkiov > 0);
- if (rc < fragsize) {
- kiov->kiov_offset = offset + rc;
- kiov->kiov_len = fragsize - rc;
- return (-EAGAIN);
- }
+ if (nob < kiov->kiov_len) {
+ kiov->kiov_offset += nob;
+ kiov->kiov_len -= nob;
+ return -EAGAIN;
+ }
+
+ nob -= kiov->kiov_len;
+ conn->ksnc_rx_kiov = ++kiov;
+ conn->ksnc_rx_nkiov--;
+ } while (nob != 0);
- conn->ksnc_rx_kiov++;
- conn->ksnc_rx_nkiov--;
- return (1);
+ return 1;
}
int
@@
-599,6
+682,12
@@
ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
return (rc);
if (rc == -ENOMEM) {
+ static int counter;
+
+ counter++; /* exponential backoff warnings */
+ if ((counter & (-counter)) == counter)
+ CWARN("%d ENOMEM tx %p\n", counter, conn);
+
/* Queue on ksnd_enomem_conns for retry after a timeout */
spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
@@
-1853,12
+1942,11
@@
ksocknal_write_space (struct sock *sk)
}
if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
- clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
-
sched = conn->ksnc_scheduler;
spin_lock_irqsave (&sched->kss_lock, flags);
+ clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
conn->ksnc_tx_ready = 1;
if (!conn->ksnc_tx_scheduled && // not being progressed